Ok, so let me take the opportunity to reissue the V'jukov MPSC queue
algorithm as a relatively complete template class, with many comments,
and the additional return values from push( ) and pop( ). I hope the
auto-formatting of this discussion board doesn't chop all the lines in
half, we'll see...
/*/
Advantages:
Producers are wait-free. They execute only XCHG, which is faster
than CAS.
One XCHG is maximum that one can get with multi-producer non-
distributed queues.
On fast-path consumer also doesn't execute any atomic RMW. It
executes XCHG rarely.
No need for node order reversion, so pop operation is always
O(1).
Intrusive. No need for additional internal nodes.
ABA-free.
No need for PDR, nor for GC
Disadvantage:
If producer blocked in (*), then consumer is blocked too.
Fortunately
'window of inconsistency' is extremely small - producer must be
blocked
exactly in (*). Actually it's a disadvantage only as compared
with a totally
lockfree algorithm. It's still much better than a lock-based
algorithm.
This disadvantage is handled as a BUSY state, indicated when
pop( ) returns 1.
BUSY is not really a normal "blocking" state, rather it means a
next node is
delayed a few microsecs for a time-slice cycle.
The push( ) function returns 0 if it encounters an empty queue, or
1 if nonempty.
This knowledge is useful to callers who need to set a queue-non-
empty signal.
/*/
// T may be any type with a public T *next data member and a default
constructor
template<class T>
class FastQueue : public SharedObjectEx {
T *head, *tail, *stub;
public:
FastQueue ( );
virtual ~FastQueue ( );
Integer push (T* node); // put another node into the queue
T* pop ( ); // return 0=empty, 1=busy, or a node
pointer
T* popwait ( ); // spin while busy, return 0=empty, or a
node pointer
};
template<class T>
FastQueue<T>::FastQueue ( ) { head = tail = stub = new T; stub->next
= 0; }
template<class T>
FastQueue<T>::~FastQueue ( ) { stub->Release ( ); }
// push node onto the queue, return 0 if the queue was empty, 1 if non-
empty.
template<class T>
Integer FastQueue<T>::push (T* node) {
node->next = 0;
T* prev = (T*)safeswap ((void**)&head, node); // atomic (prev =
head, head = node)
// (*) another push( ) here would succeed at this point
// (*) a pop( ) here would find the list without this or
later nodes
prev->next = node; // link prev to node, now the pop( )
can find it
return prev != stub; // return 0 on a push into an empty
queue, 1 if nonempty
}
template<class T>
T* FastQueue<T>::popwait ( ) {
T* node;
while (1 == (Integer)(node = pop ( )))
Sleep (0); // wait for nonbusy result
return node;
}
template<class T>
T* FastQueue<T>::pop ( ) {
T *tail1 = tail, *tail2 = tail->next; // tail1 is last node,
tail2 is 2nd to last
if (tail1 == stub) { // if last node is the stub
if (0 == tail2)
return (T*)(head != tail); // if no 2nd to last, then
list empty, return 0/1
tail1 = tail = tail2; // new tail is the new last,
lose the stub here
tail2 = tail1->next; // new next is the new 2nd to
last
}
if (tail2) {
tail = tail2; // if 2nd to last exists then
return the tail
// prefetch the next tail and return the current tail
return tail1; // this is the usual return point
from this function
}
if (tail1 == head) { // if consistent, and queue has
one node,
push (stub); // then push the stub to force
two nodes.
if (tail2 = tail1->next) { // set next = 2nd to last, is it
now nonzero?
tail = tail2; // if so, then pull off the last
node and return it
return tail1;
}
}
return (T*)(head != tail); // otherwise the list is still
empty or busy (0 or 1)
}