Someone asked on comp.programming.threads about an interesting queue:
http://groups.google.com/group/comp.programming.threads/tree/browse_frm/thread/7ebc232efb7df973
In short: the queue is single-producer/single-consumer, the queue is
bounded, the queue is based on an array (i.e. no dynamic memory
management during enqueue/dequeue), and the consumer must block when
the queue is empty. The tricky part is that the producer must never
block: if the queue is already full, it must take the oldest
not-yet-consumed node, overwrite the data and re-enqueue the node.
Here it is.
Algorithm outline:
There are 2 internal FIFO queues. The first is used for passing full
nodes and is single-producer/multi-consumer. The second is used for
passing empty nodes back and is multi-producer/single-consumer.
The producer first tries to dequeue a node from the free queue; if
that fails, it dequeues a node from the full queue (i.e. steals the
oldest not-yet-consumed node). Then it fills the node with data and
enqueues it into the full queue. (Plus some trickery to handle the
ABA problem for the consumer - see get_free_node().)
The consumer dequeues a node from the full queue, consumes the data,
and then enqueues the node into the free queue.
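To make the node flow concrete before the real code, here is a
minimal single-threaded model of it (not from the original post):
std::deque stands in for the two internal queues, and all the
lock-free details, the ABA handling and the blocking are omitted;
only the "prefer a free node, otherwise steal the oldest full node"
behaviour is shown.

#include <cstdio>
#include <deque>

int main()
{
    std::deque<int> free_q = {1, 2, 3}; // empty nodes, identified by number
    std::deque<int> full_q;             // filled nodes, oldest at the front

    // producer side: 5 items into a "queue" of 3 nodes
    for (int item = 0; item != 5; item += 1)
    {
        int n;
        if (!free_q.empty())            // normal case: reuse a free node
        {
            n = free_q.front();
            free_q.pop_front();
        }
        else                            // no free nodes: steal the oldest full
        {                               // node, its old data is overwritten
            n = full_q.front();
            full_q.pop_front();
        }
        full_q.push_back(n);            // node n now carries 'item'
        std::printf("item %d stored in node %d\n", item, n);
    }

    // consumer side: drain the full queue and recycle the nodes
    while (!full_q.empty())
    {
        free_q.push_back(full_q.front());
        full_q.pop_front();
    }
}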
Also note how the consumer blocks on the "queue empty" condition -
the 'waiting' variable.
The queue is 100% lock-free. The producer never blocks. The consumer
blocks only when the queue is empty.
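The blocking on "queue empty" is the usual "announce, re-check, then
block" handshake between the 'waiting' flag and the semaphore. Below
is a minimal standalone sketch of just that handshake (not from the
original post): a single shared slot replaces the queue, and C++20
std::counting_semaphore plus std::thread stand in for the Win32
primitives used further down.

#include <atomic>
#include <cstdio>
#include <semaphore>
#include <thread>

std::atomic<int>  slot (0);         // 0 means "empty"
std::atomic<bool> waiting (false);  // consumer announces intent to block
std::counting_semaphore<> sema (0);

void consumer()
{
    for (int got = 0; got != 3; )
    {
        int v = slot.exchange(0);
        if (v == 0)
        {
            waiting = true;           // 1. announce
            v = slot.exchange(0);     // 2. re-check, so a concurrent producer
            if (v == 0)               //    cannot slip in unnoticed
            {
                sema.acquire();       // 3. really empty: block
                continue;             // woken (possibly spuriously): re-check
            }
            waiting = false;
        }
        std::printf("consumed %d\n", v);
        got += 1;
    }
}

void producer()
{
    for (int i = 1; i <= 3; i += 1)
    {
        while (slot.load() != 0) {}   // toy backoff, not part of the real queue
        slot = i;                     // publish the item
        if (waiting.exchange(false))  // wake the consumer only if it announced
            sema.release();
    }
}

int main()
{
    std::thread c (consumer), p (producer);
    p.join();
    c.join();
}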
A block of memory is allocated only once in the constructor, then
split into nodes. There is no memory management during
enqueue/dequeue calls. The queue is "zero-copy"; for that purpose the
enqueue/dequeue methods are split into "prepare" and "commit" parts.
So the usage pattern is:
void consumer_thread(queue<T>* q)
{
    for (;;)
    {
        T& data = q->dequeue_prepare();
        consume_impl(data);
        q->dequeue_commit();
    }
}

void producer_thread(queue<T>* q)
{
    for (;;)
    {
        T& data = q->enqueue_prepare();
        produce_impl(data);
        q->enqueue_commit();
    }
}
Here is the implementation:
#include <atomic>
#include <cassert>
#include <cstddef>
#include <windows.h>

template<typename T>
class queue
{
public:
    queue(size_t count)
    {
        assert(count >= 6);
        sema = CreateSemaphore(0, 0, 1, 0);
        waiting = false;
        deq_node = 0;
        // single allocation, split into nodes:
        // node 0 seeds the full queue (it is the empty slot the producer
        // fills first), nodes 1..count-1 form the free queue
        block = new node [count];
        block->next = 0;
        full_tail = block;
        full_head = block;
        free_head = block + 1;
        free_tail = block + count - 1;
        block[count - 1].next = 0;
        for (size_t i = 1; i != count - 1; i += 1)
            block[i].next = block + i + 1;
    }

    ~queue()
    {
        CloseHandle(sema);
        delete [] block;
    }

    // full_tail is always an empty slot owned by the producer
    T& enqueue_prepare()
    {
        return full_tail->data;
    }

    void enqueue_commit()
    {
        // append a fresh empty slot after the slot that has just been
        // filled; this publishes the filled slot to the consumer
        node* n = get_free_node();
        n->next = 0;
        full_tail->next = n;
        full_tail = n;
        // wake the consumer only if it announced that it is blocking
        if (waiting)
        {
            waiting = false;
            ReleaseSemaphore(sema, 1, 0);
        }
    }

    T& dequeue_prepare()
    {
        deq_node = get_full_node();
        return deq_node->data;
    }

    void dequeue_commit()
    {
        // return the consumed node to the free queue
        deq_node->next = 0;
        node* prev = free_tail.exchange(deq_node);
        prev->next = deq_node;
    }

private:
    struct node
    {
        std::atomic<node*> next;
        T data;
    };

    // producer-private members
    node* block;
    node* full_tail;
    node* free_head;
    node* deq_node;
    char pad [64]; // keep producer-private members away from shared ones
    // members shared between producer and consumer
    std::atomic<node*> full_head;
    std::atomic<node*> free_tail;
    std::atomic<bool> waiting;
    HANDLE sema;

    node* get_free_node()
    {
        for (;;)
        {
            // fast path: take a node from the free queue
            node* n = free_head;
            node* next = n->next;
            if (next)
            {
                free_head = next;
                return n;
            }
            // no free nodes: steal the oldest not-yet-consumed node
            // from the full queue
            n = full_head;
            next = n->next;
            if (next)
            {
                if (full_head.compare_exchange_strong(n, next))
                {
                    // the ABA trickery mentioned above: if a free node
                    // appeared meanwhile, return that one and recycle the
                    // stolen node through the free queue instead of
                    // reusing it directly
                    node* n2 = free_head;
                    node* next2 = n2->next;
                    if (next2)
                    {
                        n->next = 0;
                        node* prev = free_tail.exchange(n);
                        prev->next = n;
                        free_head = next2;
                        return n2;
                    }
                    else
                    {
                        return n;
                    }
                }
            }
        }
    }

    node* get_full_node()
    {
        node* n = full_head;
        for (;;)
        {
            node* next = n->next;
            if (next == 0)
            {
                // queue looks empty: announce intent to block, then re-check
                // so that a concurrent enqueue_commit() cannot be missed
                waiting = true;
                n = full_head;
                next = n->next;
                if (next)
                {
                    waiting = false;
                }
                else
                {
                    WaitForSingleObject(sema, INFINITE);
                    n = full_head;
                    continue;
                }
            }
            // compete with the producer's stealing path for the head node
            if (full_head.compare_exchange_strong(n, next))
                return n;
        }
    }
};
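For completeness, a hypothetical driver (not part of the original
post) that wires the class above to two threads. std::thread is used
for thread creation, the produce_impl/consume_impl callbacks from the
usage pattern are inlined, and a negative value serves as an ad-hoc
stop sentinel. Because the producer may overwrite not-yet-consumed
items, the consumer can legitimately see fewer than 100 values.

#include <cstdio>
#include <thread>

int main()
{
    queue<int> q (16);

    std::thread consumer_thr([&]
    {
        for (;;)
        {
            int& data = q.dequeue_prepare(); // blocks while the queue is empty
            int v = data;
            q.dequeue_commit();
            if (v < 0)                       // stop sentinel
                break;
            std::printf("consumed %d\n", v);
        }
    });

    std::thread producer_thr([&]
    {
        for (int i = 0; i != 100; i += 1)
        {
            int& data = q.enqueue_prepare(); // never blocks; may overwrite
            data = i;                        // the oldest not-yet-consumed item
            q.enqueue_commit();
        }
        q.enqueue_prepare() = -1;            // sentinel is the newest item,
        q.enqueue_commit();                  // so it cannot be overwritten
    });

    producer_thr.join();
    consumer_thr.join();
}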
--
Dmitriy V'jukov