Hello,
I have thought about this some more, and I have come up with a solution
for a concurrent FIFO queue that satisfies many requirements: it
efficiently minimizes cache-coherence traffic, it is
FIFO fair (it uses FIFO-fair locks), and it is energy efficient on the
pop() side: it will not spin-wait when there are no items in the queue;
instead it waits on a manual-reset event object, which is energy efficient.
I have benchmarked it, and it achieves 1.65 million pop() calls
per second and 0.8 million push() calls per second.
Here is the FIFO queue; I will soon put it on my website.
{****************************************************************************
*
*
* FIFO MPMC Queue
*
*
*
*
*
* Language: FPC Pascal v2.2.0+ / Delphi 5+
*
*
*
* Required switches: none
*
*
*
* Authors: Amine Moulay Ramdane
*
*
*
*
*
*
*
*
* Version: 1.0
*
*
* Send bug reports and feedback to aminer @@ videotron @@ ca
*
* You can always get the latest version/revision of this package from
*
*
*
*
http://pages.videotron.com/aminer/
*
*
*
* Description: Algorithm to handle an FIFO MPMC queue
*
*
*
* This program is distributed in the hope that it will be useful,
*
* but WITHOUT ANY WARRANTY; without even the implied warranty of
*
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
*
*
*
*
*
*****************************************************************************
* BEGIN LICENSE BLOCK
*
* changelog
* v.1.0
}
unit FIFOQUEUE_MPMC;
interface
{$IFDEF FPC}
{$ASMMODE intel}
{$ENDIF}
uses
LW_ALOCK,SpinLock,SemaCondvar,msync,syncobjs,sysutils;
{$I defines.inc}
type
// `long` is an unsigned integer alias: 64 bits (qword) when CPU64 is defined,
// 32 bits (longword) when CPU32 is defined.
// NOTE(review): CPU64/CPU32 are presumably defined by defines.inc; if neither
// is defined, `long` is never declared -- confirm.
{$IFDEF CPU64}
long = qword;
{$ENDIF CPU64}
{$IFDEF CPU32}
long = longword;
{$ENDIF CPU32}
// Element type held by the queue: any TObject reference.
tNodeQueue = tObject;
// 16 longwords = 64 bytes. Used as a filler field inside the queue class
// (presumably to keep neighbouring fields on separate cache lines -- confirm).
typecache1 = array[0..15] of longword;
// TLockfree_MPMC = class(TFreelist)
// Bounded FIFO queue of TObject references backed by a power-of-two sized
// array. push() and pop() take separate locks (lock1 / lock2); an event
// object lets pop() block instead of spinning when the queue looks empty.
TFIFOQUEUE_MPMC = class
private
  // Producer position. Declared with the same width as `head`: with the
  // former `longword` declaration the two counters wrapped at different
  // points on 64-bit builds, which corrupted getLength and the
  // `tail<>head` emptiness test after 2^32 pushes.
  tail  : long;
  // 64-byte filler between `tail` and `head`.
  tmp1  : typecache1;
  // Consumer position.
  head  : long;
  // Index mask (size - 1) applied to positions in setObject/getObject.
  fMask : long;
  // Number of slots in `tab`.
  fSize : long;
  // Slot storage.
  tab   : array of tNodeQueue;
  // lock1 is held for the whole of push(), lock2 around the dequeue step
  // of pop().
  lock1,lock2 : TALOCK;
  // Signalled by push(); pop() waits on it when the queue looks empty.
  event : TSimpleEvent;
  // Guards `count1` and the set/reset transitions of `event`.
  lock3 : TSpinlock;
  // Item counter maintained under lock3 (incremented by push, decremented
  // by pop).
  count1 : long;
  procedure setobject(lp : long;const aobject : tNodeQueue);
  function getLength:long;
  function getSize:long;
  function getObject(lp : long):tNodeQueue;
public
  // Allocates `tab` with 2^aPower slots; the default of 20 gives
  // 1048576 slots.
  constructor create(aPower : long =20);
  destructor Destroy; override;
  // Appends an item; returns false when the queue is full.
  function push(tm : tNodeQueue):boolean;
  // Removes the oldest item into `obj`; returns false when nothing was
  // available.
  function pop(var obj:tNodeQueue):boolean;
  // `length` and `count` are synonyms: both read getLength.
  property length : long read getLength;
  property count: long read getLength;
  property size : long read getSize;
