Here is the code of my lock-free thread pool and my lock-free SPMC queue (lockfree_SPMC);
my questions follow:
{****************************************************************************
*
*
* Lock-free threadpool queue that use lockfree_spmc *
*
*
*
*
*
* Language: FPC Pascal v2.2.0+ / Delphi
5+ *
*
*
* Required switches:
none *
*
*
* Author: Amine Moulay
Ramdane *
*
*
*
*
* Date: July 8,
2009 *
* Version:
1.08 *
*
*
*
*
*
*
* Send bug reports and feedback to ami...@colba.net *
* You can always get the latest version/revision of this package
from *
*
*
* http://www.colba.net/~aminer
*
*
*
*
*
*
*
*
*
* This program is distributed in the hope that it will be useful,
*
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. *
*
*
*
*
***************************************************************************** }
unit ThreadPool;
interface
uses
Lockfreeutils,Lockfree_SPMC,
//hash,
{$IFDEF Delphi}classespatch,{$ENDIF}
{$IFDEF FreePascal}classes,{$ENDIF}
SysUtils;
const
  { Priority levels.  NOTE(review): not referenced by any visible code in
    this unit. }
  LOW_PRIORITY = 0;
  NORMAL_PRIORITY = 1;
  HIGH_PRIORITY = 2;
