Google Groups no longer supports new Usenet posts or subscriptions. Historical content remains viewable.
Dismiss

I have received the following question...

7 views
Skip to first unread message

aminer

unread,
May 13, 2013, 5:38:26 PM5/13/13
to
.

Hello,

I have received the following question:


>Hi all,

>I need a simple FIFO queue, which would allow the communication
>between two threads (producer - consumer).

>Please note that my model is: one producer - one consumer. One
>producer (main thread) builds an object, which it then pushes to the
>consumer (other thread), allowing itself to build the next object. The
>idea is that the producer and the consumer would operate in a pipeline.

>I have already looked at http://www.emadar.com/fpc/lockfree.htm , but
>this consumes a lot of cpu cycles in the consumer, because pop is non
>blocking on an empty queue.

>Any ideas?"


Hello,

Here it is:

http://pages.videotron.com/aminer/zip/lockfree.zip

I have enhanced Lockfree_mpmc to wait on an empty queue.

Here is what looks like the pop():

function pop(var obj:tNodeQueue;wait:boolean=false):boolean;

If you want to wait just call pop() by passing True to the wait argument.

If you look at the pop() method . the logic is correct , cause if for
example the consumer thread is on self.event.waitfor(INFINITE);

and it receives two items before entering the self.event.resetevent;
can it forget to process the second item cause the consumer thread
will reset the event but one item will still be on the queue ?

Answer:

No, it will still process correctly the second item cause we are catching
the number of items with the following code on the consumer side:

---
if self.count <> 0
then continue;

---

Look at: http://www.emadar.com/fpc/lockfree.htm

In the pop() side of this lockfree queue we have:
--------------

function gFLQueue.pop(var tm:_R):boolean;
var newhead, lastHead : integer;

begin
repeat
lastHead:=head;
if tail<>head
then
begin

pointer(newHead):=interlockedCompareExchange(pointer(head),pointer(lastHead
+1),pointer(lasthead));
if newHead=lastHead
then
begin
tm:=getObject(lastHead);
result:=true; exit;
end; end
else
begin
result:=false;
exit;
end;
until false;
end;
end.
----

If you have noticed, after he is incrementing head with an
interlockedCompareExchange(), he is doing a tm:=getObject(lastHead);
and this is an error i think, cause if the thread
is prempeted and another thread have put another value in the same
place as lasthead, the value willl be invalid and corrupted.
I have corrected this problem in my lockfree_mpmc fifo queue version
1.14

You will notice that i have modified the algorithm in the push()
side og Lockfree queue source code, and i have used a test like this:

if getlength >= fsize
then
begin
result:=false;
exit;
end;

Now i have tested my new algorithm and it is working very well.

Correctness:

To not make the correctness verification longer, i will concentrate
on
the more important parts. If you take a look at the lockfree_mpmc
source code , inside lockfree_mpmc.pas you will read this in push
side:

function TLockfree_MPMC.push(tm : tNodeQueue):boolean;
var lasttail,newtemp:longword;
begin
if getlength >= fsize
then
begin
result:=false;
exit;
end;
result:=true;
//newTemp:=windows.interlockedincrement(temp);
newTemp:=LockedIncLong(temp);

lastTail:=newTemp-1;
setObject(lastTail,tm);
repeat
if CAS(tail[0],lasttail,newtemp)
then
begin
exit;
end;
sleep(0);
until false;
end;

So, let's say the size of the bounded queue is 1000 , and imagine
that the threads are executing the "if getlength >= fsize " all at
the
same time, and imagine that the getlength returns 999, so, the
"if getlength >= fsize" will returns false , and since we have
fSize:=(1 shl aPower) - margin inside the constructor, with a margin
of 1000


(margin must be >= to the number of cores) , there will be no
problem(overflow..)...

Now in the pop side, you will read this:

function TLockfree_MPMC.pop(var obj:tNodeQueue):boolean;

var lastHead : longword;
begin
repeat
lastHead:=head[0];
if tail[0]<>head[0]
then
begin
obj:=getObject(lastHead);
if CAS(head[0],lasthead,lasthead+1)
then
begin
result:=true;
exit;
end;
end
else
begin
result:=false;
exit;
end;
until false;
end;

So, as you have noticed, there is a test like this:
if CAS(head[0],lasthead,lasthead+1) , and this test
avoids something like the ABA problem, cause if head
have changed , the CAS() will fail.

The Intel x86 memory model, detailed in Intel 64 Architecture Memory
Ordering White Paper
and the AMD spec, AMD64 Architecture Programmer's Manual, list a lot of
memory ordering
guarantees, among them:
Loads are not reordered with other loads.
Stores are not reordered with other stores.
Stores are not reordered with older loads.
In a multiprocessor system, memory ordering obeys causality (memory ordering
respects transitive visibility).
In a multiprocessor system, stores to the same location have a total order.
In a multiprocessor system, locked instructions have a total order.
Loads and stores are not reordered with locked instructions.

But since on x86 Loads may be reordered with older stores to different
locations

So please take a look at the following lockfree_mpmc code:

---

function TLockfree_MPMC.push(tm : tNodeQueue):boolean;
var lasttail,newtemp:long;
i,j:integer;
begin

if getlength >= fsize
then
begin
result:=false;
exit;
end;

result:=true;

[1]newTemp:=LockedIncLong(temp);

[2] lastTail:=newTemp-1;
setObject(lastTail,tm);

[3]

repeat

[4] if CAS(tail,lasttail,newtemp)
then
begin
exit;
end;
sleep(0);
until false;

end;
---


As you have noticed there is a load on
setObject(lastTail,tm) between line [2] and line [4].
And loads are not reordered with older loads on x86
So i think Lockfree_mpmc is correct.

Here is the source code:

http://pages.videotron.com/aminer/zip/lockfree.zip



Note also that my Lockfree queue Lockfree_mpmc is a multiple producer,
multiple producer lockfree FIFO queue.



Thank you,
Amine Moulay Ramdane.

aminer

unread,
May 13, 2013, 6:01:15 PM5/13/13
to
..

On 5/13/2013 2:39 PM, aminer wrote:> Here is the source code:
>
> http://pages.videotron.com/aminer/zip/lockfree.zip
>
>
>
> Note also that my Lockfree queue Lockfree_mpmc is a multiple producer,
> multiple producer lockfree FIFO queue.


I mean it is a multiple producer, multiple consumer lockfree FIFO queue.
And there is a margin value of 1000 threads, so you can not go more
than 1000 threads , but you can higher the margin value in the source
code if you want.
Thank you,
Amine Moulay Rmdne


0 new messages