Bounded queue with non-blocking producer


Dmitriy Vyukov

Oct 18, 2009, 7:37:41 AM
to Scalable Synchronization Algorithms
Someone asked over comp.programming.threads for an interesting queue:
http://groups.google.com/group/comp.programming.threads/tree/browse_frm/thread/7ebc232efb7df973
In short: the queue is single-producer/single-consumer, bounded, and
array-based (i.e. no dynamic memory management during enqueue/dequeue);
the consumer must block when the queue is empty. The tricky part is: the
producer must never block, i.e. if the queue is already full, it must take
the oldest not yet consumed node, overwrite the data, and re-enqueue the
node.

Here it is.
Algorithm outline:
There are 2 FIFO queues. The first is used for passing full nodes and is
single-producer/multi-consumer. The second is used for passing empty nodes
back and is multi-producer/single-consumer.
The producer first tries to dequeue a node from the free queue; if that
fails, it dequeues a node from the full queue. Then it fills the node with
data and enqueues it into the full queue. (Plus some trickery to handle the
ABA problem for the consumer - see get_free_node().)
The consumer dequeues a node from the full queue, consumes the data, then
enqueues the node back into the free queue.
Also note how the consumer blocks on the "queue empty" condition - the
'waiting' variable.
The queue is 100% lock-free. The producer never blocks. The consumer blocks
only when the queue is empty.
A block of memory is allocated only once, in the constructor, and then
split into nodes; there is no memory management during enqueue/dequeue
calls. The queue is "zero-copy": for that purpose the enqueue/dequeue
methods are split into "prepare" and "commit" parts. So the usage pattern
is:

void consumer_thread(queue<T>* q)
{
    for (;;)
    {
        T& data = q->dequeue_prepare();
        consume_impl(data);
        q->dequeue_commit();
    }
}

void producer_thread(queue<T>* q)
{
    for (;;)
    {
        T& data = q->enqueue_prepare();
        produce_impl(data);
        q->enqueue_commit();
    }
}

Here is the implementation:

#include <cassert>
#include <atomic>
#include <windows.h>

template<typename T>
class queue
{
public:
    queue(size_t count)
    {
        assert(count >= 6);
        sema = CreateSemaphore(0, 0, 1, 0);
        waiting = false;
        deq_node = 0;
        // single allocation: the first node seeds the full queue as a stub,
        // the rest form the initial free list
        block = new node [count];
        block->next = 0;
        full_tail = block;
        full_head = block;
        free_head = block + 1;
        free_tail = block + count - 1;
        free_tail->next = 0;
        for (size_t i = 1; i != count - 1; i += 1)
            block[i].next = block + i + 1;
    }

    ~queue()
    {
        CloseHandle(sema);
        delete [] block;
    }

    T& enqueue_prepare()
    {
        // full_tail always points to an unfilled stub node - fill it in place
        return full_tail->data;
    }

    void enqueue_commit()
    {
        // linking a fresh stub after full_tail publishes the filled node;
        // then wake the consumer if it is blocked
        node* n = get_free_node();
        n->next = 0;
        full_tail->next = n;
        full_tail = n;
        if (waiting)
        {
            waiting = false;
            ReleaseSemaphore(sema, 1, 0);
        }
    }

    T& dequeue_prepare()
    {
        deq_node = get_full_node();
        return deq_node->data;
    }

    void dequeue_commit()
    {
        // return the consumed node to the free queue (multi-producer push)
        deq_node->next = 0;
        node* prev = free_tail.exchange(deq_node);
        prev->next = deq_node;
    }

private:
    struct node
    {
        std::atomic<node*> next;
        T data;
    };

    node* block;
    node* full_tail; // producer-side
    node* free_head; // producer-side
    node* deq_node;  // consumer-side
    char pad [64];   // keep the shared atomics on a separate cache line
    std::atomic<node*> full_head;
    std::atomic<node*> free_tail;
    std::atomic<bool> waiting;
    HANDLE sema;

    node* get_free_node()
    {
        for (;;)
        {
            // fast path: grab a node from the free queue
            // (always leave at least one node in it)
            node* n = free_head;
            node* next = n->next;
            if (next)
            {
                free_head = next;
                return n;
            }

            // free queue is empty: steal the oldest not yet consumed
            // node from the full queue
            n = full_head;
            next = n->next;
            if (next)
            {
                if (full_head.compare_exchange_strong(n, next))
                {
                    // re-check the free queue; if a node appeared meanwhile,
                    // route the stolen node through the free queue instead of
                    // reusing it directly (the ABA trickery mentioned above)
                    node* n2 = free_head;
                    node* next2 = n2->next;
                    if (next2)
                    {
                        n->next = 0;
                        node* prev = free_tail.exchange(n);
                        prev->next = n;
                        free_head = next2;
                        return n2;
                    }
                    else
                    {
                        return n;
                    }
                }
            }
        }
    }

    node* get_full_node()
    {
        node* n = full_head;
        for (;;)
        {
            node* next = n->next;
            if (next == 0)
            {
                // queue looks empty: announce intent to block,
                // then re-check before actually waiting on the semaphore
                waiting = true;
                n = full_head;
                next = n->next;
                if (next)
                {
                    waiting = false;
                }
                else
                {
                    WaitForSingleObject(sema, INFINITE);
                    n = full_head;
                    continue;
                }
            }
            if (full_head.compare_exchange_strong(n, next))
                return n;
        }
    }
};


--
Dmitriy V'jukov

John M. Dlugosz

Oct 27, 2009, 12:12:50 PM
to Scalable Synchronization Algorithms
I implemented something similar recently. It is a queue that is not only
non-blocking but doesn't need any compare-exchange operations either. Being
multi-producer/single-consumer, it needs only a single atomic ++ on the
producer side. If there is only one producer, even that does not need to be
atomic, and the queue is totally atomic-free.

An array is used as a ring buffer.
Integers R and W are indexes into the array, but here's the key: they
always count forward and never wrap. So any use of R or W as an index must
be taken modulo the array size, but advancing is always a single increment
with no compares etc.

A flag in each element indicates whether it is used or unused, so the
reader can pause if the record is still being written by the producer. So,
write "prepare" increments the counter and returns a reference to the
record to pour stuff into, and "commit" sets the flag.

On the read side, "prepare" will return the record, and "commit" will
clear the flag and then increment the R counter.

I didn't generalize it, but it could certainly be templatized with
wrapping access calls like you describe.

In my application, it is never full, as the number of records produced
is limited by other aspects of the program. The read code doesn't
block, but returns failure; it's up to the caller to block or do
something else.
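The scheme above can be sketched roughly like this - a hypothetical
reconstruction, not John's actual code (all class and member names are my
own), and like his application it assumes the producers never lap the
reader, i.e. the buffer never fills:

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Multi-producer/single-consumer ring buffer with forward-counting
// indices W and R; indexing is done modulo the capacity, advancing is
// a plain increment. A per-slot flag publishes finished records.
template<typename T, size_t N>
class ring_queue
{
public:
    ring_queue() : w(0), r(0)
    {
        for (size_t i = 0; i != N; ++i)
            ring[i].full.store(false, std::memory_order_relaxed);
    }

    // Producer: claim a slot with a single atomic ++ on W. Each producer
    // keeps its own ticket, so multiple producers can fill slots in parallel.
    size_t write_prepare(T*& out)
    {
        size_t ticket = w.fetch_add(1, std::memory_order_relaxed);
        out = &ring[ticket % N].data;
        return ticket;
    }

    void write_commit(size_t ticket)
    {
        // setting the flag publishes the record to the reader
        ring[ticket % N].full.store(true, std::memory_order_release);
    }

    // Consumer: returns 0 if the next record is not ready yet -
    // the caller decides whether to block or do something else.
    T* read_prepare()
    {
        slot& s = ring[r % N];
        if (!s.full.load(std::memory_order_acquire))
            return 0;
        return &s.data;
    }

    void read_commit()
    {
        ring[r % N].full.store(false, std::memory_order_release);
        r += 1; // single consumer: a plain increment suffices
    }

private:
    struct slot
    {
        std::atomic<bool> full;
        T data;
    };

    slot ring[N];
    std::atomic<size_t> w; // producer counter, never wraps
    size_t r;              // consumer counter, never wraps
};
```

In the single-producer case W could be a plain size_t as well, leaving only
the release/acquire flag as the synchronization point.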

--John