Basicaly I started with the Michael & Scott FIFO lock free queue. Ya'll know
that right?
I'll just pitch right at it. Instead of using 0 / NULL to signify that a
node is the terminating node in the list, use the tail counter. Specificaly
have the tail count start at 1, increase by 2s, and use that as the
terminator. Pointers should always be 4 byte aligned, so if the 'next' ptr
is odd it's the last node, if it's not it's valid and the tail has fallen
behind.
So now the ABA problem is defeated because when you CAS the terminating
link, it must be equal to the tail count, or else the tail has fallen
behind. And should a node get recycled, the tail count will have increased
by then, so a CAS with the old tail count will fail.
QPtr
{
Node* ptr;
int tag; // initalized to 1
}
void enqueue(Node* node)
{
while(true)
{
tail = atomic_read_8b(&q_tail); // atomicaly read tail.
node->next = tail.count+2; // inc by 2 will keep tail count odd
if (CAS(&q_tail.ptr->next, tail.count, node)) break;
CAS(&q_tail, tail, QPtr(node, tail.count+=2)); // swing tail
}
CAS(&q_tail, tail, QPtr(node, tail.count+=2)); // swing tail
}
So now each node only needs a single pointer instead of a tag & a pointer as
in the original michael and scott algorithm.
> node->next = tail.count+2; // inc by 2 will keep tail count odd
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
I must be missing something important here, but how does this work? How are
you allocating and deallocating the nodes? Basically, my specific question
is how are you getting the actual node that `node->next' points to over on
the consumer side? Could you show some code for the consumer?
Thanks!
:^)
Ok knocked up a more complete example...
Nodes are allocated and freed via a freelist. Aint included that.
==========================================================
struct Node
{
Node* next;
int value;
}
struct QPtr
{
Node* ptr;
int tag;
QPtr(Node* p, int t) : ptr(p), tag(t) {}
}
class Queue
{
private:
QPtr head;
QPtr tail;
public:
Queue::Queue()
{
Node* node = new_node(); // get node from free list
node->next = (Node*) 1; // Tail count starts at 1
head.ptr = tail.ptr = node;
head.tag = tail.tag = 1;
}
// =================
void enqueue(int v)
{
Node* node = new_node(); // get node from free list
node.value = v;
while(true)
{
ttt = AtomicRead8B(&tail);
node->next = (node*)(ttt.tag+2);
if (CAS(&ttt.ptr->next, tail.tag, node)) break;
CAS(&tail, ttt, QPtr(ttt.ptr->next, ttt.tag+2)); // Siwng tail
}
CAS(&tail, ttt, QPtr(ttt.ptr->next, ttt.tag+2)); // Swing tail
}
// =================
int dequeue()
{
while (true)
{
hhh = AtomicRead8B(&head);
ttt = AtomicRead8B(&tail);
if (hhh.ptr == ttt.ptr)
{
// Empty queue if 'next' is an odd number, ie not a valid
ptr.
if ((hhh.ptr->next & 1) == 1) return 0;
CAS(tail, ttt, QPtr(ttt.ptr->next, ttt.tag+2)); // Try to
swing tail
}
else
{
int v = hhh.ptr->next->value;
if CAS(head, hhh, QPtr(hhh.ptr->next, head.count+1));
{
free_node(hhh.ptr); // return node to free list
return v;
}
}
}
}
}
I see what you are doing and I think it should work out.
I think I spoke to fast here. I am not sure about ABA wrt operations on the
next pointers of the nodes. I need to flesh out some code and check the
algorithm out. Do you happen to have an example implementation?
Unfortunately, I cannot see exactly how to implement this in Relacy Race
Detector:
Actually now that I think about it, I can implement this in Relacy. When I
get some time, I will go ahead and do it. I will post it here when it's
finished.
Thank you for your patience.
:^)
If this will help I worked on it a bit more and got it working. Amazing how
many typos and bugs the were in the original version. I really should have
coded a proper working example to begin with.
Anyways this correctly completes a series of enqueues and dequeues from a
single thread.
// ================================
template <typename T>
inline T AtomicRead4B(T* source)
{
assert(sizeof(T) == 4);
assert((int(source) & 3) == 0);
__asm
{
MOV EAX,[source]
MOV EAX,[EAX]
}
}
// ==============================
template <typename T>
inline T AtomicRead8B(T* source)
{
assert(sizeof(T) == 8);
assert((int(source) & 7) == 0);
T result;
__asm
{
MOV EAX,[source]
MOVQ MM0,[EAX]
MOVQ [result],MM0
EMMS
}
return result;
}
// ===================================
template <typename T>
inline int CmpSwap4B(T* target, T expected, T newval)
{
assert(sizeof(T) == 4);
__asm
{
MOV ECX,[target]
MOV EAX,[expected]
MOV EDX,[newval]
LOCK CMPXCHG [ECX],EDX
MOV EAX,0
SETZ AL
}
}
// ===============================
template <typename T>
inline int CmpSwap8B(T* target, T expected, T newval)
{
assert(sizeof(T) == 8);
__asm
{
MOV ESI,[target]
MOV EAX,DWORD PTR [expected]
MOV EDX,DWORD PTR [expected+4]
MOV EBX,DWORD PTR [newval]
MOV ECX,DWORD PTR [newval+4]
LOCK CMPXCHG8B [ESI]
MOV EAX,0
SETZ AL
}
}
// ===================================
const int INITIALNODES = 100; // Initial nodes in freelist
struct Node
{
Node* next;
int value;
};
struct QPtr
{
Node* ptr;
int tag;
QPtr() : ptr(NULL), tag(1) {}
QPtr(Node* p, int t) : ptr(p), tag(t) {}
};
class Queue
{
private:
QPtr head;
QPtr tail;
QPtr flist;
void moreNodes()
{
for (int i = 0; i < INITIALNODES; i ++)
{
freeNode(new Node);
}
}
void freeNode(Node* node)
{
while (true)
{
QPtr top = AtomicRead8B(&flist);
QPtr tmp(node, top.tag+1);
node->next = top.ptr;
if (CmpSwap8B<QPtr>(&flist, top, tmp)) return;
}
}
Node* grabNode()
{
while (true)
{
QPtr top = AtomicRead8B(&flist);
if (top.ptr == NULL)
{
moreNodes();
}
else
{
QPtr tmp(top.ptr->next, top.tag+1);
if (CmpSwap8B<QPtr>(&flist, top, tmp)) return top.ptr;
}
}
}
public:
Queue::Queue() :
head(),
tail(),
flist()
{
Node* node = grabNode(); // get node from free list
node->next = (Node*) 1; // Tail count starts at 1
head.ptr = tail.ptr = node;
head.tag = tail.tag = 1;
}
// =================
void enqueue(int v)
{
Node* node = grabNode(); // get node from free list
node->value = v;
QPtr ttt;
while(true)
{
ttt = AtomicRead8B(&tail);
node->next = (Node*)(ttt.tag+2);
if (CmpSwap4B<Node*>(&ttt.ptr->next, (Node*)tail.tag, node))
break;
CmpSwap8B(&tail, ttt, QPtr(ttt.ptr->next, ttt.tag+2)); // Swing
tail
}
CmpSwap8B(&tail, ttt, QPtr(ttt.ptr->next, ttt.tag+2)); // Swing tail
}
// =================
int dequeue()
{
while (true)
{
QPtr hhh = AtomicRead8B(&head);
QPtr ttt = AtomicRead8B(&tail);
if (hhh.ptr == ttt.ptr)
{
if ((int(hhh.ptr->next) & 1) == 1) return 0; // Terminating
node
CmpSwap8B<QPtr>(&tail, ttt, QPtr(ttt.ptr->next, ttt.tag+2));
}
else
{
int v = hhh.ptr->next->value;
if (CmpSwap8B<QPtr>(&head, hhh, QPtr(hhh.ptr->next,
head.tag+1)))
{
freeNode(hhh.ptr);
return v;
}
}
}
}
};
Thank you for taking the time to implement this. Okay, I created a VERY
simple test application:
http://cpt.pastebin.com/f74f349e3
You're algorithm seems to work when there is a single producer/consumer...
However, when I use two producers/consumers I am getting a seg-fault at line
`204'. I cut-and-pasted you're example, and I only had to add
`__declspec(align(8))' to the queue and freelist anchors in order to get the
assertion at line `30' to pass. I also has to include `cassert' and
`cstddef' to get it to compile. I cannot find any bugs in the actual
simplistic test code and it works with 1 producer and 1 consumer. Could you
please look over the code and see if everything is Kosher? Perhaps try to
compile and run it under a recent version of MSVC. I am using:
_________________________________________________________
Microsoft Visual Studio 2008
Version 9.0.30729.1 SP
_________________________________________________________
The code as-is uses 2 producers and 2 consumers, and I get the seg-fault
with that. However, if you change the `PRODUCERS' macro to 1, then it sure
seems to work. Can you reproduce the results that I am experiencing? BTW, I
am running the test in debug mode. Also, I am using the win32-pthread
library to create the threads because I prefer to use PThreads instead of
Windows Threads any chance I get:
http://sourceware.org/pthreads-win32/
I cannot easily pin down the source of the problem. I have a gut feeling
that it has to do with ABA problem on the nodes next pointer. Or, it could
be a simple typo in the code you posted. I really need to model this
algorithm in Relacy. I mean, that's why it's there right?
;^)
Anyway, thanks for posting this because it's been kind of DEAD around here
lately!
I think the problem is that the node that "hhh" points at can become the
final node in the list and so it's next pointer can become garbage. I think
this should fix it..
// =================
int dequeue()
{
while (true)
{
QPtr hhh = AtomicRead8B(&head);
Node* next = hhh.ptr->next;
QPtr ttt = AtomicRead8B(&tail);
if (head == hh) // Make sure head and next are consistent.
{
if (hhh.ptr == ttt.ptr)
{
if ((int(hhh.ptr->next) & 1) == 1) return 0; // Terminating
node
CmpSwap8B<QPtr>(&tail, ttt, QPtr(ttt.ptr->next, ttt.tag+2));
}
else
{
int v = next->value;
if (CmpSwap8B<QPtr>(&head, hhh, QPtr(hhh.ptr->next,
head.tag+1)))
{
freeNode(hhh.ptr);
return v;
}
}
}
}
}
Amyways off to bed now. I'll have a better look tomorow.
I incorporated the changes:
http://cpt.pastebin.com/f39deca30
Unfortunately, I am still seg-faulting at the exact same place. It's at line
`209' in the modified code. Also, I got a seg-fault at line `54' after
running it several times. This happened once out of many runs, so AFAICT
there is a fairly nasty race-condition in there somewhere.
:^o
Ok, got your test code up and running. And I think I've fixed my code.
179 if (CmpSwap4B<Node*>(&ttt.ptr->next, (Node*)ttt.tag, node))
180 break;
I was using "tail.tag" instead of "ttt.tag", when patching the new node in.
I was getting the same errors you described, but now ive run it about 10
times without error. Cranked it up to 8 producers and consumers and it still
gave no errors. Yay!
I really see now why it's so important to have a test for code like this. I
could have checked over it 100 times and never spotted that error.
So, how thorough is your test code? Is it enough to test if an algorithm is
reliable? I mean would you crank up the number of consumers/producers and
run it for a couple of hours? Or are the numbers you have in there enough?
And thanks for helping/teaching me. I appreciate it.
This is weird. Im back at home running the code on a different PC, and none
of it gives any seg faults or errors. I mean the original version, the first
one you paste binned, doesnt throw any errors, whereas it was on the PC at
work. My work PC is dual core, but my home is single, but I stil dont see
why that would make things work one one but not the other? They should both
be suceptible to the bug. And tbh now i looked at the bug (the last one i
found at least), i dont see how that would have caused/fixed the problem
anyway.
Arrggh it's making my head hurt. *wry smile*
Anyways im gonna leave it for now and try it again on my work PC tomorow.
That will definitely screw things up!
;^)
> I was getting the same errors you described, but now ive run it about 10
> times without error. Cranked it up to 8 producers and consumers and it
> still gave no errors. Yay!
>
> I really see now why it's so important to have a test for code like this.
> I could have checked over it 100 times and never spotted that error.
I think I spotted a typo in line `210':
http://cpt.pastebin.com/f39deca30
if (CmpSwap8B<QPtr>(&head, hhh, QPtr(next, head.tag+1)))
should be:
if (CmpSwap8B<QPtr>(&head, hhh, QPtr(next, hhh.tag+1)))
I also spotted a stupid error I made. I should have decorated the following
functions:
void* producers(void* state);
void* consumers(void* state);
with extern "C" because they are called from within the PThread library!
> So, how thorough is your test code?
Not that thorough.
> Is it enough to test if an algorithm is reliable?
No.
> I mean would you crank up the number of consumers/producers and run it for
> a couple of hours?
Yes. This does not prove anything, but IMHO it always good to bash the
implementation with a sustained heavy load just to see how it handles the
pressure.
> Or are the numbers you have in there enough?
I would increase the number of threads to 32, crank up the number of
iterations and run the test application 10 or 20 times and give each one a
random priority. Or, you can learn how to use Relacy Race Detector:
http://groups.google.com/group/relacy/web
It simulates various memory models and synchronization primitives within a
hardcore thread interleaving environment; it basically beats the shi% out of
the algorithm you are testing.
> And thanks for helping/teaching me. I appreciate it.
No problem.
:^)
Don't forget the simple lock-free queue out of a stack technique:
<typed in news reader>
__________________________________________________________
struct stack
{
struct stack* next;
};
void
stack_push(struct stack* volatile* phead,
struct stack* node)
{
MEMBAR #LoadStore | #StoreStore;
node->next = NULL;
struct stack* head = *phead;
do
{
node->next = head;
}
while (! ATOMIC_CAS(phead, node));
}
struct stack*
stack_flush_ex(struct stack* volatile* phead,
struct stack* node)
{
struct stack* prev;
if (node)
{
MEMBAR #LoadStore | #StoreStore;
}
prev = ATOMIC_SWAP(phead, node);
MEMBAR #LoadStore | #LoadLoad;
return prev;
}
#define stack_flush(phead) \
stack_flush_ex((phead), NULL)
struct stack*
stack_reverse(struct stack* head)
{
struct stack* rhead = NULL;
while (head)
{
struct stack* next = head->next;
head->next = rhead;
rhead = head;
head = next;
}
return rhead;
}
#define stack_queue_pop(phead) \
stack_reverse(stack_flush((phead)))
__________________________________________________________
;^)
If you don't mind using an efficient "blocking" algorithm like this:
http://relacy.pastebin.com/fd4b6e1a
http://groups.google.com/group/lock-free/browse_thread/thread/c8e3201da4a6a300
then you can get away with wait-free semantics for producer threads, and
efficient "blocking" behavior for consumers. It's nice because it only has 1
atomic RMW on the producer and 1 for the consumer. The algorithm is very
simple, but it has it's tradeoffs.
> If you don't mind using an efficient "blocking" algorithm like this:
>
>
> http://relacy.pastebin.com/fd4b6e1a
>
> http://groups.google.com/group/lock-free/browse_thread/thread/c8e3201da4a6a300
>
>
> then you can get away with wait-free semantics for producer threads, and
> efficient "blocking" behavior for consumers. It's nice because it only has
> 1 atomic RMW on the producer and 1 for the consumer. The algorithm is very
> simple, but it has it's tradeoffs.
They look perfect for what I need. I had actualy dug up those posts via
google and was intending on getting round to trying them. I just wanted to
see if my idea with the M&S queue would work first.
But those certainly look much better for what I want, easier to understand
and more efficient. The procucer part is particulary good. It's so simple it
should really be more obvious. I mean I read a whole bunch of lock free
papers online and dont remember seeing that particular method.
Yeah I intend to have a try at using it. Tbh i found it a little bit
daunting when I first downloaded it, i'm not sure I understand this stuff
well enough to use it yet.
thanks
AFAICT, Dmitriy Vyukov invented and presented it here. I applied DWCAS to
consumer side. Dmitriy protected it with SEH. If you can handle the caveats,
it works great.
;^)
Be sure to use an eventcount for blocking semantics. Original algorithm
posted to this group:
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/aa8c62ad06dbb380
(read all)
I mean, block when it's necessary. Define and enforce boundary conditions
(e.g., full/empty).
;^)