Portable eventcount

249 views
Skip to first unread message

Dmitriy V'jukov

unread,
Sep 12, 2008, 2:24:39 PM9/12/08
to Scalable Synchronization Algorithms
This eventcount algorithm is based on semaphore, so it can be easily
implemented in POSIX and/or Windows. Implementation is mostly lock-
free. Wait generations are maintained manually.

class eventcount
{
public:
typedef uint32_t state_t;

eventcount()
{
state_ = waiters_offset;
sema_ = CreateSemaphoreW(0, 0, LONG_MAX, 0);
if (0 == sema_)
throw std::runtime_error("error creating semaphore");
}

~eventcount()
{
CloseHandle(sema_);
}

state_t prepare_wait()
{
return _InterlockedExchangeAdd((long*)&state_, waiters_inc);
}

void retire_wait()
{
_InterlockedExchangeAdd((long*)&state_, -(int)waiters_inc);
}

void wait(state_t cmp)
{
for (;;)
{
state_t cmp0 = state_;
if ((cmp & generation_mask) != (cmp0 & generation_mask))
{
_InterlockedExchangeAdd((long*)&state_, -
(int)waiters_inc);
return;
}
WaitForSingleObject(sema_, INFINITE);
cmp0 = state_;
if ((cmp & generation_mask) != (cmp0 & generation_mask))
return;
ReleaseSemaphore(sema_, 1, 0);
Sleep(0);
}
}

void signal()
{
_mm_mfence();
signal_relaxed();
}

void signal_relaxed()
{
state_t cmp = state_;
int waiters = (int)(cmp & waiters_mask) - (int)waiters_offset;
if (waiters <= 0)
return;
for (;;)
{
state_t xchg = (cmp & ~waiters_mask) + generation_inc +
waiters_offset;
state_t cmp0 = _InterlockedCompareExchange((long*)&state_,
xchg, cmp);
if (cmp0 == cmp)
{
ReleaseSemaphore(sema_, waiters, 0);
return;
}
cmp = cmp0;
waiters = (int)(cmp & waiters_mask) - (int)waiters_offset;
if (waiters <= 0)
return;
}
}

private:
state_t volatile state_;
HANDLE sema_;

static state_t const waiters_inc = 1;
static state_t const waiters_mask = (1 << 20) - 1;
static state_t const waiters_offset = 1 << 19;
static state_t const generation_inc = 1 << 20;
static state_t const generation_mask = ~waiters_mask;

eventcount(eventcount const&);
eventcount& operator = (eventcount const&);

};

************************************************************

Here is a little helper class for blocking threads and usage example:

class eventcount_blocking
{
public:
eventcount_blocking(eventcount& ec)
: ec_(ec)
{
cmp_ = ec_.prepare_wait();
wait_ = false;
}

void wait()
{
assert(false == wait_);
wait_ = true;
ec_.wait(cmp_);
}

~eventcount_blocking()
{
if (false == wait_)
ec_.retire_wait();
}

private:
eventcount& ec_;
eventcount::state_t cmp_;
bool wait_;

eventcount_blocking(eventcount_blocking const&);
eventcount_blocking& operator = (eventcount_blocking const&);

};

************************************************************

Usage example:

queue q;
eventcount ec;

void producer(void* data)
{
q.enqueue(data);
ec.signal(); // or signal_relaxed()

}

void* consumer()
{
void* data = 0;
if (data = q.dequeue())
return data;
for (;;)
{
eventcount_blocking block (ec);
if (data = q.dequeue())
return data;
block.wait();
if (data = q.dequeue())
return data;
}

}

Dmitriy V'jukov

cybice

unread,
Apr 26, 2011, 12:18:54 AM4/26/11
to lock...@googlegroups.com
It seems that single writing thread and single reading thread 
makes this algorithm never get out from for(;;) cycle in wait function


for simplicity lets look on state_ like 
state_ = statepart | (generation_part<<somebits)  


thread1:
calls preparewait : 
statepart=statepart + 1

-------------------------

thread2 calls signal()
and after this code 
state_t xchg = (cmp & ~waiters_mask) + generation_inc + waiters_offset; 
state_t cmp0 = _InterlockedCompareExchange((long*)&state_, xchg, cmp); 

statepart = 0 and generationpart = +1

------------------------

thread1:
call wait and after 
_InterlockedExchangeAdd((long*)&state_, - (int)waiters_inc); 
we get statepart = -1

------------------------

now 
thread1: 
calls preparewait and wait 
in for(;;) cycle at wait we get
cmp = -1
cmp0 = 0

and any calls from thread2 to signal returns because of 
int waiters = (int)(cmp & waiters_mask) - (int)waiters_offset; 
if (waiters <= 0) 
return; 


so thread1 never get out from wait for(;;) cycle



Dmitriy Vyukov

unread,
Apr 26, 2011, 4:56:05 AM4/26/11
to Scalable Synchronization Algorithms
Aha! I see the problem. Indeed if statepart == -1, then a waiter can
constantly spin in wait() - consume a semaphore count, see no
generation change, release a semaphore count, and so on.

It seems that this variant of eventcount is not "all that good" in
several aspects:
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/9461b41709e4063a

For your particular problem you may try the following algorithm:

class eventcount

{

public:

eventcount(HANDLE g_sema, unsigned volatile* waiter_count)

{

waiter_count_ = waiter_count;

sema_ = g_sema;

}



void prepare_wait()

{

_InterlockedIncrement(waiter_count_);

}

void retire_wait()
{
_InterlockedDecrement(waiter_count_);

}



void wait()

{

WaitForSingleObject(sema_, INFINITE);

}



void signal()

{

_mm_mfence();

signal_relaxed();

}



void signal_relaxed()

{

long wc = *waiter_count_;
for (;;)

{
if (wc <= 0)

return;

unsigned wc0 = _InterlockedCompareExchange(waiter_count_, wc - 1,
wc);

if (wc0 == wc)

{

ReleaseSemaphore(sema_, 1, 0);

return;

}

wc = wc0;

}

}



private:

long volatile* waiter_count_;

HANDLE sema_;

};



I've simplified it by completely removing "wait generations". Since in
your problem all waiters are identical (it's OK to wakeup any waiter),
it should not be a problem.
CAUTION: the algorithm is untested and typed in notepad.


--
Dmitry Vyukov
Reply all
Reply to author
Forward
0 new messages