A fast concurrent FIFO Queue

Amine Moulay Ramdane

Apr 15, 2015, 6:01:08 PM
to lock...@googlegroups.com



Hello,


A fast concurrent FIFO Queue, version 1.3



Authors: Amine Moulay Ramdane


Description:

A very fast concurrent FIFO queue that satisfies many requirements: it has more parallelism than the two-lock algorithm, it is FIFO fair, it is starvation-free, and it efficiently minimizes cache-coherence traffic. It is also energy efficient on the pop() side when you set the wait parameter to true in the constructor: when there are no items in the queue it will not spin-wait, but will block on my SemaMonitor. When the wait parameter of the constructor is set to false, it uses only an atomic increment on the push() side and an atomic increment on the pop() side, so it is very fast. The number of threads on the pop() side is limited by the length of the queue, and the length of the queue must be greater than or equal to 2^10; I have set it like that.

You have 3 options for the kind of lock; just look inside defines.inc. To use my scalable lock, called scalable MLock, uncomment the MLock option inside defines.inc; to use the Ticket Spinlock, uncomment the TicketSpinlock option; to use the Spinlock, uncomment the Spinlock option. The Ticket Spinlock option scored 12.5 million transactions per second on my 2.4 GHz quad-core.

The size of the queue must be passed to the constructor and it must be a power of 2.
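
The power-of-2 requirement is what lets the code replace a modulo with the cheaper "and fMask" when it maps a ticket number to a cell. Here is a minimal Free Pascal sketch of that mapping. The names IsPowerOfTwo and CellIndex are hypothetical helpers written for this illustration; they are not part of the library.

```pascal
program MaskDemo;
{$mode objfpc}

{ Hypothetical helper, not from the library: true when n is a power of 2. }
function IsPowerOfTwo(n: PtrUInt): boolean;
begin
  Result := (n <> 0) and ((n and (n - 1)) = 0);
end;

{ Hypothetical helper: maps an ever-growing ticket number to a cell index.
  When size is a power of 2, "ticket and (size - 1)" equals "ticket mod size". }
function CellIndex(ticket, size: PtrUInt): PtrUInt;
begin
  Result := ticket and (size - 1);
end;

begin
  writeln(IsPowerOfTwo(1024));     { TRUE }
  writeln(IsPowerOfTwo(1000));     { FALSE }
  writeln(CellIndex(1023, 1024));  { 1023: last cell }
  writeln(CellIndex(1024, 1024));  { 0: wraps around to the first cell }
  writeln(CellIndex(1025, 1024));  { 1 }
end.
```

With a size that is not a power of 2, the mask would skip some cells and map different tickets to the same cell too early, which is why the constructor needs this restriction.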

Here is my explanation of the algorithm. You will find the source code here:

https://sites.google.com/site/aminer68/concurrent-fifo-queue-1


We begin with the push() method; its source code looks like this:

===

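function TWQueue.push(tm : long):boolean;
var lastHead,newTemp:long;
begin
  result:=true;

  { Claim a ticket: atomically increment head; our slot is the previous value. }
  newTemp:=LockedIncLong(head);
  lastHead:=newTemp-1;

  { We have gone beyond the length of the queue: yield until there is room. }
  while getlength1(newTemp) > fsize
  do {$IFDEF FPC}
     ThreadSwitch;
     {$ENDIF}
     {$IFDEF Delphi}
     sleep(0);
     {$ENDIF}

  { Wait until our cell is empty (flag1=0). }
  repeat
    if fcount1^[lastHead and fMask].flag1=0 then break;
    {$IFDEF FPC}
    ThreadSwitch;
    {$ENDIF}
    {$IFDEF Delphi}
    sleep(0);
    {$ENDIF}
  until false;

  { Write the item, then set flag1 to 1 so that pop() can read it. }
  setObject(lastHead,tm);
  fcount1^[lastHead and fMask].flag1:=1;

end;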
===

Now we look at the rest of the code...

Every cell of the array-based queue looks like this:

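type cell = record
  obj:long;      { the stored item }
  flag1:long;    { 1 when the cell holds an item, 0 when it is empty }
  flag2:long;    { 1 when it is the turn of this cell's pop() thread }
  {$IFDEF CPU32}
  cache:typecache2;
  {$ENDIF CPU32}
  {$IFDEF CPU64}
  cache:typecache3;
  {$ENDIF CPU64}
end;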

Each cell is padded to a cache-line size and the array is aligned on 64 bytes, so as to avoid false sharing...
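
To illustrate the padding idea, here is a small Free Pascal sketch of a cell padded to 64 bytes. It is only an approximation under stated assumptions: the definitions of long, typecache2 and typecache3 are not shown in this post, so the sketch assumes Int64 fields and a 64-byte cache line, and TPaddedCell, pad and CacheLineSize are hypothetical names.

```pascal
program PadDemo;
{$mode objfpc}

const
  CacheLineSize = 64;  { assumption: 64-byte cache lines }

type
  { Hypothetical stand-in for the library's cell record.
    The three fields are assumed to be Int64 for this sketch. }
  TPaddedCell = record
    obj   : Int64;
    flag1 : Int64;
    flag2 : Int64;
    { Filler bytes so that the whole record occupies one cache line. }
    pad   : array[0..CacheLineSize - 3 * SizeOf(Int64) - 1] of byte;
  end;

begin
  writeln(SizeOf(TPaddedCell));  { 64 }
end.
```

Because every cell then sits on its own cache line (provided the array itself starts on a 64-byte boundary), a thread that writes the flags of one cell does not invalidate the line holding a neighbouring cell.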

When a thread enters the push() method, it increments "head" with an atomic increment like this:

newTemp:=LockedIncLong(head);
lastHead:=newtemp-1;

After that the thread tests whether "getlength1(newtemp) > fsize", that is, whether we have gone beyond the length of the queue. If we have gone beyond the length of the queue, the thread will spin-wait; if not, the thread continues and tests whether fcount1^[lastHead and fMask].flag1=0, that is, whether there is no item in the cell. If fcount1^[lastHead and fMask].flag1 is not equal to 0, the thread will spin-wait; if it is equal to 0, there is no item in the cell, so we write the item to the cell by doing this:

setObject(lastHead,tm);

and after that we set the flag of the cell to 1 so that the pop()
can read from it when it's set to 1 like this:

fcount1^[lastHead and fMask].flag1:=1;

So, as you have noticed, I have reasoned about and explained the push() side, and I think my reasoning is correct here.
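
The ticket step at the start of push() can be tried in isolation. This is a minimal single-threaded sketch that uses the Free Pascal RTL function InterlockedIncrement in place of LockedIncLong (an assumption: the post does not show how LockedIncLong is implemented). ClaimTicket is a hypothetical helper name.

```pascal
program TicketDemo;
{$mode objfpc}

var
  head: longint = 0;

{ Hypothetical helper mirroring the first two lines of push():
  atomically increment the counter and return the value it had before. }
function ClaimTicket(var counter: longint): longint;
var
  newTemp: longint;
begin
  newTemp := InterlockedIncrement(counter);  { returns the incremented value }
  Result := newTemp - 1;
end;

begin
  writeln(ClaimTicket(head));  { 0 }
  writeln(ClaimTicket(head));  { 1 }
  writeln(head);               { 2 }
end.
```

Because the increment is atomic, two threads calling it at the same time still receive different tickets, so each producer gets its own cell without taking a lock.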

Now here is the new pop() method...

==

function TWQueue.pop(var obj:long):boolean;

var lastTail,newTemp: long;
begin

  result:=true;

  { Claim a ticket: atomically increment temp; our slot is the previous value. }
  newTemp:=LockedIncLong(temp);
  lastTail:=newTemp-1;

  { Wait until an item is available in our cell (flag1=1). }
  repeat
    if fcount1^[lastTail and fMask].flag1=1 then break;
    {$IFDEF FPC}
    ThreadSwitch;
    {$ENDIF}
    {$IFDEF Delphi}
    sleep(0);
    {$ENDIF}
  until (false);

  obj:=getObject(lastTail);

  { Wait for our turn: the previous pop() thread sets our flag2 to 1. }
  repeat
    if fcount1^[lastTail and fMask].flag2=1 then break;
    {$IFDEF FPC}
    ThreadSwitch;
    {$ENDIF}
    {$IFDEF Delphi}
    sleep(0);
    {$ENDIF}
  until (false);

  fcount1^[lastTail and fMask].flag2:=0;
  fcount1^[lastTail and fMask].flag1:=0;  { the cell is free for push() again }
  tail:=newTemp;
  fcount1^[newTemp and fMask].flag2:=1;   { signal the next pop() thread }

end;

==


So, as you have noticed, we have to reason about this algorithm to prove that it is correct, so please follow along with me...

When a thread enters the pop() method, it atomically increments the "temp" variable. After that the thread waits until fcount1^[lastTail and fMask].flag1 is equal to 1, that is, until an item is available in cell number "lastTail and fMask", and then it gets the item from the cell. After that the thread waits until fcount1^[lastTail and fMask].flag2 is equal to 1, that is, it waits for the previous pop thread, the one on lastTail-1, to set fcount1^[lastTail and fMask].flag2 to 1. Then the thread sets flag2 to 0, and after that it sets flag1 to 0 so that the push side is able to put an item in the cell. Even though the thread sets flag1 to 0 before updating "tail", the algorithm is correct, since the push threads are limited in how much they can push by the length of the queue. After that the thread updates tail with "tail:=newtemp" and signals the next popping thread by setting the flag2 of the next cell, fcount1^[newtemp and fMask].flag2, to 1. So I think my algorithm is correct and efficient.
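
The flag protocol described above can be traced with a small single-threaded Free Pascal sketch. It is not the library's code: it leaves out the waiting loops (they would never spin with one thread), it uses a tiny queue of 4 cells instead of at least 2^10, and the names DemoInit, DemoPush and DemoPop are hypothetical. It also assumes that the constructor sets flag2 of the first cell to 1, since otherwise the first pop() thread would wait forever; the post does not show the constructor.

```pascal
program FlagTrace;
{$mode objfpc}

const
  Size = 4;           { tiny power-of-2 size, for illustration only }
  Mask = Size - 1;

type
  TCell = record
    obj, flag1, flag2: longint;
  end;

var
  cells: array[0..Size - 1] of TCell;
  head, temp, tail: longint;

procedure DemoInit;
var
  i: integer;
begin
  head := 0; temp := 0; tail := 0;
  for i := 0 to Size - 1 do
  begin
    cells[i].obj := 0; cells[i].flag1 := 0; cells[i].flag2 := 0;
  end;
  cells[0].flag2 := 1;  { assumption: the first popper finds its turn flag set }
end;

procedure DemoPush(item: longint);
var
  t: longint;
begin
  t := InterlockedIncrement(head) - 1;   { claim a ticket }
  { a real producer would wait here until flag1=0 }
  cells[t and Mask].obj := item;
  cells[t and Mask].flag1 := 1;          { item is now visible to pop }
end;

function DemoPop: longint;
var
  t: longint;
begin
  t := InterlockedIncrement(temp) - 1;   { claim a ticket }
  { a real consumer would wait here until flag1=1, then until flag2=1 }
  Result := cells[t and Mask].obj;
  cells[t and Mask].flag2 := 0;
  cells[t and Mask].flag1 := 0;          { cell is free for push again }
  tail := t + 1;
  cells[(t + 1) and Mask].flag2 := 1;    { hand the turn to the next popper }
end;

begin
  DemoInit;
  DemoPush(10);
  DemoPush(20);
  writeln(DemoPop);         { 10 }
  writeln(DemoPop);         { 20 }
  writeln(cells[2].flag2);  { 1: the turn has moved on to cell 2 }
end.
```

The trace shows the two roles of the flags: flag1 passes a cell back and forth between push and pop, while flag2 is a token handed from one pop thread to the next so that they finish in ticket order.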

Note: the number of threads on the pop() side is limited by the length of the queue, and the length of the queue must be greater than or equal to 2^10; I have set it like that.

So, as you have noticed, I have reasoned about my algorithm, and I think my algorithm is correct now.

Finally, I have benchmarked my algorithm without using my SemaMonitor and it scored 12.5 million transactions per second on my 2.4 GHz quad-core. It efficiently minimizes cache-coherence traffic and reduces contention, so that it can scale better with more and more cores.

You can download my new algorithm of a very fast concurrent FIFO queue from:

https://sites.google.com/site/aminer68/concurrent-fifo-queue-1



Thank you,
Amine Moulay Ramdane.






Amine Moulay Ramdane

Apr 16, 2015, 11:31:30 AM
to lock...@googlegroups.com



A correction to a word in my text above: it is "beyond", not "beyong".