Recently I was given the task of implementing the classic
producer/consumer algorithm.
After reviewing existing implementations I decided to create my own
version.
It took several days to plan and implement a test version.
Don't kick my ass if I reinvented the wheel :)
Task description:
There are 2 threads. The first writes to a queue, the second reads from it.
Implement the queue without using mutexes, critical sections, etc.
Implementation:
The queue class contains 3 pointers to chains of messages:
1. Chain for writing
2. Temporary chain
3. Chain for reading
and a state flag that tells whether the temporary chain pointer is in use.
Code:
template <class TYPE> class QueueNoMutex
{
public:
QueueNoMutex();
~QueueNoMutex();
// Write to the queue
void Enqueue(const TYPE& data);
// Flush the writing chain to the temporary one if the writer thread
// finishes earlier than the reader
bool Flush();
// Read from queue
bool Dequeue(TYPE& data);
private:
// chain element
struct Element
{
TYPE data;
Element* next;
};
void DeleteChain(Element* el);
volatile bool isTempQueueSet;
Element *readerTop;
Element *tempTop;
Element *writerTop, *writerBottom;
};
template<class TYPE>
QueueNoMutex<TYPE>::QueueNoMutex()
{
readerTop = tempTop = writerTop = writerBottom = NULL;
isTempQueueSet = false;
}
template<class TYPE>
QueueNoMutex<TYPE>::~QueueNoMutex()
{
DeleteChain(readerTop);
DeleteChain(tempTop);
DeleteChain(writerTop);
}
//------------------------------------------------------------------------------
/**
*/
template<class TYPE>
void
QueueNoMutex<TYPE>::Enqueue(const TYPE& data)
{
// create new chain element and put data in it
Element* el = new Element;
el->data = data;
el->next = NULL;
// if writing chain isn't empty add created element to the bottom
if (writerTop)
{
writerBottom->next = el;
writerBottom = el;
}
else
{
// otherwise created element is the first element of chain
writerTop = writerBottom = el;
}
// check if temporary chain is free
if (!isTempQueueSet)
{
// move writer chain pointer to the temporary one
tempTop = writerTop;
// free writer chain pointer
writerTop = NULL;
// and set flag of temporary chain
isTempQueueSet = true;
}
}
template<class TYPE>
bool
QueueNoMutex<TYPE>::Flush()
{
// if writing chain is empty return success
if (!writerTop) return true;
// check if temporary chain is free
if (!isTempQueueSet)
{
// move writer chain pointer to the temporary one
tempTop = writerTop;
// free writer chain pointer
writerTop = NULL;
// and set flag of temporary chain
isTempQueueSet = true;
// return success
return true;
}
// writing chain hasn't been flushed yet
return false;
}
template<class TYPE>
bool
QueueNoMutex<TYPE>::Dequeue(TYPE& data)
{
// if reading chain is empty
if (!readerTop)
{
// check if there is data in the temporary chain
if (isTempQueueSet)
{
// move temporary chain to the reading one
readerTop = tempTop;
// free temporary chain pointer (don't do this if the Flush()
// method is used)
tempTop = NULL;
// inform that temporary chain is free
isTempQueueSet = false;
}
else
{
// there is no data to read
return false;
}
}
// get data from the top element
data = readerTop->data;
// delete and move top element of reading chain
Element* cur = readerTop;
readerTop = cur->next;
delete cur;
return true;
}
template<class TYPE>
void
QueueNoMutex<TYPE>::DeleteChain(Element* el)
{
while (el)
{
Element* cur = el;
el = cur->next;
delete cur;
}
}
Code of writer and reader threads:
typedef QueueNoMutex<int> MyQueue;
class WriterThread: public Thread
{
public:
void DoWork()
{
int data = 100;
// main loop
while (data >= 0)
{
queue->Enqueue(data--);
// emulation of real thread work
Sleep(100);
}
// flush the writing chain if the last insert operation didn't move
// it to the temporary one
while (!queue->Flush())
{
Sleep(100);
}
}
// ...
private:
MyQueue* queue;
};
class ReaderThread: public Thread
{
public:
void DoWork()
{
// the writer starts at 100, so that is the first value expected
int curVal = 100, data = 1;
bool found = false;
// main reading loop
while (data > 0)
{
while (queue->Dequeue(data))
{
if (curVal != data)
{
printf("%d != %d\n", curVal, data);
}
curVal = data - 1;
found = true;
}
if (found)
{
printf("Last read: %d\n", data);
found = false;
}
// emulation of real thread work
Sleep(200);
}
printf("Done.\n");
}
// ...
private:
MyQueue* queue;
};
Note again that this algorithm is only for one writer thread and one
reader thread.
I've tested it under different thread loads. It worked without any
problems.
It still needs a speed test in comparison with other
algorithms.
I'm planning to test this implementation in a real project in the
near future.
I would like to know your opinions: what is good about my algorithm and
what is bad.
P.S. I've also posted this message on the Russian www.rsdn.ru forum,
and people there suggested that it needs memory barriers
because of compiler and CPU instruction reordering.
:^)
I don't have time right now to look at the code in any detail. The first
thing I noticed is that there are no placeholders for any memory-barrier
instructions. It's good to work out the memory visibility requirements of any
algorithm, and at least make some comments on them in the code.
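To make those visibility requirements concrete, here is a minimal sketch of the same three-chain scheme written against C++11 std::atomic, which was not available when this was posted. The names QueueNoMutexAtomic and RunQueueDemo are made up for illustration. The only real change is that isTempQueueSet becomes a std::atomic<bool> with release stores and acquire loads, on the assumption that the flag is the single point where tempTop and its nodes change hands. Treat it as one possible reading of the requirements, not a reviewed implementation.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Sketch only: QueueNoMutexAtomic is a hypothetical name, not the poster's class.
template <class T>
class QueueNoMutexAtomic {
public:
    QueueNoMutexAtomic() = default;
    QueueNoMutexAtomic(const QueueNoMutexAtomic&) = delete;
    QueueNoMutexAtomic& operator=(const QueueNoMutexAtomic&) = delete;

    // Assumes both threads have stopped using the queue.
    ~QueueNoMutexAtomic() {
        DeleteChain(readerTop);
        DeleteChain(tempTop);
        DeleteChain(writerTop);
    }

    // Writer thread only: append to the writing chain, then try to hand it over.
    void Enqueue(const T& data) {
        Element* el = new Element{data, nullptr};
        if (writerTop) writerBottom->next = el;
        else writerTop = el;
        writerBottom = el;
        Flush();
    }

    // Writer thread only: returns true once the writing chain is empty.
    bool Flush() {
        if (!writerTop) return true;
        // Acquire pairs with the reader's release store in Dequeue(), so the
        // reader is done with tempTop before we overwrite it.
        if (isTempQueueSet.load(std::memory_order_acquire)) return false;
        tempTop = writerTop;
        writerTop = nullptr;
        // Release publishes tempTop and every node linked from it.
        isTempQueueSet.store(true, std::memory_order_release);
        return true;
    }

    // Reader thread only: returns false when nothing is available.
    bool Dequeue(T& data) {
        if (!readerTop) {
            if (!isTempQueueSet.load(std::memory_order_acquire)) return false;
            readerTop = tempTop;
            tempTop = nullptr;
            assert(readerTop != nullptr);  // the flag is only set for a non-empty chain
            isTempQueueSet.store(false, std::memory_order_release);
        }
        Element* cur = readerTop;
        data = cur->data;
        readerTop = cur->next;
        delete cur;
        return true;
    }

private:
    struct Element {
        T data;
        Element* next;
    };

    static void DeleteChain(Element* el) {
        while (el) {
            Element* cur = el;
            el = cur->next;
            delete cur;
        }
    }

    std::atomic<bool> isTempQueueSet{false};
    Element* readerTop = nullptr;
    Element* tempTop = nullptr;
    Element* writerTop = nullptr;
    Element* writerBottom = nullptr;
};

// Hypothetical smoke test: returns true if the reader saw 0..count-1 in order.
bool RunQueueDemo(int count) {
    QueueNoMutexAtomic<int> queue;
    bool inOrder = true;  // written by the reader only, read after join()
    std::thread writer([&] {
        for (int i = 0; i < count; ++i) queue.Enqueue(i);
        while (!queue.Flush()) std::this_thread::yield();
    });
    std::thread reader([&] {
        int value = 0;
        for (int expected = 0; expected < count;) {
            if (!queue.Dequeue(value)) { std::this_thread::yield(); continue; }
            if (value != expected) inOrder = false;
            ++expected;
        }
    });
    writer.join();
    reader.join();
    return inOrder;
}
```

Calling RunQueueDemo() from main() exercises one writer and one reader. A passing run does not prove the ordering is right, since this kind of race rarely shows up on x86, so a race detector such as ThreadSanitizer is a better check than repeated runs.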
I have an implementation of a similar, more minimalist queue here,
from AppCore:
http://appcore.home.comcast.net
_________________________________________________________________
/* MUST be four adjacent words */
typedef struct
AC_DECLSPEC_PACKED
ac_i686_node_
{
struct ac_i686_node_ *next;
struct ac_i686_node_ *lfgc_next;
ac_fp_dtor_t fp_dtor;
const void *state;
} ac_i686_node_t;
/*
NOTE: The ac_i686_node_t::lfgc_next member has nothing to do with
the following x86 queue implementation
*/
/* MUST be two adjacent words. front must be at offset 0 */
typedef struct
AC_DECLSPEC_PACKED
ac_i686_queue_spsc_
{
ac_i686_node_t *front;
ac_i686_node_t *back;
} ac_i686_queue_spsc_t;
AC_SYS_APIEXPORT void AC_CDECL
ac_i686_queue_spsc_push
( ac_i686_queue_spsc_t*,
ac_i686_node_t* );
--------------------------------------
.align 16
.globl ac_i686_queue_spsc_push
ac_i686_queue_spsc_push:
MOVL 4(%ESP), %EAX
MOVL 8(%ESP), %ECX
MOVL 4(%EAX), %EDX
MOVL %ECX, (%EDX)
MOVL %ECX, 4(%EAX)
RET
AC_SYS_APIEXPORT ac_i686_node_t* AC_CDECL
ac_i686_queue_spsc_pop
( ac_i686_queue_spsc_t* );
--------------------------------------
.align 16
.globl ac_i686_queue_spsc_pop
ac_i686_queue_spsc_pop:
PUSHL %EBX
MOVL 8(%ESP), %ECX
MOVL (%ECX), %EAX
CMPL 4(%ECX), %EAX
JE ac_i686_queue_spsc_pop_failed
MOVL (%EAX), %EDX
MOVL 12(%EDX), %EBX
MOVL %EDX, (%ECX)
MOVL %EBX, 12(%EAX)
POPL %EBX
RET
ac_i686_queue_spsc_pop_failed:
XORL %EAX, %EAX
POPL %EBX
RET
_________________________________________________________________
One of the good things about coding in assembly language is that everything
is as-is, and you don't need to worry about the compiler reordering
anything. Also, the memory visibility can be explicitly programmed, or
implied by the hardware, and it's easy to just point to some documentation:
http://developer.intel.com/products/processor/manuals/318147.pdf
I can say that the algorithm works based on the detail outlined in that
document...
Anyway, I will take a look at your stuff when I get some time. Thanks for
posting it here.
[...]
I dislike the fact that you create/delete nodes (e.g., Element objects)
every time I push/pop something to/from the queue. I would like to pass the
queue interface pointers to the nodes directly such that I can manage their
lifetimes externally. Also, you don't need five variables:
_____________________________________________________________
bool volatile QueueNoMutex<T>::isTempQueueSet;
Element* QueueNoMutex<T>::readerTop;
Element* QueueNoMutex<T>::tempTop;
Element* QueueNoMutex<T>::writerTop;
Element* QueueNoMutex<T>::writerBottom;
_____________________________________________________________
to create a lock-free single-producer/consumer queue; two will do fine
(e.g., head and tail pointers). I can't quite see an advantage to using 5
variables. Although, I have not studied your algorithm in any detail yet...
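The two-pointer idea, together with caller-owned nodes, might look like this in portable C++11. This is a sketch that mirrors the x86 push/pop routines quoted earlier rather than AppCore's actual C API: Node, SpscQueue, Push and Pop are hypothetical names, the lfgc_next and fp_dtor members are left out, and the release/acquire pair on back_ is an assumption about the ordering that the x86 rules give the assembly version for free.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for ac_i686_node_t; the caller owns every node.
struct Node {
    Node* next = nullptr;
    const void* state = nullptr;
};

// Two variables only: front_ is private to the consumer, back_ is written by
// the producer and read by the consumer to detect an empty queue.
class SpscQueue {
public:
    // The queue always holds one dummy node, supplied by the caller.
    explicit SpscQueue(Node* dummy) : front_(dummy), back_(dummy) {}

    // Producer thread only. Mirrors ac_i686_queue_spsc_push.
    void Push(Node* node) {
        assert(node != nullptr);
        Node* last = back_.load(std::memory_order_relaxed);
        last->next = node;
        // Release publishes the link above and the node's state.
        back_.store(node, std::memory_order_release);
    }

    // Consumer thread only. Mirrors ac_i686_queue_spsc_pop.
    // Returns nullptr when the queue is empty.
    Node* Pop() {
        Node* oldFront = front_;
        if (oldFront == back_.load(std::memory_order_acquire)) return nullptr;
        Node* next = oldFront->next;
        front_ = next;                  // next becomes the new dummy
        oldFront->state = next->state;  // hand the payload back in the old dummy
        return oldFront;
    }

private:
    Node* front_;
    std::atomic<Node*> back_;
};
```

The node returned by Pop() is the old dummy, now carrying the popped state. The node that was pushed stays in the queue as the new dummy, so the caller has to keep it alive until a later Pop() returns it.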
> P.S. I've posted this message also on the Russian www.rsdn.ru forum
> and guys suggest that it needed to use some memory barrier methods
> because of compiler and CPU instruction swapping.
Stick the implementation in an externally assembled library to avoid
compiler optimizations; link-time optimization aside for a moment:
http://groups.google.com/group/comp.programming.threads/msg/423df394a0370fa6
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/29ea516c5581240e