end;
implementation
{$IF defined(CPU64) }
// 64-bit atomic compare-and-exchange.
// If Target equals CompareVal, NewVal is stored into Target. The value that
// Target held before the instruction is returned in RAX either way.
// The code takes CompareVal from RCX, NewVal from RDX and the address of
// Target from R8, which matches the Microsoft x64 calling convention.
// NOTE(review): these registers would be wrong under the System V AMD64 ABI
// (Linux/macOS) -- confirm the supported targets.
function LockedCompareExchange(CompareVal, NewVal: long; var Target:
long): long; overload;
asm
// CMPXCHG compares against RAX implicitly, so load the comparand there.
mov rax, rcx
// LOCK makes the compare-and-store atomic across processors.
lock cmpxchg [r8], rdx
end;
{$IFEND}
{$IF defined(CPU32) }
// 32-bit atomic compare-and-exchange.
// If Target equals CompareVal, NewVal is stored into Target. The value that
// Target held before the instruction is returned in EAX either way.
// The code relies on the register calling convention: CompareVal already in
// EAX (the implicit CMPXCHG comparand), NewVal in EDX, address of Target in
// ECX.
function LockedCompareExchange(CompareVal, NewVal: long; var
Target:long): long; overload;
asm
// LOCK makes the compare-and-store atomic across processors.
lock cmpxchg [ecx], edx
end;
{$IFEND}
{ CAS: atomically replaces Target with Exch when Target currently equals Comp.
  Returns true when the swap took place, i.e. when the value reported back by
  LockedCompareExchange is the expected one. }
function CAS(var Target:long;Comp ,Exch : long): boolean;
begin
  result := LockedCompareExchange(Comp, Exch, Target) = Comp;
end; { CAS }
// Atomically adds 1 to Target and returns the incremented value.
// LOCK XADD leaves the previous value of Target in the accumulator, so the
// accumulator is incremented afterwards to yield the new value.
function LockedIncLong(var Target: long): long;
asm
{$IFDEF CPU32}
// --> EAX address of Target
// <-- EAX Result (value of Target after the increment)
// Free EAX so it can carry the addend and, afterwards, the result.
MOV ECX, EAX
MOV EAX, 1
//sfence
LOCK XADD [ECX], EAX
inc eax
{$ENDIF CPU32}
{$IFDEF CPU64}
// --> RCX address of Target
// <-- RAX Result (value of Target after the increment)
MOV rax, 1
//sfence
LOCK XADD [rcx], rax
INC rax
{$ENDIF CPU64}
end;
// Atomically subtracts 1 from Target and returns the decremented value.
// LOCK XADD with an addend of -1 leaves the previous value of Target in the
// accumulator, so the accumulator is decremented afterwards to yield the new
// value.
function LockedDecLong(var Target: long): long;
asm
{$IFDEF CPU32}
// --> EAX address of Target
// <-- EAX Result (value of Target after the decrement)
// Free EAX so it can carry the addend and, afterwards, the result.
MOV ECX, EAX
MOV EAX, -1
//sfence
LOCK XADD [ECX], EAX
dec eax
{$ENDIF CPU32}
{$IFDEF CPU64}
// --> RCX address of Target
// <-- RAX Result (value of Target after the decrement)
MOV rax, -1
//sfence
LOCK XADD [rcx], rax
dec rax
{$ENDIF CPU64}
end;
// Builds an empty queue with 2^aPower slots.
//
// aPower : base-2 exponent of the capacity. It must leave 2^aPower
//          representable as a positive signed native integer, because the
//          slot array length is passed to SetLength.
//
// On an out-of-range exponent the constructor prints a message and halts the
// program, as it always did. The former test (aPower < 0, aPower >
// high(long)) could never be true for an unsigned argument, so over-large
// exponents silently produced a bogus mask and size.
constructor TFIFOQUEUE_MPMC.create(aPower : long );
const
  // Largest exponent whose power of two still fits a signed native integer.
  MAX_POWER = SizeOf(long) * 8 - 2;
