Portable eventcount (try 2)


Dmitriy V'jukov

Sep 17, 2008, 4:30:03 PM
I've taken into account the feedback from Anthony Williams and Chris Thomasson
after my first try:
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/9461b41709e4063a

Here are the claimed properties:
1. No memory allocation/deallocation
2. No kernel object creation/destruction
3. Broadcasting with a single syscall
4. No mutex acquisition after the wait on the semaphore
5. Portable, because it is based on semaphores
6. No spurious wakeups by design
7. The algorithm can easily be transformed into a condition variable

Brief comments on the algorithm:
1. Every thread has an associated node which it uses in eventcount
operations
2. Threads exchange their nodes
3. The mutex acquisition that would normally follow the wait on the
semaphore is executed before the next wait (it's just deferred).

Here is the code:

struct thread_node
{
    thread_node* next;
    size_t count;
    size_t unconsumed;
    HANDLE sema;
    CRITICAL_SECTION mtx;
};

__declspec(thread) thread_node* t_thread_node;

void on_thread_exit()
{
    thread_node* head = t_thread_node;
    thread_node* my = 0;
    if (head)
    {
        EnterCriticalSection(&head->mtx);
        if (head->next)
        {
            my = head->next;
            head->next = my->next;
        }
        else
        {
            my = head;
        }
        LeaveCriticalSection(&head->mtx);
        DeleteCriticalSection(&my->mtx);
        CloseHandle(my->sema);
        delete my;
    }
}

struct eventcount
{
    eventcount()
    {
        root = 0;
        InitializeCriticalSection(&mtx);
    }

    ~eventcount()
    {
        DeleteCriticalSection(&mtx);
    }

    void prepare_wait()
    {
        thread_node* my = 0;
        thread_node* head = t_thread_node;
        if (head)
        {
            EnterCriticalSection(&head->mtx);
            if (head->next)
            {
                my = head->next;
                head->next = my->next;
                my->next = 0;
            }
            else
            {
                my = head;
            }
            LeaveCriticalSection(&head->mtx);
        }
        else
        {
            my = new thread_node;
            my->next = 0;
            my->count = 0;
            my->unconsumed = 0;
            my->sema = CreateSemaphoreW(0, 0, LONG_MAX, 0);
            InitializeCriticalSection(&my->mtx);
        }

        while (my->unconsumed)
        {
            WaitForSingleObject(my->sema, 0);
            my->unconsumed -= 1;
        }

        EnterCriticalSection(&mtx);
        if (root)
        {
            my->next = root->next;
            root->next = my;
            my = root;
        }
        else
        {
            root = my;
        }
        root->count += 1;
        LeaveCriticalSection(&mtx);
        t_thread_node = my;
    }

    void wait()
    {
        thread_node* head = t_thread_node;
        if (head == root)
        {
            WaitForSingleObject(head->sema, INFINITE);
        }
        else
        {
            EnterCriticalSection(&head->mtx);
            head->unconsumed += 1;
            LeaveCriticalSection(&head->mtx);
        }
    }

    void retire_wait()
    {
        thread_node* head = t_thread_node;
        if (head == root)
        {
            EnterCriticalSection(&mtx);
            if (head == root)
            {
                thread_node* my = 0;
                head->count -= 1;
                if (head->next)
                {
                    my = head->next;
                    head->next = my->next;
                    my->next = 0;
                }
                else
                {
                    my = head;
                }
                LeaveCriticalSection(&mtx);
                t_thread_node = my;
                return;
            }
            LeaveCriticalSection(&mtx);
        }
        EnterCriticalSection(&head->mtx);
        head->unconsumed += 1;
        LeaveCriticalSection(&head->mtx);
    }

    void signal_all()
    {
        _mm_mfence();
        thread_node* head = root;
        if (0 == head)
            return;
        EnterCriticalSection(&mtx);
        if (head != root)
        {
            LeaveCriticalSection(&mtx);
            return;
        }
        root = 0;
        LeaveCriticalSection(&mtx);
        size_t count = head->count;
        head->count = 0;
        ReleaseSemaphore(head->sema, count, 0);
    }

    void signal_one()
    {
        _mm_mfence();
        thread_node* head = root;
        if (0 == head)
            return;
        EnterCriticalSection(&mtx);
        if (head != root)
        {
            LeaveCriticalSection(&mtx);
            return;
        }
        head->count -= 1;
        if (0 == head->count)
            root = 0;
        LeaveCriticalSection(&mtx);
        ReleaseSemaphore(head->sema, 1, 0);
    }

    thread_node* volatile root;
    CRITICAL_SECTION mtx;
};


Dmitriy V'jukov
--
Relacy Race Detector: Make your synchronization correct!
http://groups.google.ru/group/relacy/web

Chris M. Thomasson

Sep 18, 2008, 2:04:23 AM
"Dmitriy V'jukov" <dvy...@gmail.com> wrote in message
news:aa9e48f2-c451-4252...@c58g2000hsc.googlegroups.com...

>I take into account feedback from Anthony Williams and Chris Thomasson
> after my first try:
> http://groups.google.com/group/comp.programming.threads/browse_frm/thread/9461b41709e4063a
[...]

I wish I had time to properly study the algorithm!

;^(


However, I have one question:


> void wait()
> {
> thread_node* head = t_thread_node;
> if (head == root)
> {

/* what happens if `head' != `root' right here? Can this race
occur? */

> WaitForSingleObject(head->sema, INFINITE);
> }
> else
> {
> EnterCriticalSection(&head->mtx);
> head->unconsumed += 1;
> LeaveCriticalSection(&head->mtx);
> }
> }

[...]


Chris M. Thomasson

Sep 18, 2008, 3:46:10 AM
"Dmitriy V'jukov" <dvy...@gmail.com> wrote in message
news:aa9e48f2-c451-4252...@c58g2000hsc.googlegroups.com...
>I take into account feedback from Anthony Williams and Chris Thomasson
> after my first try:
> http://groups.google.com/group/comp.programming.threads/browse_frm/thread/9461b41709e4063a

Think about adding support for timeouts...

[...]

Dmitriy V'jukov

Sep 18, 2008, 3:51:36 AM
On Sep 18, 10:04, "Chris M. Thomasson" <n...@spam.invalid> wrote:

> However, I have one question:
>
> >    void wait()
> >    {
> >        thread_node* head = t_thread_node;
> >        if (head == root)
> >        {
>
>              /* what happens if `head' != `root' right here? Can this race
> occur? */


Yes, this race can occur, but it does not break correctness. It just
means that the thread will wait on the semaphore fairly, like all the
other threads that came earlier.


Dmitriy V'jukov

Dmitriy V'jukov

Sep 18, 2008, 4:21:06 AM
On Sep 18, 11:46, "Chris M. Thomasson" <n...@spam.invalid> wrote:
> "Dmitriy V'jukov" <dvyu...@gmail.com> wrote in message

>
> news:aa9e48f2-c451-4252...@c58g2000hsc.googlegroups.com...
>
> >I take into account feedback from Anthony Williams and Chris Thomasson
> > after my first try:
> >http://groups.google.com/group/comp.programming.threads/browse_frm/th...

>
> Think about adding support for timeouts...


On Windows it will be straightforward. One just needs to pass a
timeout to WaitForSingleObject() and, if the timeout expires, execute
retire_wait().
On Linux... it seems to be impossible in this version. On Linux it's
possible to specify a timeout only for pthread_cond_timedwait()...


Dmitriy V'jukov

Dmitriy V'jukov

Sep 18, 2008, 8:19:22 AM
On Sep 18, 10:04 am, "Chris M. Thomasson" <n...@spam.invalid> wrote:
> > I take into account feedback from Anthony Williams and Chris Thomasson
> > after my first try:
> >http://groups.google.com/group/comp.programming.threads/browse_frm/th...

> I wish I had time to properly study the algorithm!


Brief description:
Each thread has an associated 'node', which contains a semaphore and a
'next' link. When a bunch of threads (a generation) block on the
eventcount, the first thread's node becomes the 'head' of a stack, and
all the other threads' nodes are pushed onto that stack. All threads of
a generation wait on a single semaphore, which is located in the 'head',
so the signaler has to signal only one semaphore. When a thread wakes up
from the wait, it pops an *arbitrary* node from the stack, and the last
thread of the generation takes the 'head' node. This pop occurs not
directly after the wait, but *before the next wait*, so the dequeuing is
spread over time, so to say. This way I want to overcome the problem
described here:
http://groups.google.com/group/comp.programming.threads/msg/4b9abe264dfcc05d?hl=en
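
The "one semaphore per generation" idea described above can be sketched portably. This is only an illustration under assumptions, not the Win32 code from this thread: `sketch_semaphore`, `sketch_generation` and `run_generation_demo` are hypothetical names, the semaphore is emulated with std::mutex and std::condition_variable, and the node exchange is left out entirely. It shows only the part where the signaler releases a single semaphore by the number of registered waiters.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a kernel semaphore, emulated with mutex + condvar.
class sketch_semaphore {
public:
    // Adds n permits in one call (the analogue of ReleaseSemaphore(h, n, 0)).
    void release(std::size_t n) {
        {
            std::lock_guard<std::mutex> lock(m_);
            permits_ += n;
        }
        cv_.notify_all();
    }

    // Blocks until a permit is available, then consumes it.
    void acquire() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return permits_ != 0; });
        --permits_;
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::size_t permits_ = 0;
};

// One generation of waiters sharing the semaphore that lives in the 'head'.
struct sketch_generation {
    std::mutex mtx;
    std::size_t count = 0;  // waiters registered in this generation
    sketch_semaphore sema;

    void prepare_wait() {
        std::lock_guard<std::mutex> lock(mtx);
        ++count;
    }

    void wait() { sema.acquire(); }

    // Wakes the whole generation with a single release; returns its size.
    std::size_t signal_all() {
        std::size_t n;
        {
            std::lock_guard<std::mutex> lock(mtx);
            n = count;
            count = 0;
        }
        if (n != 0) sema.release(n);
        return n;
    }
};

// Registers `waiters` threads, wakes them with one signal_all(), and
// returns how many of them actually woke up.
std::size_t run_generation_demo(std::size_t waiters) {
    sketch_generation gen;
    std::mutex woken_mtx;
    std::size_t woken = 0;

    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < waiters; ++i) {
        gen.prepare_wait();  // register before the thread can block
        threads.emplace_back([&] {
            gen.wait();
            std::lock_guard<std::mutex> lock(woken_mtx);
            ++woken;
        });
    }

    std::size_t const released = gen.signal_all();
    assert(released == waiters);
    for (std::thread& t : threads) t.join();
    return woken;
}
```

Calling `run_generation_demo(4)` registers four waiters and wakes all of them with one `release(4)`. Because the permits are counted, a waiter that reaches `wait()` after the signal still finds its permit, which is the property the thread relies on when it defers work until the next wait.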


Dmitriy V'jukov

Chris M. Thomasson

Sep 18, 2008, 8:43:29 AM
"Dmitriy V'jukov" <dvy...@gmail.com> wrote in message
news:031ea0ff-4d20-4be2...@l42g2000hsc.googlegroups.com...

What about:

http://www.opengroup.org/onlinepubs/009695399/functions/sem_timedwait.html

? That should do the trick...

Dmitriy V'jukov

Sep 18, 2008, 9:19:28 AM
On Sep 18, 4:43 pm, "Chris M. Thomasson" <n...@spam.invalid> wrote:
> "Dmitriy V'jukov" <dvyu...@gmail.com> wrote in message
>
> news:031ea0ff-4d20-4be2...@l42g2000hsc.googlegroups.com...

> On Sep 18, 11:46, "Chris M. Thomasson" <n...@spam.invalid> wrote:
>
> > > "Dmitriy V'jukov" <dvyu...@gmail.com> wrote in message
>
> > >news:aa9e48f2-c451-4252...@c58g2000hsc.googlegroups.com...
>
> > > >I take into account feedback from Anthony Williams and Chris Thomasson
> > > > after my first try:
> > > >http://groups.google.com/group/comp.programming.threads/browse_frm/th...
>
> > > Think about adding support for timeouts...
> > On Windows it will be straightforward. One just need to specify
> > timeout to WaitForSingleObject(), and if timeout happens, execute
> > retire_wait().
> > On Linux... It seems that in this version it's impossible. In Linux
> > it's possible to specify timeout only for pthread_cond_timedwait()...
>
> What about:
>
> http://www.opengroup.org/onlinepubs/009695399/functions/sem_timedwait...

>
> ? That should do the trick...

Oh, yes. Then there is no problem implementing timed_wait. I was just
checking against a slightly different (outdated?) version of SUS XSH.
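
A timed wait on a POSIX semaphore might look roughly like the sketch below. It is an assumption-laden illustration, not part of the posted algorithm: `sem_wait_for_ms` is a hypothetical helper name. The one detail worth showing is that sem_timedwait() takes an absolute CLOCK_REALTIME deadline rather than a relative timeout, so the relative value has to be converted first.

```cpp
#include <cassert>
#include <cerrno>
#include <semaphore.h>
#include <time.h>

// Hypothetical helper: waits on a POSIX semaphore for at most timeout_ms
// milliseconds. Returns true if the semaphore was acquired, false on
// timeout or error.
bool sem_wait_for_ms(sem_t* sem, long timeout_ms) {
    assert(sem != nullptr && timeout_ms >= 0);

    // sem_timedwait() expects an absolute deadline on CLOCK_REALTIME.
    timespec deadline;
    if (clock_gettime(CLOCK_REALTIME, &deadline) != 0)
        return false;
    deadline.tv_sec += timeout_ms / 1000;
    deadline.tv_nsec += (timeout_ms % 1000) * 1000000L;
    if (deadline.tv_nsec >= 1000000000L) {
        deadline.tv_sec += 1;
        deadline.tv_nsec -= 1000000000L;
    }

    // Retry if the wait is interrupted by a signal handler.
    int rc;
    do {
        rc = sem_timedwait(sem, &deadline);
    } while (rc == -1 && errno == EINTR);
    return rc == 0;
}
```

In terms of the eventcount discussed here, a `false` result would presumably be followed by a call to retire_wait(), mirroring the WaitForSingleObject() timeout path described for Windows.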

Dmitriy V'jukov

Chris M. Thomasson

Sep 18, 2008, 9:50:02 AM
"Dmitriy V'jukov" <dvy...@gmail.com> wrote in message
news:977fcd9c-09a9-4403...@y21g2000hsf.googlegroups.com...

Have you plugged this algorithm into Relacy yet?

;^D

Anyway, about the only caveat I can see is that I think it's bound to
SCHED_OTHER because of the manual waitset construction. Although, I don't
think it's that big of a problem on Windows. Well, it would mess around with
the way the scheduler wakes up threads with different priorities; it could
be a big problem on real-time systems... Humm...

Dmitriy V'jukov

Sep 18, 2008, 9:53:58 AM
On Sep 18, 5:50 pm, "Chris M. Thomasson" <n...@spam.invalid> wrote:
> "Dmitriy V'jukov" <dvyu...@gmail.com> wrote in message

>
> news:977fcd9c-09a9-4403...@y21g2000hsf.googlegroups.com...
> On Sep 18, 10:04 am, "Chris M. Thomasson" <n...@spam.invalid> wrote:
>
>
>
> > > > I take into account feedback from Anthony Williams and Chris Thomasson
> > > > after my first try:
> > > >http://groups.google.com/group/comp.programming.threads/browse_frm/th...
> > > I wish I had time to properly study the algorithm!
> > Brief description:
> > Each thread has associated 'node', which contains semaphore and 'next'
> > link. When a bunch of threads (generation) block on eventcount, first
> > thread's node becomes a 'head' of a stack, and all others' nodes
> > enqueued into that stack. All threads from generation wait on single
> > semaphore, which is situated in the 'head'. So signaler have to signal
> > only one semaphore. When thread wakes up from wait, it dequeues
> > *arbitrary* node from stack, and last thread from generation grabs
> > 'head' node. But this dequeuing occurs not directly after wait, but
> > *before next wait*. So dequeuing is spread over time, so to say. This
> > way I want to overcome problem described here:
> >http://groups.google.com/group/comp.programming.threads/msg/4b9abe264...

>
> Have you plugged this algorithm into Relacy yet?


Not yet, because I am not going to use it in production right now. But
before using it in production I will strongly consider verifying it
with Relacy.


> Anyway, about the only caveat I can see is that I think it's bound to
> SCHED_OTHER because of the manual waitset construction. Although, I don't
> think its that big of a problem on Windows. Well, it would mess around with
> the way the scheduler wakes up threads with different priorities; it could
> be a big problem on real-time systems... Humm...


I don't understand the problem. I have one semaphore per generation, so
the *OS* decides which thread to wake up, not me. I think that the OS
will prefer the thread with the higher priority.


Dmitriy V'jukov

Chris M. Thomasson

Sep 18, 2008, 10:12:04 AM
"Dmitriy V'jukov" <dvy...@gmail.com> wrote in message
news:a581cc57-d629-4ff3...@34g2000hsh.googlegroups.com...

false-alarm!

;^/

Chris M. Thomasson

Sep 18, 2008, 10:19:36 AM
"Dmitriy V'jukov" <dvy...@gmail.com> wrote in message
news:aa9e48f2-c451-4252...@c58g2000hsc.googlegroups.com...

>I take into account feedback from Anthony Williams and Chris Thomasson
> after my first try:
> http://groups.google.com/group/comp.programming.threads/browse_frm/thread/9461b41709e4063a
>
> Here is claimed properties:
> 1. No memory allocation/deallocation
> 2. No kernel object creation/destruction
> 3. Broadcasting with single syscall
> 4. No mutex acquisition after wait on semaphore
> 5. Portable because based on semaphore
> 6. No spurious wakeups by design
> 7. You can easily transform this algorithm into condition variable
>
> Brief comments on algorithm:
> 1. Every thread has associated node which it uses in eventcount
> operations
> 2. Threads exchange of their nodes
> 3. Mutex acquisition, which must be after wait on semaphore, executed
> before next wait (it's just deferred).
>
> Here is the code:
[...]

What does the consumer usage pattern look like? The interface suggests that
it differs from my original eventcount algorithm here:

http://groups.google.com/group/comp.programming.threads/browse_frm/thread/aa8c62ad06dbb380

while (! try_to_do_something()) {
  int const key = ec.get();
  if (try_to_do_something()) { break; }
  ec.wait(key);
}


Does your version's consumer usage pattern look like this:

while (! try_to_do_something()) {
  eventcount.prepare_wait();
  if (try_to_do_something()) {
    eventcount.retire_wait();
    break;
  }
  eventcount.wait();
}


Is that correct?

Dmitriy V'jukov

Sep 18, 2008, 10:31:39 AM
On Sep 18, 6:19 pm, "Chris M. Thomasson" <n...@spam.invalid> wrote:
> "Dmitriy V'jukov" <dvyu...@gmail.com> wrote in message

>
> news:aa9e48f2-c451-4252...@c58g2000hsc.googlegroups.com...
>
> >I take into account feedback from Anthony Williams and Chris Thomasson
> > after my first try:
> >http://groups.google.com/group/comp.programming.threads/browse_frm/th...

>
> > Here is claimed properties:
> > 1. No memory allocation/deallocation
> > 2. No kernel object creation/destruction
> > 3. Broadcasting with single syscall
> > 4. No mutex acquisition after wait on semaphore
> > 5. Portable because based on semaphore
> > 6. No spurious wakeups by design
> > 7. You can easily transform this algorithm into condition variable
>
> > Brief comments on algorithm:
> > 1. Every thread has associated node which it uses in eventcount
> > operations
> > 2. Threads exchange of their nodes
> > 3. Mutex acquisition, which must be after wait on semaphore, executed
> > before next wait (it's just deferred).
>
> > Here is the code:
>
> [...]
>
> What does the consumer usage pattern look like? The interface suggests that
> is differs from my original eventcount algorithm here:
>
> http://groups.google.com/group/comp.programming.threads/browse_frm/th...

>
> while (! try_to_do_something()) {
>   int const key = ec.get();
>   if (try_to_do_something()) { break; }
>   ec.wait(key);
>
> }
>
> Does your version's consumer usage pattern look like this:
>
> while (! try_to_do_something()) {
>   eventcount.prepare_wait();
>   if (try_to_do_something()) {
>     eventcount.retire_wait();
>     break;
>   }
>   eventcount.wait();
>
> }
>
> Is that correct?


Yes. Here is the usage example that I made for Portable Eventcount (try
1). It is effectively the same, except that prepare_wait() doesn't return
a value any more; in this version (try 2) that value is stored in TLS.


************************************************************

Here is a little helper class for blocking threads and usage example:

class eventcount_blocking
{
public:
    eventcount_blocking(eventcount& ec)
        : ec_(ec)
    {
        cmp_ = ec_.prepare_wait();
        wait_ = false;
    }

    void wait()
    {
        assert(false == wait_);
        wait_ = true;
        ec_.wait(cmp_);
    }

    ~eventcount_blocking()
    {
        if (false == wait_)
            ec_.retire_wait();
    }

private:
    eventcount& ec_;
    eventcount::state_t cmp_;
    bool wait_;

    eventcount_blocking(eventcount_blocking const&);
    eventcount_blocking& operator = (eventcount_blocking const&);
};

************************************************************

Usage example:

queue q;
eventcount ec;

void producer(void* data)
{
    q.enqueue(data);
    ec.signal(); // or signal_relaxed()
}

void* consumer()
{
    void* data = 0;
    if (data = q.dequeue())
        return data;
    for (;;)
    {
        eventcount_blocking block (ec);
        if (data = q.dequeue())
            return data;
        block.wait();
        if (data = q.dequeue())
            return data;
    }
}
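
The check / prepare / re-check / wait pattern discussed above can be tried out with a portable stand-in. The sketch below is not the algorithm from this thread: `sketch_eventcount` is a hypothetical epoch counter protected by a mutex and a condition variable, in the style of the get()/wait(key) interface quoted earlier, and `sketch_queue` is a plain locked queue standing in for a lock-free one. It is meant only to show why the second check between preparing and waiting prevents a lost wakeup.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical epoch-based eventcount: a stand-in, not the algorithm above.
class sketch_eventcount {
public:
    // Returns the current epoch; call this before re-checking the predicate.
    unsigned get() {
        std::lock_guard<std::mutex> lock(m_);
        return epoch_;
    }

    // Blocks until the epoch differs from key, i.e. until a signal arrived
    // after the matching get().
    void wait(unsigned key) {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [&] { return epoch_ != key; });
    }

    // Advances the epoch and wakes all current waiters.
    void signal() {
        {
            std::lock_guard<std::mutex> lock(m_);
            ++epoch_;
        }
        cv_.notify_all();
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    unsigned epoch_ = 0;
};

// Mutex-protected queue standing in for a lock-free queue.
class sketch_queue {
public:
    void enqueue(int value) {
        std::lock_guard<std::mutex> lock(m_);
        items_.push_back(value);
    }

    bool try_dequeue(int& out) {
        std::lock_guard<std::mutex> lock(m_);
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop_front();
        return true;
    }

private:
    std::mutex m_;
    std::deque<int> items_;
};

// Consumer side: check, announce the intent to wait, re-check, then block.
int consume_one(sketch_queue& q, sketch_eventcount& ec) {
    int value = 0;
    while (!q.try_dequeue(value)) {
        unsigned const key = ec.get();
        if (q.try_dequeue(value)) break;
        ec.wait(key);
    }
    return value;
}

// Hands one value from a producer (this thread) to a consumer thread.
int run_handoff_demo(int payload) {
    sketch_queue q;
    sketch_eventcount ec;
    int received = 0;
    std::thread consumer([&] { received = consume_one(q, ec); });
    q.enqueue(payload);
    ec.signal();
    consumer.join();
    return received;
}
```

Whichever way the producer's enqueue and signal interleave with the consumer, either the re-check finds the item or the epoch has already moved past `key`, so `wait(key)` returns immediately instead of sleeping through the only signal.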


Dmitriy V'jukov

Dmitriy V'jukov

Sep 19, 2008, 3:27:59 PM
I've found a very old post by Joe Seigh about Eventcounts:

On Dec 9 2004, 7:49 pm, Joe Seigh <jseigh...@xemaps.com> wrote:
> You only do a futex_wake if you see waiters, so basically adding blocking to
> a lock-free queue is free. But if you expect any significant amount of
> blocking (anything more than infrequent) then you're better off using
> a mutex+condvar. Eventcounts are susceptable to thundering herd since they're
> a little too non-blocking. Locks seem to do better under those conditions
> since they're sort of a semaphore which limits the number of runable threads
> somewhat. That's not to say locks are better.
> Joe Seigh

I think that this eventcount algorithm is not susceptible to the
thundering herd problem, provided that the producer calls signal_one()
(not signal_all()).

Dmitriy V'jukov

Dmitriy V'jukov

Sep 21, 2008, 5:08:03 PM
On Sep 18, 00:30, "Dmitriy V'jukov" <dvyu...@gmail.com> wrote:


I've applied Relacy to this algorithm and it revealed some problems (as
usual).


> struct thread_node
> {
>     thread_node*            next;
>     size_t                  count;
>     size_t                  unconsumed;
>     HANDLE                  sema;
>     CRITICAL_SECTION        mtx;
> };
>
> __declspec(thread) thread_node* t_thread_node;
>
> void on_thread_exit()
> {
>     thread_node* head = t_thread_node;
>     thread_node* my = 0;
>     if (head)
>     {
>         EnterCriticalSection(&head->mtx);
>         if (head->next)
>         {
>             my = head->next;
>             head->next = my->next;
>         }
>         else
>         {
>             my = head;
>         }
>         LeaveCriticalSection(&head->mtx);

Here we have to wait for possible pending signals:

while (my->unconsumed)
{
    WaitForSingleObject(my->sema, INFINITE);
    my->unconsumed -= 1;
}

/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\

There is a very small possibility that the signaler has not signaled
yet, so we must use INFINITE instead of 0.

Here we have to reset 'root':

root = 0;

>                 }
>                 LeaveCriticalSection(&mtx);
>                 t_thread_node = my;
>                 return;
>             }
>             LeaveCriticalSection(&mtx);
>         }
>         EnterCriticalSection(&head->mtx);
>         head->unconsumed += 1;
>         LeaveCriticalSection(&head->mtx);
>     }
>
>     void signal_all()
>     {
>         _mm_mfence();
>         thread_node* head = root;
>         if (0 == head)
>             return;
>         EnterCriticalSection(&mtx);
>         if (head != root)
>         {
>             LeaveCriticalSection(&mtx);
>             return;
>         }
>         root = 0;
>         LeaveCriticalSection(&mtx);
>         size_t count = head->count;
>         head->count = 0;

/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\

'root = 0' is a kind of signal, after which threads will use 'head->mtx'
instead of the eventcount's mtx. So this must be rewritten as:

size_t count = head->count;
head->count = 0;

root = 0;
LeaveCriticalSection(&mtx);


>         ReleaseSemaphore(head->sema, count, 0);
>     }
>
>     void signal_one()
>     {

signal_one() is completely busted. Because root is not changed, new
waiters will use the eventcount's mtx to push their nodes onto the stack
rooted at the 'root' variable, while the signaled thread will use
'head->mtx' to pop a node from the same stack.

Dmitriy V'jukov

Sep 23, 2008, 7:40:07 AM
On Sep 22, 1:08 am, "Dmitriy V'jukov" <dvyu...@gmail.com> wrote:

Here is the fixed version.
signal_one() is also repaired. When a thread pops a node from the stack,
it checks whether the stack's head is still equal to the eventcount's
root. If so, the thread uses the eventcount's mtx instead of 'head->mtx'.

struct thread_node
{
    thread_node* next;
    size_t count;
    size_t unconsumed;
    HANDLE sema;
    CRITICAL_SECTION mtx;
};

__declspec(thread) thread_node* t_thread_node;

void on_thread_exit()
{
    thread_node* head = t_thread_node;
    thread_node* my = 0;
    if (head)
    {
        if (head == root)
        {
            EnterCriticalSection(&mtx);
            if (head == root)
            {
                my = head->next;
                head->next = my->next;
                my->next = 0;
            }
            LeaveCriticalSection(&mtx);
        }
        if (0 == my)
        {
            EnterCriticalSection(&head->mtx);
            if (head->next)
            {
                my = head->next;
                head->next = my->next;
                my->next = 0;
            }
            else
            {
                my = head;
            }
            LeaveCriticalSection(&head->mtx);
        }
        while (my->unconsumed)
        {
            WaitForSingleObject(my->sema, INFINITE);
            my->unconsumed -= 1;
        }
        DeleteCriticalSection(&my->mtx);
        CloseHandle(my->sema);
        delete my;
    }
}

struct eventcount
{
    eventcount()
    {
        root = 0;
        InitializeCriticalSection(&mtx);
    }

    ~eventcount()
    {
        DeleteCriticalSection(&mtx);
    }

    void prepare_wait()
    {
        thread_node* my = 0;
        thread_node* head = t_thread_node;
        if (head)
        {
            if (head == root)
            {
                EnterCriticalSection(&mtx);
                if (head == root)
                {
                    my = head->next;
                    head->next = my->next;
                    my->next = 0;
                }
                LeaveCriticalSection(&mtx);
            }
            if (0 == my)
            {
                EnterCriticalSection(&head->mtx);
                if (head->next)
                {
                    my = head->next;
                    head->next = my->next;
                    my->next = 0;
                }
                else
                {
                    my = head;
                }
                LeaveCriticalSection(&head->mtx);
            }
        }
        else
        {
            my = new thread_node;
            my->next = 0;
            my->count = 0;
            my->unconsumed = 0;
            my->sema = CreateSemaphoreW(0, 0, LONG_MAX, 0);
            InitializeCriticalSection(&my->mtx);
        }

        while (my->unconsumed)
        {
            WaitForSingleObject(my->sema, INFINITE);
            my->unconsumed -= 1;
        }

EnterCriticalSection(&mtx);

root = 0;
}
LeaveCriticalSection(&mtx);
t_thread_node = my;
return;
}
LeaveCriticalSection(&mtx);
}
EnterCriticalSection(&head->mtx);
head->unconsumed += 1;
LeaveCriticalSection(&head->mtx);
}

    void signal_all()
    {
        _mm_mfence();
        thread_node* head = root;
        if (0 == head)
            return;
        EnterCriticalSection(&mtx);
        if (head != root)
        {
            LeaveCriticalSection(&mtx);
            return;
        }
        size_t count = head->count;
        head->count = 0;
        root = 0;
        LeaveCriticalSection(&mtx);
        ReleaseSemaphore(head->sema, count, 0);
    }

    void signal_one()
    {

Chris M. Thomasson

Oct 26, 2008, 9:00:50 AM
"Dmitriy V'jukov" <dvy...@gmail.com> wrote in message
news:1cd2f60e-d869-4ba0...@59g2000hsb.googlegroups.com...

> __declspec(thread) thread_node* t_thread_node;

[...]

Humm... __As-is__, doesn't the function above need to reside within the
eventcount object itself? I mean, this "free function" is attempting to make
use of the `eventcount::root/mtx' variables which it does not have access
to. I guess you can put a pointer to the eventcount in the `thread_node'
data-structure, but then you would need to explicitly manage the lifetime of
each eventcount object...

What am I missing here, Dmitriy?

Dmitriy V'jukov

Oct 30, 2008, 2:46:33 PM
On Oct 26, 4:00 pm, "Chris M. Thomasson" <n...@spam.invalid> wrote:

> Humm... __As-is__, doesn't the function above need to reside within the
> eventcount object itself? I mean, this "free function" is attempting to make
> use of the `eventcount::root/mtx' variables which its does not have access
> to. I guess you can put a pointer to the eventcount in the `thread_node'
> data-structure, but then you would need to explicitly manage the lifetime of
> each eventcount object...

Good question! :)
The algorithm definitely needs to be finished off. I have to think about
this.

Dmitriy V'jukov

Dmitriy V'jukov

Oct 31, 2008, 10:29:11 AM

Also, the above algorithm assumes the existence of only one eventcount.

Here is a sketch of an improved algorithm. The main improvement is that
it compiles :) It also at least allows for multiple eventcounts.
Yes, I've put a link to the eventcount into the thread node. But I don't
manage the lifetime of objects with things like reference counting or
PDR; I've used mutexes to manage lifetime. The key is that the link from
the thread node to the eventcount is reset in signal_all(), so I can
safely access the eventcount's mutex in on_thread_exit().
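
The back-link idea can be reduced to a small portable sketch. Everything in it is an assumption made for illustration: `sketch_owner`, `sketch_node`, `attach`, `detach` and `still_rooted` are hypothetical names, std::mutex stands in for CRITICAL_SECTION, and the semaphore and waiter stack are omitted. It shows only the locking discipline: the signaler clears the back-link under the node's mutex, and the exit path dereferences the back-link only while holding that same mutex.

```cpp
#include <cassert>
#include <mutex>

struct sketch_owner;

// Stand-in for thread_node: only the back-link and its mutex are kept.
struct sketch_node {
    std::mutex mtx;
    sketch_owner* ec = nullptr;  // back-link; read and written under mtx
};

// Stand-in for eventcount: only the root pointer and its mutex are kept.
struct sketch_owner {
    std::mutex mtx;
    sketch_node* root = nullptr;  // read and written under mtx
};

// Waiter side: the node becomes the root and records its owner.
void attach(sketch_owner& owner, sketch_node& node) {
    std::lock_guard<std::mutex> node_lock(node.mtx);
    std::lock_guard<std::mutex> owner_lock(owner.mtx);
    owner.root = &node;
    node.ec = &owner;
}

// Signaler side: unlink the root under the owner's mutex, then clear the
// back-link under the node's mutex. The two locks are never held together.
void detach(sketch_owner& owner) {
    sketch_node* head = nullptr;
    {
        std::lock_guard<std::mutex> lock(owner.mtx);
        head = owner.root;
        owner.root = nullptr;
    }
    if (head != nullptr) {
        std::lock_guard<std::mutex> lock(head->mtx);
        head->ec = nullptr;
    }
}

// Thread-exit side: the owner is touched only while the back-link is set,
// and the back-link is only read under the node's mutex.
bool still_rooted(sketch_node& node) {
    std::lock_guard<std::mutex> node_lock(node.mtx);
    if (node.ec == nullptr) return false;
    std::lock_guard<std::mutex> owner_lock(node.ec->mtx);
    return node.ec->root == &node;
}
```

After `detach()` has returned, `still_rooted()` never dereferences the owner again, which is what would let the owner be destroyed without reference counting. The sketch does not cover the window before `detach()` runs; there the owner has to stay alive by other means.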

struct eventcount;

struct thread_node
{
    thread_node* next;
    size_t count;
    size_t unconsumed;
    eventcount* ec; // back-link to last eventcount
    HANDLE sema;
    CRITICAL_SECTION mtx;
};

__declspec(thread) thread_node* t_thread_node;

// helper function
thread_node* privatize_thread_node(bool create);

void on_thread_exit();

struct eventcount
{
    eventcount()
    {
        root = 0;
        InitializeCriticalSection(&mtx);
    }

    ~eventcount()
    {
        DeleteCriticalSection(&mtx);
    }

    void prepare_wait()
    {
        thread_node* my = privatize_thread_node(true);

        EnterCriticalSection(&mtx);
        if (root)
        {
            my->next = root->next;
            root->next = my;
            my = root;
        }
        else
        {
            my->ec = this;
            root = my;
        }
        root->count += 1;
        LeaveCriticalSection(&mtx);
        t_thread_node = my;
    }

    void wait()
    {
        thread_node* head = t_thread_node;
        if (head == root)
        {
            WaitForSingleObject(head->sema, INFINITE);
        }
        else
        {
            EnterCriticalSection(&head->mtx);
            head->unconsumed += 1;
            LeaveCriticalSection(&head->mtx);
        }
    }

    void retire_wait()
    {
        thread_node* head = t_thread_node;

        if (head == root)
        {
            EnterCriticalSection(&mtx);
            if (head == root)
            {
                thread_node* my = 0;
                head->count -= 1;

                if (head->next)
                {
                    my = head->next;
                    head->next = my->next;
                    my->next = 0;
                }
                else
                {
                    my = head;
                    root = 0;
                }
                LeaveCriticalSection(&mtx);
                t_thread_node = my;
                return;
            }
            LeaveCriticalSection(&mtx);
        }
        EnterCriticalSection(&head->mtx);
        head->unconsumed += 1;
        LeaveCriticalSection(&head->mtx);
    }

    void signal_all()
    {
        _mm_mfence();
        thread_node* head = root;
        if (0 == head)
            return;
        EnterCriticalSection(&mtx);
        if (head != root)
        {
            LeaveCriticalSection(&mtx);
            return;
        }
        size_t count = head->count;
        head->count = 0;
        root = 0;
        LeaveCriticalSection(&mtx);

        EnterCriticalSection(&head->mtx);
        head->ec = 0;
        LeaveCriticalSection(&head->mtx);
        ReleaseSemaphore(head->sema, count, 0);
    }

    void signal_one()
    {
        _mm_mfence();
        thread_node* head = root;
        if (0 == head)
            return;
        EnterCriticalSection(&mtx);
        if (head != root)
        {
            LeaveCriticalSection(&mtx);
            return;
        }
        head->count -= 1;
        if (0 == head->count)
            root = 0;
        LeaveCriticalSection(&mtx);
        if (0 == head->count)
        {
            EnterCriticalSection(&head->mtx);
            head->ec = 0;
            LeaveCriticalSection(&head->mtx);
        }
        ReleaseSemaphore(head->sema, 1, 0);
    }

    thread_node* volatile root;
    CRITICAL_SECTION mtx;
};


thread_node* privatize_thread_node(bool create)
{
    thread_node* my = 0;
    thread_node* head = t_thread_node;
    if (head)
    {
        EnterCriticalSection(&head->mtx);
        if (head->ec)
        {
            EnterCriticalSection(&head->ec->mtx);
            if (head->ec->root == head)
            {
                if (head->next)
                {
                    my = head->next;
                    head->next = my->next;
                    my->next = 0;
                }
                else
                {
                    my = head;
                    my->count = 0;
                    head->ec->root = 0;
                }
            }
            LeaveCriticalSection(&head->ec->mtx);
        }
        if (my == 0)
        {
            if (head->next)
            {
                my = head->next;
                head->next = my->next;
                my->next = 0;
            }
            else
            {
                my = head;
                my->count = 0;
            }
        }
        LeaveCriticalSection(&head->mtx);
    }
    else if (create)
    {
        my = new thread_node;
        my->next = 0;
        my->count = 0;
        my->unconsumed = 0;
        my->ec = 0;
        my->sema = CreateSemaphoreW(0, 0, LONG_MAX, 0);
        InitializeCriticalSection(&my->mtx);
    }
    else
    {
        return 0;
    }

    while (my->unconsumed)
    {
        WaitForSingleObject(my->sema, INFINITE);
        my->unconsumed -= 1;
    }

    return my;
}

void on_thread_exit()
{
    thread_node* my = privatize_thread_node(false);
    if (my)
    {
        DeleteCriticalSection(&my->mtx);
        CloseHandle(my->sema);
        delete my;
    }
}

Dmitriy V'jukov

Chris M. Thomasson

Oct 31, 2008, 3:43:21 PM

"Dmitriy V'jukov" <dvy...@gmail.com> wrote in message
news:ffc7aff0-36dc-4bae...@f40g2000pri.googlegroups.com...

On Oct 30, 9:46 pm, "Dmitriy V'jukov" <dvyu...@gmail.com> wrote:
> > On Oct 26, 4:00 pm, "Chris M. Thomasson" <n...@spam.invalid> wrote:
> >
> > > Humm... __As-is__, doesn't the function above need to reside within
> > > the
> > > eventcount object itself? I mean, this "free function" is attempting
> > > to make
> > > use of the `eventcount::root/mtx' variables which its does not have
> > > access
> > > to. I guess you can put a pointer to the eventcount in the
> > > `thread_node'
> > > data-structure, but then you would need to explicitly manage the
> > > lifetime of
> > > each eventcount object...
> >
> > Good question! :)
> > Algorithm definitely must be finished off. I have to think on this.

> And above algorithm assumes existence of only one eventcount.

> Here is a sketch of improved algorithm. The main improvement is that
> it compiles :) Then it at least assumes existence of plurality of
> eventcounts.
> Yes, I've put a link to eventcount into thread node. But I don't
> manage lifetime of objects with things like reference-counting nor
> PDR. I've used mutexes to manage lifetime. The key is that link from
> thread node to eventcount is resetted in signal_all(), so I can safely
> access eventcount's mutex in on_thread_exit().

[...]

Here is a fully compilable, crude test application in which I used the
eventcount (with one small change: CreateSemaphoreW ==> CreateSemaphore) to
create a condvar:
______________________________________________________________________
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <climits>
#include <intrin.h>


struct eventcount;

__declspec(thread) thread_node* t_thread_node;

void on_thread_exit();

~eventcount()
{
DeleteCriticalSection(&mtx);
}

my->sema = CreateSemaphore(0, 0, LONG_MAX, 0);


InitializeCriticalSection(&my->mtx);
}
else
{
return 0;
}

while (my->unconsumed)
{
WaitForSingleObject(my->sema, INFINITE);
my->unconsumed -= 1;
}

return my;
}

void on_thread_exit()
{
thread_node* my = privatize_thread_node(false);
if (my)
{
DeleteCriticalSection(&my->mtx);
CloseHandle(my->sema);
delete my;
}
}

/* Test Application
_____________________________________________________________*/
#include <process.h>
#include <cstdio>


class mutex {
  CRITICAL_SECTION m_mutex;

public:
  mutex() {
    InitializeCriticalSection(&m_mutex);
  }

  ~mutex() {
    DeleteCriticalSection(&m_mutex);
  }

  void lock() {
    EnterCriticalSection(&m_mutex);
  }

  void unlock() {
    LeaveCriticalSection(&m_mutex);
  }
};


class cond {
  eventcount m_waitset;

public:
  void wait(mutex& umtx) {
    m_waitset.prepare_wait();
    umtx.unlock();
    m_waitset.wait();
    umtx.lock();
  }

  void signal() {
    m_waitset.signal_one();
  }

  void broadcast() {
    m_waitset.signal_all();
  }
};


static mutex g_mtx;
static cond g_cond;
static unsigned g_stage;


unsigned WINAPI thread_entry(void* state) {
  // go through UINT_PTR so the pointer-to-integer cast is also valid on 64-bit
  unsigned const id = static_cast<unsigned>(reinterpret_cast<UINT_PTR>(state));
  g_mtx.lock();
  switch (id) {
    case 0:
      ++g_stage;
      g_cond.broadcast();
      while (g_stage != 5) {
        g_cond.wait(g_mtx);
      }
      break;

    case 1:
      while (g_stage != 1) {
        g_cond.wait(g_mtx);
      }
      ++g_stage;
      g_cond.broadcast();
      while (g_stage != 5) {
        g_cond.wait(g_mtx);
      }
      break;

    case 2:
      while (g_stage != 2) {
        g_cond.wait(g_mtx);
      }
      ++g_stage;
      g_cond.broadcast();
      while (g_stage != 5) {
        g_cond.wait(g_mtx);
      }
      break;

    case 3:
      while (g_stage != 3) {
        g_cond.wait(g_mtx);
      }
      ++g_stage;
      g_cond.broadcast();
      while (g_stage != 5) {
        g_cond.wait(g_mtx);
      }
      break;

    case 4:
      while (g_stage != 4) {
        g_cond.wait(g_mtx);
      }
      ++g_stage;
      g_cond.broadcast();
      break;

    case 5:
      while (g_stage != 5) {
        g_cond.wait(g_mtx);
      }
      break;
  }
  g_mtx.unlock();
  on_thread_exit();
  return 0;
}


#define RUNS 100000

int main() {
  for (unsigned r = 0; r < RUNS; ++r) {
    unsigned tid_tmp, i;
    HANDLE tid[6];
    g_stage = 0;
    std::printf("RUN %u -of- %u EXECUTING...\n", r + 1, RUNS);
    for (i = 0; i < 6; ++i) {
      // pass the thread id through UINT_PTR so the cast is valid on 64-bit
      tid[i] = (HANDLE)_beginthreadex(NULL, 0, thread_entry,
        reinterpret_cast<void*>(static_cast<UINT_PTR>(i)), 0, &tid_tmp);
    }
    for (i = 0; i < 6; ++i) {
      WaitForSingleObject(tid[i], INFINITE);
      CloseHandle(tid[i]);
    }
    std::printf("RUN %u -of- %u COMPLETED!\n\n", r + 1, RUNS);
  }

  std::fflush(stdout);
  std::puts("\n\n\n___________________\nhit <ENTER> to exit...");
  std::getchar();
  return 0;
}
______________________________________________________________________


So far so good. Well, it seems like this is going to work out... Although,
this little test only uses `eventcount::signal_all' procedure; I need to
test `eventcount::signal_one'...

Dmitriy V'jukov

Oct 31, 2008, 5:31:18 PM
On Oct 31, 10:43 pm, "Chris M. Thomasson" <n...@spam.invalid> wrote:

> So far so good. Well, it seems like this is going to work out...

Great!

> Although,
> this little test only uses `eventcount::signal_all' procedure; I need to
> test `eventcount::signal_one'...

signal_one() must be tested especially carefully, because it creates a
subtle situation in which the signaled thread has already woken up, but
the thread descriptor list is still the root in the eventcount.
Actually, signal_one() is the only reason to keep the back-link to the
eventcount in the thread descriptor. If there were only signal_all(),
the back-link to the eventcount would not be needed.

Dmitriy V'jukov