type
  THandle = LongWord;
  TThreadPoolThread = class;
  TThreadPoolThreadClass = class of TThreadPoolThread;
  { One TLockfree_SPMC queue per worker thread, indexed by worker number. }
  TPoolQueue = Array of TLockfree_SPMC;

  { Pool of FThreadCount worker threads, each owning one single-producer /
    multi-consumer queue.  Work is pushed by one producer thread (the queues'
    push is single-producer); a worker whose own queue is empty steals from
    the other workers' queues. }
  TThreadPool = class (TObject)
  private
    { True while WaitForEmptySystem is waiting for the workers to check in. }
    WaitForEmptySystem1:boolean;
    { count[i] is raised to 1 by worker i when it finds its own queue empty
      while WaitForEmptySystem1 is True. }
    count:array of longword;
    { balance1 is the round-robin cursor used by Execute; balance2 and
      balance3 are only referenced from commented-out code. }
    balance1,balance2,balance3:integer;
    Queues:TPoolQueue;
    FThreadCount: Integer;
    FThreads: array of TThreadPoolThread;
  protected
  public
    { QueuesSize is forwarded to TLockfree_SPMC.create as its aPower argument,
      so every queue holds 2^QueuesSize entries. }
    constructor Create(const ThreadCount:integer;const QueuesSize:
      Integer; const ThreadClass: TThreadPoolThreadClass); //;const eventhandle:THandle);
    destructor Destroy; override;
    function Execute(const Context: Pointer): Boolean; //stdcall;
    function PushInFirstQueue(const Context: Pointer): Boolean;
    function PushInSecondQueue(const Context: Pointer): Boolean;
    procedure WaitForEmptyQueues;
    procedure WaitForEmptySystem;
    procedure Suspend;//stdcall;
    procedure Resume;//stdcall;
    procedure Terminate;//stdcall;
  end;
{ Base class for pool workers.  Subclasses implement ProcessRequest, which is
  called with every Context taken from a queue.  threadcount is this worker's
  index: its own queue is ThreadPool.Queues[threadcount]. }
  TThreadPoolThread = class (TThread)
  private
    threadcount:integer;
    FThreadPool: TThreadPool;
  protected
    procedure Execute; override;
  public
    constructor Create(const ThreadPool: TThreadPool;const
      threadcount:integer);
    { Kept as a comment, like the other commented-out "stdcall" markers in
      this unit: an overriding method must use the same calling convention. }
    procedure ProcessRequest(Context: Pointer);virtual; abstract; // stdcall;
    property ThreadPool: TThreadPool read FThreadPool;
  end;
{ NOTE(review): eventhandle is only referenced from commented-out code. }
var eventhandle:Thandle;
implementation
{ TThreadPool }
{ Builds the pool.  Each of the ThreadCount workers gets one queue of
  2^QueuesSize slots (QueuesSize is passed to TLockfree_SPMC.create as
  aPower).  Every queue is created before the first worker, because a
  TThreadPoolThread starts running inside its constructor and may scan all
  queues while stealing. }
constructor TThreadPool.Create(const ThreadCount: integer;
  const QueuesSize: Integer;
  const ThreadClass: TThreadPoolThreadClass); //;const eventhandle:THandle);
var
  Idx: Integer;
begin
  inherited Create;
  balance1 := 0;
  balance2 := 0;
  balance3 := 0;
  FThreadCount := ThreadCount;
  WaitForEmptySystem1 := False;
  SetLength(FThreads, FThreadCount);
  SetLength(Queues, FThreadCount);
  SetLength(Count, FThreadCount);
  // per-worker bookkeeping and queues first ...
  for Idx := 0 to FThreadCount - 1 do
  begin
    Count[Idx] := 0;
    Queues[Idx] := TLockfree_SPMC.create(QueuesSize);
  end;
  // ... then the workers, which start immediately
  for Idx := 0 to FThreadCount - 1 do
    FThreads[Idx] := ThreadClass.Create(Self, Idx);
end;
{ Shuts the pool down.
  Every worker is stopped before any queue is freed.  A running worker may
  pop (steal) from any queue, so freeing Queues[I] while a worker J > I is
  still alive would let that worker touch a destroyed object.
  The loops use the actual array lengths rather than FThreadCount and skip
  nil entries, so a pool whose constructor raised part-way is torn down
  safely. }
destructor TThreadPool.Destroy;
var
  I: Integer;
begin
  // Signal all workers first so they wind down concurrently.  Un-suspend
  // them too: a thread parked by Suspend can never observe Terminated.
  for I := 0 to High(FThreads) do
    if FThreads[I] <> nil then
    begin
      FThreads[I].Terminate;
      FThreads[I].Suspended := False;
    end;
  // Release the threads (TThread's destructor waits for the thread to end).
  for I := 0 to High(FThreads) do
  begin
    FThreads[I].Free;
    FThreads[I] := nil;
  end;
  // Only now is it safe to free the queues the workers were reading.
  for I := 0 to High(Queues) do
  begin
    Queues[I].Free;
    Queues[I] := nil;
  end;
  SetLength(FThreads, 0);
  SetLength(Queues, 0);
  SetLength(Count, 0);
  inherited Destroy;
end;
{ Queues Context on the next worker queue in round-robin order, spinning
  while that queue is full.
  Returns True once the item is queued, or False (nothing queued) when the
  pool has no workers.
  NOTE: balance1 is updated without synchronisation and TLockfree_SPMC.push
  is single-producer, so only one thread at a time may call Execute,
  PushInFirstQueue or PushInSecondQueue. }
function TThreadPool.execute(const Context: Pointer): Boolean;
begin
  if FThreadCount <= 0 then
  begin
    Result := False;
    Exit;
  end;
  if balance1 >= FThreadCount then balance1 := 0;
  while not Queues[balance1].push(TObject(Context)) do;
  Inc(balance1);
  Result := True;
end;
{ Queues Context on worker 0's queue, spinning while that queue is full.
  Returns True once queued, or False (nothing queued) when the pool has no
  workers.  Single-producer: only one thread at a time may push. }
function TThreadPool.PushInFirstQueue(const Context: Pointer):
  Boolean;
begin
  Result := FThreadCount >= 1;
  if not Result then Exit;
  while not Queues[0].push(TObject(Context)) do;
end;
{ Queues Context on worker 1's queue, spinning while that queue is full.
  Returns True once queued, or False (nothing queued) when the pool has fewer
  than two workers, so Queues[1] does not exist.  Single-producer: only one
  thread at a time may push. }
function TThreadPool.PushInSecondQueue(const Context: Pointer):
  Boolean;
begin
  Result := FThreadCount >= 2;
  if not Result then Exit;
  while not Queues[1].push(TObject(Context)) do;
end;
{function TThreadPool.PushOnQueue1(const Context:
Pointer;Priority:byte): Boolean;
var
balance:integer;
threadid:string;
begin
threadid:=IntToStr(windows.GetCurrentThreadId);
if hashObj.Get(threadid)=nil then hashObj.Add(threadid,pointer(0));
if integer(hashObj.Get(threadid))=FThreadCount
then
begin
hashObj.Delete(threadid);
hashObj.Add(threadid,pointer(0));
end;
balance:=integer(hashObj.Get(threadid));
while not Queues[balance].push(TObject(context)) do;
inc(balance);
hashObj.Delete(threadid);
hashObj.Add(threadid,pointer(balance));
end; }
{ Busy-waits until all queues are empty AND every worker has since found its
  own queue empty at least once.
  Handshake with TThreadPoolThread.Execute:
    1. count[] is cleared;
    2. WaitForEmptyQueues spins until every queue reports length 0;
    3. WaitForEmptySystem1 is raised; a worker that fails to pop from its own
       queue while the flag is set raises count[its index] to 1;
    4. once every count[i] >= 1 the flag is lowered and the method returns.
  The method never sleeps; it spins until the condition holds.
  NOTE(review): the flag and count[] are plain fields shared with the workers
  without locks or barriers.  The handshake assumes x86 ordering and that
  self.count[i] is re-read from memory on every pass -- confirm.
  NOTE(review): items pushed by another thread during step 3 are not
  accounted for. }
procedure TThreadPool.WaitForEmptySystem;
var
i:integer;
empty:boolean;
begin
// forget acknowledgements left over from an earlier call
for i:=0 to self.Fthreadcount-1 do self.count[i]:=0;
self.WaitForEmptyQueues;
// from now on idle workers acknowledge via count[]
self.WaitForEmptySystem1:=true;
repeat
empty:=true;
for i:=0 to self.Fthreadcount-1
do empty := (empty and (self.count[i] >= 1));
if empty=true
then
begin
// every worker checked in: stop the acknowledgement phase
self.WaitForEmptySystem1:=false;
exit;
end;
until false;
end;
{ Busy-waits until every worker queue reports length 0.
  Each queue is tested on its own instead of summing the LongWord lengths
  into an Integer.  Such a sum can overflow, or wrap back to exactly 0 and
  end the wait early, when a length is read while the queue is being
  modified concurrently.
  NOTE: spins without yielding the CPU. }
procedure TThreadPool.WaitForEmptyQueues;
var
  i: Integer;
  allEmpty: Boolean;
begin
  repeat
    allEmpty := True;
    i := 0;
    while allEmpty and (i < FThreadCount) do
    begin
      if Queues[i].length <> 0 then
        allEmpty := False;
      Inc(i);
    end;
  until allEmpty;
end;
{ Stops every worker and blocks until all of them have exited.
  A worker only notices Terminated while it is running, so any worker left
  suspended by Suspend is resumed before waiting.  Otherwise WaitFor would
  never return. }
procedure TThreadPool.Terminate;
var
  I: Integer;
begin
  // signal termination to all threads
  for I := 0 to FThreadCount - 1 do FThreads[I].Terminate;
  // make sure none of them is parked by Suspend
  for I := 0 to FThreadCount - 1 do FThreads[I].Suspended := False;
  // and wait for them all to exit
  for I := 0 to FThreadCount - 1 do FThreads[I].WaitFor;
end;
{ Lets every worker thread run again by clearing its Suspended property. }
procedure TThreadPool.Resume;
var
  W: Integer;
begin
  W := 0;
  while W < FThreadCount do
  begin
    FThreads[W].Suspended := False;
    Inc(W);
  end;
end;
{ Parks every worker thread by setting its Suspended property. }
procedure TThreadPool.Suspend;
var
  W: Integer;
begin
  W := 0;
  while W < FThreadCount do
  begin
    FThreads[W].Suspended := True;
    Inc(W);
  end;
end;
{ TThreadPoolThread }
{ Creates worker number threadcount of ThreadPool.  The thread is created
  suspended, so both fields are set before Execute can read them, and is
  then started at once. }
constructor TThreadPoolThread.Create(const ThreadPool: TThreadPool;
  const threadcount: integer);
begin
  inherited Create(True);
  FThreadPool := ThreadPool;
  Self.threadcount := threadcount; // parameter shadows the field
  Resume;
end;
{ Worker loop, run until Terminated:
    1. pop from this worker's own queue (Queues[threadcount]) and process it;
    2. if that queue is empty, acknowledge a pending WaitForEmptySystem
       (count[threadcount] is raised to 1), then try to steal one item from
       every other worker's queue.
  The steal sweep starts at the worker after this one and wraps around.  This
  spreads the thieves' CAS attempts over different queues instead of having
  every idle worker hit Queues[0] first.  The worker's own queue, just found
  empty, is skipped.
  NOTE: the loop never blocks or yields, so an idle worker spins on a CPU. }
procedure TThreadPoolThread.Execute;
var
  Context: TObject;
  k, victim, n: Integer;
begin
  n := FThreadPool.FThreadCount;
  while not Terminated do
  begin
    if FThreadPool.Queues[threadcount].pop(Context) then
      ProcessRequest(Pointer(Context))
    else
    begin
      if FThreadPool.WaitForEmptySystem1 and
         (FThreadPool.count[threadcount] < 1) then
        Inc(FThreadPool.count[threadcount]);
      for k := 1 to n - 1 do
      begin
        victim := (threadcount + k) mod n;
        if FThreadPool.Queues[victim].pop(Context) then
          ProcessRequest(Pointer(Context));
      end;
    end;
  end;
end;
end.
-----------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------
{****************************************************************************
*
*
* free-lock
queue *
*
*
*
*
* Language: FPC Pascal v2.2.0+ / Delphi
6+ *
*
*
* Required switches:
none *
*
*
* Author: Amine Moulay
Ramdane *
*
*
*
*
* Date: July 7,
2009 *
* Version:
1.0 *
*
*
*
*
* Send bug reports and feedback to ami...@colba.net *
* You can always get the latest version/revision of this package
from *
*
*
* http://www.colba.net/~aminer
*
*
*
*
*
*
*
*
* Description: Lock-free SPMC algorithm to handle queue FIFO *
* proposed by Amine Moulay Ramadane
*
* use only single CAS on pop and no CAS on push *
* lockfree_SPMC: for queue of tObject
(pointer) *
*
*
* This program is distributed in the hope that it will be useful,
*
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. *
*
*
*
*
***************************************************************************** }
unit Lockfree_SPMC;
interface
uses lockfreeutils;
type
  { Element type stored in the queue: an object reference (TThreadPool stores
    pointers cast to TObject). }
  tNodeQueue = tObject;
  { 16 x 4 = 64 bytes, of which only element [0] is used.  Presumably padding
    to keep head and tail on separate cache lines -- NOTE(review): confirm. }
  typecache1 = array[0..15] of longword;
  { Bounded FIFO of object references in a power-of-two ring buffer.
    push uses no atomic instruction and must be called from a single thread;
    pop claims items with one CAS and may be called from many threads. }
  TLockfree_SPMC = class
  private
    tail,             // tail[0]: number of successful pushes (free-running)
    head: typecache1; // head[0]: number of successful pops (free-running)
    fMask : integer;  // fSize - 1: maps a free-running position to a slot
    fSize : integer;  // capacity, 2^aPower
    tab : array of tNodeQueue;
    procedure setobject(lp : longword;const aobject : tNodeQueue);
    function getLength:longword;
    function getSize:longword;
    function getObject(lp : longword):tNodeQueue;
  public
    constructor create(aPower : integer =20); {allocate tab with size equal 2^aPower, for 20 size is equal 1048576}
    { override, so that Free (which calls the virtual Destroy) reaches it }
    destructor Destroy; override;
    function push(tm : tNodeQueue):boolean;
    function pop(var obj:tNodeQueue):boolean;
    property length : longword read getLength;
    property size : longword read getSize;
  end;
implementation
{ Atomically adds 1 to Target and returns the incremented value.
  Assumes the 32-bit x86 register calling convention: the address of Target
  arrives in EAX and the result is returned in EAX.  LOCK XADD leaves the old
  value of Target in EAX and stores old+1; INC EAX turns the old value into
  the new one.
  NOTE(review): not called by any visible code in this unit. }
function LockedInc(var Target: Integer): Integer;
asm
MOV ECX, EAX
MOV EAX, 1
LOCK XADD [ECX], EAX
INC EAX
end;
{ Compare-and-swap on a 32-bit value, performed with LOCK CMPXCHG (x86).
  If Target = Comperand, stores NewValue into Target and returns True;
  otherwise Target is left unchanged and False is returned.
  stdcall: all three arguments are read from the stack; Target, being a var
  parameter, yields its address, which is loaded into ECX.  The boolean
  result is returned in AL (set by the ZF-driven branch after CMPXCHG). }
function CAS(var Target: longword; Comperand: longword;NewValue:
longword ): boolean; assembler;stdcall;
asm
mov ecx,Target
mov edx,NewValue
mov eax,Comperand
lock cmpxchg [ecx],edx
JNZ @@2
MOV AL,01
JMP @@Exit
@@2:
XOR AL,AL
@@Exit:
end;
{ Allocates a ring buffer of 2^aPower slots (default aPower = 20, i.e.
  1048576 slots).  fMask keeps the low aPower bits of a position, so head
  and tail can count freely and still index tab; both counters start at 0. }
constructor TLockfree_SPMC.create(aPower : integer );
begin
  fSize := 1 shl aPower;
  fMask := not ($FFFFFFFF shl aPower);
  SetLength(tab, fSize);
  head[0] := 0;
  tail[0] := 0;
end;
{ Releases the queue.  Objects still referenced from tab are not freed. }
destructor TLockfree_SPMC.Destroy;
begin
  inherited;
end;
{ Stores aobject in the slot for free-running position lp (wrapped by fMask). }
procedure TLockfree_SPMC.setObject(lp : longword; const aobject : tNodeQueue);
var
  slot: longword;
begin
  slot := lp and fMask;
  tab[slot] := aobject;
end;
{ Returns the object in the slot for free-running position lp (wrapped by fMask). }
function TLockfree_SPMC.getObject(lp : longword):tNodeQueue;
var
  slot: longword;
begin
  slot := lp and fMask;
  getObject := tab[slot];
end;
{ Appends tm at the tail.  Returns False, storing nothing, when the queue
  already holds fSize items; returns True otherwise.
  Single-producer: tail[0] is read and advanced without atomics, so only one
  thread may call push.  The slot is written before tail[0] is advanced, so a
  consumer that sees the new tail also sees the stored object.
  NOTE(review): this relies on x86 store ordering; a weaker memory model would
  need a release barrier between the two stores. }
function TLockfree_SPMC.push(tm : tNodeQueue):boolean;
begin
  Result := getLength < fSize;
  if Result then
  begin
    setObject(tail[0], tm);
    Inc(tail[0]);
  end;
end;
{ Removes the item at the head of the queue.
  Returns True with obj set to the removed item, or False when the queue was
  observed empty.  obj may be overwritten even when False is returned.
  Safe for any number of concurrent consumers; retries only when the CAS
  loses to another consumer.
  head[0] is snapshotted BEFORE tail[0] is examined.  tail only grows (single
  producer), so a tail that differs from the snapshot proves slot lastHead
  has been filled.  Testing tail first and reading head afterwards allows
  this sequence: the queue is seen non-empty, other consumers drain it, the
  now-unfilled slot at tail is read, and the CAS moves head past tail.
  NOTE(review): relies on x86 not reordering the two loads. }
function TLockfree_SPMC.pop(var obj:tNodeQueue):boolean;
var
  lastHead : longword;
begin
  repeat
    lastHead := head[0];
    if tail[0] = lastHead then
    begin
      result := false;
      exit;
    end;
    obj := getObject(lastHead);
    // claim the slot; fails (and we retry) if another consumer moved head
    if CAS(head[0], lastHead, lastHead + 1) then
    begin
      result := true;
      exit;
    end;
  until false;
end;
{ Number of items currently queued: tail - head, modulo 2^32.
  Each counter is read exactly once, head first.  tail only grows, so reading
  head before tail yields a snapshot with head <= tail (modulo wrap), assuming
  pop never advances head past tail.  Re-reading the counters, or reading
  tail first, can let a concurrent pop produce a bogus length near 2^32. }
function TLockfree_SPMC.getLength:longword;
var
  h, t: longword;
begin
  h := head[0];
  t := tail[0];
  if t < h then
    // tail has wrapped past 2^32 but head has not yet
    result := (high(longword) - h) + (1 + t)
  else
    result := t - h;
end;
{ Capacity of the ring buffer in items (2^aPower from create). }
function TLockfree_SPMC.getSize:longword;
begin
  getSize := fSize;
end;
end.
-----------------------------------------------------------------------------------
I have been thinking about my lock-free thread pool and lockfree_SPMC.
As you know, lock-freedom guarantees only global progress, so if we have,
say, more than 100 cores, I think this will become a problem. That is why
I use one queue per worker thread: this lowers contention and *guarantees*
forward progress for every worker thread.
But if you look carefully at the lock-free pop() in my lockfree_SPMC,
you will notice that in the work-stealing process I do this:
// Steal sweep: after its own queue comes up empty, the worker tries one pop
// from every queue, always starting at index 0.
for i:=0 to FThreadpool.Fthreadcount-1
do
begin
if FThreadPool.Queues[i].pop(context)
then
begin
// inc(j);
ProcessRequest(pointer(Context))
end;
end;
-----------------------------------------------------------------------------------
and in the lockfree_SPMC pop() method i am using this
--------------------------------------------------------------------------------------
{ Pops one item; returns False when the queue is observed empty.
  NOTE(review): tail[0] is compared with head[0] BEFORE head[0] is read into
  lastHead.  If other consumers drain the queue between those two reads,
  lastHead can equal tail: the not-yet-written slot is returned and the CAS
  moves head past tail.  Reading head into lastHead first and comparing that
  snapshot with tail avoids this. }
function TLockfree_SPMC.pop(var obj:tNodeQueue):boolean;
var
lastHead : longword;
begin
repeat
if tail[0]<>head[0]
then
begin
lastHead:=head[0];
obj:=getObject(integer(lastHead));
if CAS(head[0],lasthead,lasthead+1)
then
begin
result:=true;
exit;
end;
end
else
begin
result:=false;
exit;
end;
until false;
end;
-------------------------------------------------------------
So, if you read the algorithms above carefully, you will conclude that on a
system with, say, more than 100 cores, we can reach a state of very high
contention in the work-stealing process. This still guarantees global
forward progress, but it does not guarantee forward progress for every
worker thread during work stealing. And that's not good.
So, one solution is to loop a bounded number of times in the lockfree_SPMC
pop() method instead of looping indefinitely.
Do you have any other ideas or suggestions?
Thank you for your time.
Regards,
Amine Moulay Ramdane.
http://www.colba.net/~aminer/
On Jul 27, 10:10 pm, Amine <ami...@colba.net> wrote:
> Hello all,
>
> Here is the code of my lock-free Threadpool and lockfree_SPMC
> and my questions will follow:
>
> {****************************************************************************
> ***************************************************************************** }
> -----------------------------------------------------------------------------------------------------
>
> ----------------------------------------------------------------------------------------------------
> {****************************************************************************
> ***************************************************************************** }
> -----------------------------------------------------------------------------------
> So, if you have read carefully the algorithms above, you will come to
> the
> conclusion that if we are in a system with let say over 100 cores we
> can
> go through a state where we will have a very high contention in the
> work-stealing
> process, this will guaranty forward global progress but not guaranty
> forward
> progress for every worker thread in the work-stealing process. And
> that's not good.
>
> So, one solution is to loop a bound number of times in the
> lockfree_SPMC pop()
> method and not loop infinitly.
>
> Have you any other ideads, suggestions ?
Hmmm... Leave it as is.
IMHO distributed queues plus the lock-free guarantee are enough to reduce
contention to an acceptable level. A thread fails IFF some other thread
succeeds, and a thread that succeeds then retires with a significant
amount of work for some time.
--
Dmitriy V'jukov
> Hello all,
>
> I was thinking about my lockfree threadpool and lockfree_SPMC,
> and as you know lock-free does guaranty only global progress,
> so, if we have let say more than 100 cores, i think it will be a
> problem.. hence, what i am doing is using a queue for every worker
> thread this will lower the contention and *guaranty* forward progress
> for every worker thread.
> So, if you have read carefully the algorithms above, you will come to
> the
> conclusion that if we are in a system with let say over 100 cores we
> can
> go through a state where we will have a very high contention in the
> work-stealing
> process, this will guaranty forward global progress but not guaranty
> forward
> progress for every worker thread in the work-stealing process. And
> that's not good.
>
> So, one solution is to loop a bound number of times in the
> lockfree_SPMC pop()
> method and not loop infinitly.
>
> Have you any other ideads, suggestions ?
Ideally, stealing operations would not be all that frequent. I am not sure
how Cilk performs its stealing operation. For instance, does it allow a
thief to attempt to steal from all workers, or does it allow a thief to
attempt to steal from a single random worker and, if that fails, wait for
work to be explicitly queued to it?