Hello,
I have received the following question:
>Hi all,
>I need a simple FIFO queue, which would allow the communication
>between two threads (producer - consumer).
>Please note that my model is: one producer - one consumer. One
>producer (main thread) builds an object, which it then pushes to the
>consumer (other thread), allowing itself to build the next object. The
>idea is that the producer and the consumer would operate in a pipeline.
>I have already looked at
>http://www.emadar.com/fpc/lockfree.htm, but
>this consumes a lot of CPU cycles in the consumer, because pop is non-blocking
>on an empty queue.
>Any ideas?
Hello,
Here it is:
http://pages.videotron.com/aminer/zip/lockfree.zip
I have enhanced Lockfree_mpmc so that pop() can wait on an empty queue.
Here is what the pop() signature looks like:
function pop(var obj:tNodeQueue;wait:boolean=false):boolean;
If you want to wait, just call pop() passing True for the wait argument.
If you look at the pop() method, is the logic correct? For example, if the
consumer thread is blocked on self.event.waitfor(INFINITE) and two items
arrive before it executes self.event.resetevent, can it forget to process
the second item, since the consumer thread will reset the event while one
item is still in the queue?
Answer:
No, it will still correctly process the second item, because we check
the number of items with the following code on the consumer side:
---
if self.count <> 0
then continue;
---
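To make the reset-then-recheck reasoning concrete, here is a minimal Free Pascal sketch; it is not the lockfree_mpmc code. Assumptions: TBlockingQueue, TProducer and WaitForever are hypothetical names, the buffer is guarded by a TCriticalSection for simplicity instead of being lock-free, there is exactly one consumer, and the manual-reset TEvent from the SyncObjs unit plays the role of self.event. What matters is the order in Pop: reset the event, re-check the count, and only then wait.

```pascal
program BlockingPopSketch;
{$mode objfpc}{$H+}
uses
  {$ifdef unix}cthreads,{$endif}
  Classes, SyncObjs;

const
  Capacity = 1024;               // hypothetical; larger than ItemCount, so the ring never fills
  ItemCount = 1000;
  WaitForever = High(Cardinal);  // hypothetical stand-in for INFINITE

type
  // Hypothetical single-consumer queue: a lock guards the buffer; only the
  // event handling mirrors the pop(obj, wait=true) idea from the post.
  TBlockingQueue = class
  private
    FLock: TCriticalSection;
    FEvent: TEvent;              // manual-reset "items may be available" signal
    FItems: array[0..Capacity - 1] of LongInt;
    FHead, FTail: LongInt;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Push(Value: LongInt);
    function Count: LongInt;
    procedure Pop(out Value: LongInt);   // blocks while the queue is empty
  end;

  TProducer = class(TThread)
  protected
    procedure Execute; override;
  end;

var
  Queue: TBlockingQueue;

constructor TBlockingQueue.Create;
begin
  inherited Create;
  FLock := TCriticalSection.Create;
  FEvent := TEvent.Create(nil, True, False, '');  // manual reset, initially unsignalled
end;

destructor TBlockingQueue.Destroy;
begin
  FEvent.Free;
  FLock.Free;
  inherited Destroy;
end;

procedure TBlockingQueue.Push(Value: LongInt);
begin
  FLock.Acquire;
  FItems[FTail mod Capacity] := Value;
  Inc(FTail);
  FLock.Release;
  FEvent.SetEvent;               // signal only after the item is in the queue
end;

function TBlockingQueue.Count: LongInt;
begin
  FLock.Acquire;
  Result := FTail - FHead;
  FLock.Release;
end;

procedure TBlockingQueue.Pop(out Value: LongInt);
begin
  // Single consumer: once Count <> 0, the item cannot be taken by anyone else.
  while Count = 0 do
  begin
    FEvent.ResetEvent;
    // Re-check AFTER the reset, like "if self.count <> 0 then continue;":
    // an item pushed and signalled just before the reset is caught here.
    if Count <> 0 then
      Continue;
    FEvent.WaitFor(WaitForever); // a push after the reset sets the event again
  end;
  FLock.Acquire;
  Value := FItems[FHead mod Capacity];
  Inc(FHead);
  FLock.Release;
end;

procedure TProducer.Execute;
var
  I: LongInt;
begin
  for I := 1 to ItemCount do
    Queue.Push(I);
end;

var
  Producer: TProducer;
  I, Value: LongInt;
  Sum: Int64 = 0;
begin
  Queue := TBlockingQueue.Create;
  Producer := TProducer.Create(False);   // starts pushing immediately
  for I := 1 to ItemCount do
  begin
    Queue.Pop(Value);                    // waits on the event while empty
    Sum := Sum + Value;
  end;
  Producer.WaitFor;
  Producer.Free;
  Queue.Free;
  WriteLn('sum = ', Sum);                // 500500 = 1 + 2 + ... + 1000
end.
```

If the producer's SetEvent happens before the consumer's ResetEvent, the Count check catches the item; if it happens after, WaitFor returns immediately. Either way no item is left waiting in the queue.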
Look at:
http://www.emadar.com/fpc/lockfree.htm
On the pop() side of this lockfree queue we have:
--------------
function gFLQueue.pop(var tm:_R):boolean;
var newhead, lastHead : integer;
begin
  repeat
    lastHead:=head;
    if tail<>head
    then
    begin
      // advance head from lastHead to lastHead+1
      pointer(newHead):=interlockedCompareExchange(pointer(head),pointer(lastHead+1),pointer(lasthead));
      if newHead=lastHead
      then
      begin
        // head has already moved past lastHead here, so a producer may
        // reuse slot lastHead before the next line reads it
        tm:=getObject(lastHead);
        result:=true; exit;
      end;
    end
    else
    begin
      result:=false;
      exit;
    end;
  until false;
end;
end.
----
Notice that after incrementing head with interlockedCompareExchange(),
the code does tm:=getObject(lastHead); and I think this is an error:
once head has moved past lastHead, that slot is free from the producers'
point of view, so if the consumer thread is preempted between the CAS and
the read, and another thread stores a new value in the same slot as
lastHead, the value read will be invalid and the claimed element is lost.
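The interleaving can be replayed deterministically in a single thread. In this Free Pascal sketch the four-slot ring, Push, FillRing and SimulatePop are hypothetical; Inc(Head) stands in for the successful CAS on head, and the Push(50) call between the two steps stands in for a producer running while the consumer is preempted. It compares the gFLQueue order (advance head, then read) with the lockfree_mpmc order (read, then advance head).

```pascal
program StaleSlotSketch;
{$mode objfpc}{$H+}

const
  Capacity = 4;                  // hypothetical tiny ring so the wrap-around is easy to see

var
  Slots: array[0..Capacity - 1] of LongInt;
  Head, Tail: LongInt;           // ever-increasing counters; slot = counter mod Capacity

// Bounded push: refuses the element when Capacity items are pending.
function Push(Value: LongInt): Boolean;
begin
  Result := Tail - Head < Capacity;
  if Result then
  begin
    Slots[Tail mod Capacity] := Value;
    Inc(Tail);
  end;
end;

// Fill the ring completely: 10, 20, 30, 40 in slots 0..3.
procedure FillRing;
begin
  Head := 0;
  Tail := 0;
  Push(10); Push(20); Push(30); Push(40);
end;

// Replays one consumer pop with a producer running in the middle of it.
// ReadFirst = False: gFLQueue order (advance head, then read the slot).
// ReadFirst = True:  lockfree_mpmc order (read the slot, then advance head).
function SimulatePop(ReadFirst: Boolean): LongInt;
var
  LastHead: LongInt;
begin
  FillRing;
  Result := 0;
  LastHead := Head;
  if ReadFirst then
    Result := Slots[LastHead mod Capacity];
  Inc(Head);                     // stands in for the successful CAS on head
  Push(50);                      // consumer "preempted": a producer reuses freed slot 0
  if not ReadFirst then
    Result := Slots[LastHead mod Capacity];
end;

begin
  WriteLn('advance, then read: ', SimulatePop(False));   // 50: item 10 is lost
  WriteLn('read, then advance: ', SimulatePop(True));    // 10: the claimed item
end.
```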
I have corrected this problem in version 1.14 of my lockfree_mpmc FIFO queue.
You will notice that I have also modified the algorithm on the push()
side of the Lockfree queue source code, using a test like this:
if getlength >= fsize
then
begin
result:=false;
exit;
end;
I have now tested the new algorithm and it is working very well.
Correctness:
To keep the correctness argument short, I will concentrate on the most
important parts. If you look at the lockfree_mpmc source code, in
lockfree_mpmc.pas you will find this on the push side:
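function TLockfree_MPMC.push(tm : tNodeQueue):boolean;
var lasttail,newtemp:longword;
begin
  // Refuse the element if the queue already holds fsize items;
  // fsize leaves a margin of spare slots below the physical capacity.
  if getlength >= fsize
  then
  begin
    result:=false;
    exit;
  end;
  result:=true;
  //newTemp:=windows.interlockedincrement(temp);
  // Atomically reserve the next slot index for this producer.
  newTemp:=LockedIncLong(temp);
  lastTail:=newTemp-1;
  // Store the element into the reserved slot before publishing it.
  setObject(lastTail,tm);
  // Publish: advance tail from lastTail to newTemp. This succeeds only after
  // every earlier producer has published, so tail advances in order.
  repeat
    if CAS(tail[0],lasttail,newtemp)
    then
    begin
      exit;
    end;
    sleep(0);  // yield while an earlier producer finishes publishing
  until false;
end;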
So, let's say the size of the bounded queue is 1000, and imagine that
all the threads execute "if getlength >= fsize" at the same time and
getlength returns 999. Then "if getlength >= fsize" returns false for
every one of them, and each of them goes on to reserve a slot with
LockedIncLong. Since the constructor sets fSize:=(1 shl aPower) - margin,
with a margin of 1000 (the margin must be >= the number of cores), the
spare slots absorb these concurrent pushes and there is no overflow.
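The margin argument is plain arithmetic. In this sketch (WorstCaseLength and FitsInRing are hypothetical names), every producer is assumed to pass the getlength check while the queue holds fsize - 1 items and then reserve one slot, so the queue can grow to fsize - 1 + Producers items; that fits in the 1 shl aPower physical slots exactly when Producers <= margin + 1. Producers here means the number of threads that can be between the check and LockedIncLong at the same moment.

```pascal
program MarginSketch;
{$mode objfpc}{$H+}

// Worst case from the text: every producer evaluates "if getlength >= fsize"
// while the queue holds fsize - 1 items, passes, and then reserves one slot.
function WorstCaseLength(FSize, Producers: LongInt): LongInt;
begin
  Result := (FSize - 1) + Producers;
end;

// True when that worst case still fits in the physical ring.
function FitsInRing(APower, Margin, Producers: LongInt): Boolean;
var
  Capacity, FSize: LongInt;
begin
  Capacity := 1 shl APower;      // physical slots
  FSize := Capacity - Margin;    // fSize:=(1 shl aPower) - margin
  Result := WorstCaseLength(FSize, Producers) <= Capacity;
end;

begin
  // 4096 slots, margin 1000, 8 producers: 3095 + 8 = 3103 <= 4096
  WriteLn('margin 1000, 8 producers fit: ', FitsInRing(12, 1000, 8));
  // 4096 slots, margin 4, 8 producers: 4091 + 8 = 4099 > 4096
  WriteLn('margin 4, 8 producers fit:    ', FitsInRing(12, 4, 8));
end.
```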
Now on the pop side, you will find this:
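function TLockfree_MPMC.pop(var obj:tNodeQueue):boolean;
var lastHead : longword;
begin
  repeat
    lastHead:=head[0];
    if tail[0]<>head[0]
    then
    begin
      // Read the element BEFORE claiming the slot: once head moves past
      // lastHead, a producer may reuse that slot.
      obj:=getObject(lastHead);
      // Claim the slot; if another consumer moved head first, retry.
      if CAS(head[0],lasthead,lasthead+1)
      then
      begin
        result:=true;
        exit;
      end;
    end
    else
    begin
      // The queue is empty.
      result:=false;
      exit;
    end;
  until false;
end;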
So, as you can see, there is a test,
if CAS(head[0],lasthead,lasthead+1), and this test
avoids something like the ABA problem: if head
has changed, the CAS() fails and the loop retries.
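A single-threaded Free Pascal sketch of two consumers racing on head: both read head = 5, the first CAS advances it to 6, and the second CAS fails because head no longer equals its lastHead, so that consumer would loop and re-read. The CAS function here is a hypothetical wrapper around FPC's InterlockedCompareExchange with the same argument order as the CAS() in the post. Because head only ever increases, an old value can only come back after the 32-bit counter wraps around, which is why this check behaves like an ABA guard in practice.

```pascal
program HeadCasSketch;
{$mode objfpc}{$H+}

var
  Head: LongInt = 5;             // shared head counter, currently 5

// Hypothetical helper with the argument order of the post's CAS():
// store NewValue into Target only if Target still equals Expected.
function CAS(var Target: LongInt; Expected, NewValue: LongInt): Boolean;
begin
  Result := InterlockedCompareExchange(Target, NewValue, Expected) = Expected;
end;

var
  LastHeadA, LastHeadB: LongInt;
  WonA, WonB: Boolean;
begin
  LastHeadA := Head;                              // consumer A reads head = 5
  LastHeadB := Head;                              // consumer B reads head = 5
  WonB := CAS(Head, LastHeadB, LastHeadB + 1);    // B claims slot 5: head 5 -> 6
  WonA := CAS(Head, LastHeadA, LastHeadA + 1);    // A fails: head is 6, not 5
  WriteLn('B won: ', WonB, ', A won: ', WonA, ', head = ', Head);
end.
```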
The Intel x86 memory model, detailed in the Intel 64 Architecture Memory
Ordering White Paper and in the AMD spec, AMD64 Architecture Programmer's
Manual, lists many memory-ordering guarantees, among them:
Loads are not reordered with other loads.
Stores are not reordered with other stores.
Stores are not reordered with older loads.
In a multiprocessor system, memory ordering obeys causality (memory ordering respects transitive visibility).
In a multiprocessor system, stores to the same location have a total order.
In a multiprocessor system, locked instructions have a total order.
Loads and stores are not reordered with locked instructions.
However, on x86 loads may be reordered with older stores to different
locations, so we have to check that the algorithm does not rely on that
ordering. Take a look at the following lockfree_mpmc code:
---
function TLockfree_MPMC.push(tm : tNodeQueue):boolean;
var lasttail,newtemp:longword;
begin
  if getlength >= fsize
  then
  begin
    result:=false;
    exit;
  end;
  result:=true;
{[1]} newTemp:=LockedIncLong(temp);   // locked instruction: reserve a slot index
{[2]} lastTail:=newTemp-1;
      setObject(lastTail,tm);         // plain store of the element into its slot
{[3]}
  repeat
{[4]} if CAS(tail,lasttail,newtemp)   // locked instruction: publish the new tail
    then
    begin
      exit;
    end;
    sleep(0);
  until false;
end;
---
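As you can see, there is a store, setObject(lastTail,tm), between
line [2] and line [4]. On x86, stores are not reordered with other
stores, and loads and stores are not reordered with locked instructions,
so the element is written to its slot before the locked CAS at line [4]
publishes the new tail. On the pop side, the load of tail comes before
the load of the element, and loads are not reordered with older loads
on x86, so a consumer that sees the new tail also sees the element.
So I think Lockfree_mpmc is correct.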
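The publication order can be exercised with two threads. In this Free Pascal sketch, Slot and Tail are hypothetical stand-ins for the slot written by setObject and for tail: the producer thread does a plain store to Slot followed by a locked InterlockedCompareExchange on Tail, and the consumer waits until it sees the new Tail before reading Slot. On x86 the guarantees listed above mean the consumer reads 42; a passing run illustrates the ordering but does not prove it.

```pascal
program PublishSketch;
{$mode objfpc}{$H+}
uses
  {$ifdef unix}cthreads,{$endif}
  Classes, SysUtils;

var
  Slot: LongInt = 0;             // hypothetical stand-in for the slot setObject writes
  Tail: LongInt = 0;             // hypothetical stand-in for tail

type
  TPublisher = class(TThread)
  protected
    procedure Execute; override;
  end;

procedure TPublisher.Execute;
begin
  Slot := 42;                                 // plain store, like setObject(lastTail,tm)
  InterlockedCompareExchange(Tail, 1, 0);     // locked instruction, like CAS(tail,lasttail,newtemp)
end;

var
  Publisher: TPublisher;
  Seen: LongInt;
begin
  Publisher := TPublisher.Create(False);
  // Consumer side: InterlockedCompareExchange(Tail, 0, 0) acts as an atomic
  // load here; it only writes when Tail is already 0, so it never changes Tail.
  while InterlockedCompareExchange(Tail, 0, 0) = 0 do
    Sleep(0);
  Seen := Slot;                               // read the element after seeing the new tail
  Publisher.WaitFor;
  Publisher.Free;
  WriteLn('slot seen by consumer: ', Seen);   // 42 on x86
end.
```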
Here is the source code:
http://pages.videotron.com/aminer/zip/lockfree.zip
Note also that my lockfree queue, Lockfree_mpmc, is a multiple-producer,
multiple-consumer lockfree FIFO queue.
Thank you,
Amine Moulay Ramdane.