begin
  inherited Create;
  if aPower > MAX_POWER then
  begin
    writeln('Constructor''s argument incorrect');
    halt;
  end;
  // Shift a value of type `long` so the shift is performed at full width.
  fSize := long(1) shl aPower;
  // For a power-of-two size, size - 1 has exactly the low aPower bits set;
  // this is the same mask the former not(all-ones shl aPower) produced.
  fMask := fSize - 1;
  setLength(tab, fSize);
  tail := 0;
  head := 0;
  // NOTE(review): the meaning of the TALOCK argument (100) is defined in
  // LW_ALOCK, which is not visible here -- confirm.
  lock1 := TALOCK.create(100);
  lock2 := TALOCK.create(100);
  lock3 := TSpinlock.create;
  event := TSimpleEvent.create;
  count1 := 0;
end;
// Releases the slot array and the synchronisation objects owned by the
// queue. Objects still stored in the queue are not freed.
destructor TFIFOQUEUE_MPMC.Destroy;
begin
  // Assigning nil to a dynamic array releases its storage.
  tab := nil;
  event.free;
  lock3.free;
  lock2.free;
  lock1.free;
  inherited;
end;
// Stores aobject in the slot that position lp maps to. The position is
// reduced to an array index by masking with fMask.
procedure TFIFOQUEUE_MPMC.setObject(lp : long;const aobject : tNodeQueue);
var
  slot : long;
begin
  slot := lp and fMask;
  tab[slot] := aobject;
end;
// Returns the object held in the slot that position lp maps to. The position
// is reduced to an array index by masking with fMask.
function TFIFOQUEUE_MPMC.getObject(lp : long):tNodeQueue;
var
  slot : long;
begin
  slot := lp and fMask;
  result := tab[slot];
end;
// Appends tm at the tail of the queue.
// Returns true when the item was stored and false when the queue already
// holds fSize items. The whole operation runs under lock1.
function TFIFOQUEUE_MPMC.push(tm : tNodeQueue):boolean;//stdcall;
begin
  lock1.enter;
  // There is room only while the current length is below the capacity.
  result := getlength < fsize;
  if result then
  begin
    // Fill the slot before advancing tail.
    setObject(tail, tm);
    tail := tail + 1;
    // The counter update and the wake-up signal happen together under lock3.
    lock3.enter;
    inc(count1);
    event.setevent;
    lock3.leave;
  end;
  lock1.leave;
end;
// Removes the oldest item from the queue.
//
// obj : receives the dequeued object when the result is true; it is left
//       untouched otherwise.
//
// Returns true when an item was dequeued and false when the queue was empty
// at the moment it was examined under lock2. When the queue looks empty on
// entry the call first waits on `event`. After being woken it can still
// return false, either because no item is present any more or because the
// event was still set from an earlier push. Callers that need an item must
// therefore call pop again when it returns false.
function TFIFOQUEUE_MPMC.pop(var obj:tNodeQueue):boolean;
begin
  if self.count=0 then
  begin
    event.waitfor(INFINITE);
    // Reset the event only while the counter is zero. The check runs under
    // lock3, the same lock push holds while incrementing count1 and setting
    // the event.
    lock3.enter;
    if count1=0 then event.resetevent;
    lock3.leave;
  end;
  lock2.enter;
  // The queue is non-empty exactly when the two positions differ.
  result := tail<>head;
  if result then
  begin
    obj := getObject(head);
    head := (head+1);
    lock3.enter;
    dec(count1);
    lock3.leave;
  end;
  // Single exit: lock2 is released exactly once on every path.
  lock2.leave;
end;
// Returns the number of items between head and tail.
// Both positions are sampled without taking a lock, head first and then
// tail. When the sampled tail is numerically below the sampled head, the
// distance is computed across the wrap-around of the counter type.
function TFIFOQUEUE_MPMC.getLength:long;
var
  consumerPos, producerPos : long;
begin
  consumerPos := head;
  producerPos := tail;
  if producerPos >= consumerPos then
    result := producerPos - consumerPos
  else
    result := (High(long) - consumerPos) + (1 + producerPos);
end;
// Returns the capacity of the queue, i.e. the number of slots stored in
// fSize by the constructor.
function TFIFOQUEUE_MPMC.getSize:long;
begin
  getSize := fSize;
end;
end.