
A question about my fifoqueue_mpmc ...


aminer

Dec 13, 2011, 2:19:22 PM

Hello,

I have a question. Please look at the following code: I am using two
spinlocks with an exponential backoff, one on the push side and one on
the pop side.

Here is the code; my question follows:

---
unit FIFOQUEUE_MPMC;


interface

uses
syncobjs,expbackoff,sysutils;


{$Define CPU32}
type

  tNodeQueue = tObject;
  typecache1 = array[0..15] of longword;

  // TLockfree_MPMC = class(TFreelist)
  TFIFOQUEUE_MPMC = class
  private
    tail:longword;
    tmp1:typecache1; // 64 bytes of padding between tail and head

    head: longword;
    fMask : longword;
    fSize : longword;
    temp:longword;
    tab : array of tNodeQueue;
    backoff1,backoff2:texpbackoff;
    count1,count2:longword;
    syncobj:TCriticalSection;
    procedure setobject(lp : longword;const aobject : tNodeQueue);
    function getLength:longword;
    function getSize:longword;
    function getObject(lp : longword):tNodeQueue;
  public
    { allocates tab with size 2^aPower; for aPower = 20 the size is 1048576 }
    constructor create(aPower : integer =20);
    destructor Destroy; override;
    function push(tm : tNodeQueue):boolean;
    function pop(var obj:tNodeQueue):boolean;
    property length : longword read getLength;
    property count: longword read getLength;
    property size : longword read getSize;

  end;


implementation


function LockedIncLong(var Target: longword): Integer;
asm
{$IFDEF CPU32}
  // --> EAX Target
  // <-- EAX Result
  MOV ECX, EAX
  MOV EAX, 1
  //sfence
  LOCK XADD [ECX], EAX
  INC EAX
{$ENDIF CPU32}
{$IFDEF CPU64}
  // --> RCX Target
  // <-- EAX Result
  MOV EAX, 1
  // sfence
  LOCK XADD [RCX], EAX
  INC EAX
{$ENDIF CPU64}
end;

function CAS(var Target: longword; Comperand: longword;NewValue: longword ): boolean; assembler;stdcall;
asm
  mov ecx,Target
  mov edx,NewValue
  mov eax,Comperand
  //sfence
  lock cmpxchg [ecx],edx
  JNZ @@2
  MOV AL,01
  JMP @@Exit
@@2:
  XOR AL,AL
@@Exit:
end;

constructor TFIFOQUEUE_MPMC.create(aPower : integer );
begin
  // 1 shl aPower must still fit in a positive 32-bit integer
  if (aPower < 0) or (aPower > 30)
  then
  begin
    writeln('Constructor''s argument incorrect');
    exit;
  end;
  fMask:=not($FFFFFFFF shl aPower);
  fSize:=(1 shl aPower);
  setLength(tab,1 shl aPower);
  tail:=0;
  head:=0;
  temp:=0;
  backoff1:=texpbackoff.create(1,2);
  backoff2:=texpbackoff.create(1,2);
  count1:=0;
  count2:=0;
  syncobj:=TCriticalSection.create;

end;

destructor TFIFOQUEUE_MPMC.Destroy;

begin
  setLength(tab,0);
  backoff1.free;
  backoff2.free;
  syncobj.free;

  inherited Destroy;
end;


procedure TFIFOQUEUE_MPMC.setObject(lp : longword;const aobject : tNodeQueue);
begin
  tab[lp]:=aObject;
end;

function TFIFOQUEUE_MPMC.getObject(lp : longword):tNodeQueue;
begin
  result:=tab[lp];
end;


function TFIFOQUEUE_MPMC.push(tm : tNodeQueue):boolean;//stdcall;
begin

  result:=true;
  repeat
    // plain read first, so waiting threads do not all issue LOCK XADD
    if count1>1 then begin backoff1.delay; continue; end;

    // count1 is the push-side spinlock: only the thread that moves it
    // from 0 to 1 goes on
    if LockedIncLong(count1) > 1
    then
    begin
      backoff1.delay;
      continue;
    end;

    // full when advancing tail would land on head; one slot stays unused
    // so that tail = head always means "empty"
    if ((tail+1) and fmask) = head
    then
    begin
      result:=false;
      count1:=0;
      exit;
    end;

    setObject(tail,tm);
    tail:=(tail+1) and fmask;
    count1:=0; // release the push-side spinlock
    exit;

  until false;
end;


function TFIFOQUEUE_MPMC.pop(var obj:tNodeQueue):boolean;
begin
  repeat
    if count2>1 then begin backoff2.delay; continue; end;

    // count2 is the pop-side spinlock
    if LockedIncLong(count2) > 1
    then
    begin
      backoff2.delay;
      continue;
    end;

    if tail<>head
    then
    begin
      obj:=getObject(head);
      head:=(head+1) and fmask;
      result:=true;
      count2:=0; // release the pop-side spinlock
      exit;
    end
    else
    begin
      result:=false;
      count2:=0;
      exit;
    end;
  until false;
end;


function TFIFOQUEUE_MPMC.getLength:longword;
var head1,tail1:longword;
begin
  head1:=head;
  tail1:=tail;
  // head and tail are masked indices, so a wrapped tail is fSize slots ahead
  if tail1 < head1
  then result:= (fSize-head1)+tail1
  else result:=(tail1-head1);
end;

function TFIFOQUEUE_MPMC.getSize:longword;

begin
  result:=fSize;
end;

end.

---


As you may have noticed, I am not using the same spinlock for the push
and the pop. So my question is: am I correct to assume that
tail:=(tail+1) and fmask; on the push side and head:=(head+1) and fmask;
on the pop side are both atomic, and that there is no problem in using
two different spinlocks, one for the push and the other for the pop?




Thank you,
Amine Moulay Ramdane.
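
For readers following the question above, here is a minimal C++ sketch of the two-lock arrangement, not the poster's code. The names SpinLock and TwoLockQueue are made up for illustration. Two things differ from the Delphi unit and are assumptions of this sketch: head and tail are kept as free-running counters and masked only when indexing, so "full" and "empty" cannot be confused, and both are std::atomic, because each index is written under one lock but read by the other side without that lock. The update tail:=(tail+1) and fmask is a read-modify-write and is not atomic as a whole; it is only safe because a single lock holder writes it while the other side merely reads it.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <vector>

// Test-and-test-and-set spinlock with a simple exponential backoff.
class SpinLock {
    std::atomic<bool> locked_{false};
public:
    void lock() {
        unsigned spins = 1;
        for (;;) {
            // Attempt the atomic exchange only when the lock looks free.
            if (!locked_.load(std::memory_order_relaxed) &&
                !locked_.exchange(true, std::memory_order_acquire))
                return;
            for (unsigned i = 0; i < spins; ++i) std::this_thread::yield();
            if (spins < 1024) spins *= 2;  // back off longer after each failure
        }
    }
    void unlock() { locked_.store(false, std::memory_order_release); }
};

// Bounded FIFO: push_lock_ serializes producers, pop_lock_ serializes consumers.
template <typename T>
class TwoLockQueue {
    std::vector<T> tab_;
    std::uint32_t mask_;
    std::atomic<std::uint32_t> tail_{0};  // written only under push_lock_
    std::atomic<std::uint32_t> head_{0};  // written only under pop_lock_
    SpinLock push_lock_;
    SpinLock pop_lock_;
public:
    // Table size is 2^power; power is assumed to be below 32.
    explicit TwoLockQueue(unsigned power)
        : tab_(std::size_t{1} << power), mask_((1u << power) - 1) {}

    bool push(const T& v) {
        push_lock_.lock();
        std::uint32_t t = tail_.load(std::memory_order_relaxed);
        // Acquire pairs with the consumer's release store to head_, so the
        // slot is not overwritten before the consumer has finished reading it.
        std::uint32_t h = head_.load(std::memory_order_acquire);
        bool ok = (t - h) < tab_.size();  // unsigned difference = element count
        if (ok) {
            tab_[t & mask_] = v;
            tail_.store(t + 1, std::memory_order_release);  // publish the element
        }
        push_lock_.unlock();
        return ok;
    }

    bool pop(T& out) {
        pop_lock_.lock();
        std::uint32_t h = head_.load(std::memory_order_relaxed);
        std::uint32_t t = tail_.load(std::memory_order_acquire);
        bool ok = (t != h);
        if (ok) {
            out = tab_[h & mask_];
            head_.store(h + 1, std::memory_order_release);  // free the slot
        }
        pop_lock_.unlock();
        return ok;
    }
};
```

With TwoLockQueue<int> q(2), four pushes succeed, a fifth returns false, and the pops return the values in insertion order.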






aminer

Dec 13, 2011, 2:27:39 PM

I wrote:
> As you may have noticed, I am not using the same spinlock for the push
> and the pop.

Sorry, I mean I am using two different spinlocks for the push and
the pop.

> So my question is: am I correct to assume that
> tail:=(tail+1) and fmask; on the push side and head:=(head+1) and fmask;
> on the pop side are both atomic, and that there is no problem in using
> two different spinlocks, one for the push and the other for the pop?


Or must I use the same spinlock? I think tail:=(tail+1) and fmask; on
the push side and head:=(head+1) and fmask; on the pop side are updated
atomically. Am I correct to assume that there is no problem in using
two different spinlocks?



Thank you.

Amine Moulay Ramdane.

aminer

Dec 13, 2011, 2:47:06 PM

Hello again,

I mean: am I correct to assume that tail:=(tail+1) and fmask; on the
push side and head:=(head+1) and fmask; on the pop side are updated
atomically, and that there is no problem in using two spinlocks, one
for the push and the other for the pop?



Amine Moulay Ramdane.

aminer

Dec 13, 2011, 2:51:56 PM

Hello,


Can you tell me if there is any problem with my fifoqueue_mpmc?



Amine Moulay Ramdane

Chris M. Thomasson

Dec 13, 2011, 8:09:22 PM

"aminer" <amin...@gmail.com> wrote in message
news:d73b1b3c-0705-49ee...@j10g2000vbe.googlegroups.com...

> Can you tell me if there any problem with my fifoqueue_mpmc ?

You should probably test it with Relacy:

http://www.1024cores.net/home/relacy-race-detector

AFAICT, you basically have a SPSC queue and that's fine. Anyway, you can
throw two locks around basically any single-producer/single-consumer queue
and get a MPMC out of it. FWIW, check out the following algorithm:


pseudo-code for a mpmc bounded queue of doubles:
______________________________________________
struct cell { uint32_t ver; double state; };

uint32_t head = 0;
uint32_t tail = 0;
cell cells[N]; // N must be a power of 2

void init() {
    for (uint32_t i = 0; i < N; ++i) cells[i].ver = i;
}

void producer(double state) {
    uint32_t ver = XADD(&head, 1);
    cell& c = cells[ver & (N - 1)];
    while (LOAD(&c.ver) != ver) backoff();
    c.state = state;
    STORE(&c.ver, ver + 1);
}

double consumer() {
    uint32_t ver = XADD(&tail, 1);
    cell& c = cells[ver & (N - 1)];
    while (LOAD(&c.ver) != ver + 1) backoff();
    double state = c.state;
    STORE(&c.ver, ver + N);
    return state;
}
______________________________________________


AFAICT, this is one of the more efficient algorithms out there...
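
As an illustration only, the pseudo-code above can be written as compilable C++11 roughly as follows. This is a sketch under assumptions, not a tuned implementation: the class name BoundedMpmcQueue and the method names produce/consume are made up here, XADD is mapped to fetch_add, LOAD/STORE to acquire/release operations on std::atomic, and backoff() is reduced to a plain yield. It also assumes N >= 2, since with N = 1 the "filled" version (ver + 1) and the "free for the next lap" version (ver + N) would coincide. Both calls block by spinning: produce waits while the queue is full and consume waits while it is empty.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>

template <std::uint32_t N>
class BoundedMpmcQueue {
    static_assert(N >= 2 && (N & (N - 1)) == 0, "N must be a power of 2, at least 2");

    struct Cell {
        std::atomic<std::uint32_t> ver{0};  // tells which ticket may touch the cell
        double state = 0.0;
    };

    std::atomic<std::uint32_t> head_{0};  // next producer ticket
    std::atomic<std::uint32_t> tail_{0};  // next consumer ticket
    Cell cells_[N];

    static void backoff() { std::this_thread::yield(); }

public:
    BoundedMpmcQueue() {
        // Cell i starts out free for the producer holding ticket i.
        for (std::uint32_t i = 0; i < N; ++i)
            cells_[i].ver.store(i, std::memory_order_relaxed);
    }

    void produce(double state) {
        // Each producer takes a unique ticket; the ticket selects the cell.
        std::uint32_t ver = head_.fetch_add(1, std::memory_order_relaxed);
        Cell& c = cells_[ver & (N - 1)];
        // Wait until the cell has been handed to this exact ticket.
        while (c.ver.load(std::memory_order_acquire) != ver) backoff();
        c.state = state;
        // Publish: the consumer holding the same ticket may now read.
        c.ver.store(ver + 1, std::memory_order_release);
    }

    double consume() {
        std::uint32_t ver = tail_.fetch_add(1, std::memory_order_relaxed);
        Cell& c = cells_[ver & (N - 1)];
        // Wait until the matching producer has filled the cell.
        while (c.ver.load(std::memory_order_acquire) != ver + 1) backoff();
        double state = c.state;
        // Hand the cell to the producer one full lap ahead.
        c.ver.store(ver + N, std::memory_order_release);
        return state;
    }
};
```

A single thread can exercise it by producing up to N values and then consuming them; they come back in FIFO order, and the same cells are reused on the next lap.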


aminer

Dec 13, 2011, 8:38:09 PM

I know that you know a little bit of Delphi Pascal...

So, can you please translate it to Delphi Pascal or Free Pascal
and share it with us Delphi and Free Pascal users?



Amine Moulay Ramdane.




On Dec 13, 8:09 pm, "Chris M. Thomasson" <cris...@charter.net> wrote:
> "aminer" <amine...@gmail.com> wrote in message

aminer

Dec 15, 2011, 12:06:08 PM


If you have no time to translate it to Delphi, I will try to do it
myself, but can you please clarify the following? On the consumer side
you have:

uint32_t ver = XADD(&tail, 1);
cell& c = cells[ver & (N - 1)];

Are both ver and c local variables?


Also, how are you avoiding buffer overflow?


Can you explain your algorithm in more detail so that I can translate
it correctly to Delphi and Free Pascal?



Amine Moulay Ramdane.
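
On the two questions above, reading only what the posted pseudo-code shows: ver and c are declared inside consumer(), so they are local to each call (ver is that call's ticket, c is a reference to a shared cell). The overflow question seems to be answered by the per-cell version counter: a producer whose ticket maps to a cell that is still occupied spins until the consumer of the previous lap stores ver + N. The sketch below is a single-threaded model of one cell, written only to trace that rule; CellModel, try_produce, try_consume, slot_of and the capacity kCells = 4 are illustrative assumptions, not part of the posted algorithm.

```cpp
#include <cassert>
#include <cstdint>

constexpr std::uint32_t kCells = 4;  // illustrative capacity, power of 2

// Which cell a ticket maps to, as in cells[ver & (N - 1)].
constexpr std::uint32_t slot_of(std::uint32_t ticket) {
    return ticket & (kCells - 1);
}

// Single-threaded model of one cell's version counter.
struct CellModel {
    std::uint32_t ver;

    // A producer holding `ticket` may write only when ver == ticket.
    bool try_produce(std::uint32_t ticket) {
        if (ver != ticket) return false;  // the real producer would spin here
        ver = ticket + 1;                 // mark the cell as filled
        return true;
    }

    // A consumer holding `ticket` may read only when ver == ticket + 1.
    bool try_consume(std::uint32_t ticket) {
        if (ver != ticket + 1) return false;  // the real consumer would spin here
        ver = ticket + kCells;                // free the cell for the next lap
        return true;
    }
};
```

Tickets 0 and kCells map to the same cell. Starting from ver = 0, try_produce(0) succeeds, try_produce(kCells) then fails because the cell still holds item 0, and it succeeds only after try_consume(0) has set ver to kCells. In the real queue that failing step is the spin loop, which is what keeps a fast producer from overwriting an unread item.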