Of course, I am interested in feedbacks.
condvar constructor:
{
waiters : int(0)
task : int(0)
turn1 : semaphore(1) // one is open (original sem value = 1)
turn2 : semaphore(0) // another is closed
m: mutex
}
wait(mutex CS)
{
turn1.get();
m.lock();
++waiters;
m.unlock();
turn1.put();
CS.unlock();
turn2.get();
m.lock();
bool last = (--waiters == task);
m.unlock();
if(last)
turn1.put();
else
turn2.put();
CS.lock();
}
signal(bool all)
{
turn1.get();
m.lock();
if(waiters)
{
if(all)
task = 0;
else
task = waiters-1;
m.unlock();
turn2.put();
}
else
{
m.unlock();
turn1.put();
}
}
Thanks, Michael
I need to take a look at it Michael; I will probably implement it in Relacy
Race Detector when I get some time. FWIW, here is a simple and correct
condition variable in Windows, it's broadcast only, but that is definitely
allowed for in the POSIX standard:
<pseudo-code sketch>
___________________________________________________________
struct condvar
{
struct waitset
{
HANDLE m_event; // = manual reset event; set to false
LONG m_refs; // = 0
};
waitset* m_waitset; // = NULL
LONG m_refs; // = 0
CRITICAL_SECTION m_mutex;
bool timed_wait(LPCRITICAL_SECTION umutex, DWORD tout)
{
// grab a reference to the current waitset
EnterCriticalSection(&m_mutex);
waitset* w = m_waitset;
if (! w) w = m_waitset = new waitset();
++m_refs;
LeaveCriticalSection(&m_mutex);
// unlock user mutex and wait on the waitset we obtained
LeaveCriticalSection(umutex);
DWORD status = WaitForSingleObject(w->m_waitset, tout);
// decrement our waitsets refcount
if (! InterlockedDecrement(&w->m_refs))
{
delete w;
}
EnterCriticalSection(umutex);
// that's all folks! ;^)
return status == WAIT_OBJECT_0 ? true : false;
}
void wait(LPCRITICAL_SECTION umutex)
{
timed_wait(umutex, INFINITE);
}
void broadcast()
{
// swap a reference to the current waitset with NULL
EnterCriticalSection(&m_mutex);
waitset* w = m_waitset;
LONG refs = m_refs;
m_waitset = NULL;
m_refs = 0;
LeaveCriticalSection(&m_mutex);
if (w)
{
// signal all waiters
SetEvent(w->m_event);
// transfer the swapped reference count to the
// waitset reference count
if (InterlockedExchangeAdd(&w->m_refs, refs) == -refs)
{
delete w;
}
}
}
void signal()
{
broadcast();
}
};
_____________________________________________________________
I need to take a look at it Michael; I will probably implement it in Relacy
I must be missing something important here, but does the logic above
actually broadcast? I mean, you only seem to be incrementing the semaphore
one single time... How does that wake multiple waiters? I am confused here!
;^(...
Ahh. It seems like you "continue" to increment the semaphore in the
following waiting logic:
___________________________________
turn2.get();
m.lock();
bool last = (--waiters == task);
m.unlock();
if(last)
turn1.put();
else
turn2.put();
___________________________________
Okay. I still need to impl this in Relacy, but I think I see what you are
getting at here Michael.
> void broadcast()
> {
> // swap a reference to the current waitset with NULL
> EnterCriticalSection(&m_mutex);
> waitset* w = m_waitset;
> LONG refs = m_refs;
> m_waitset = NULL;
> m_refs = 0;
> LeaveCriticalSection(&m_mutex);
>
> if (w)
> {
> // signal all waiters
> SetEvent(w->m_event);
>
> // transfer the swapped reference count to the
> // waitset reference count
> if (InterlockedExchangeAdd(&w->m_refs, refs) == -refs)
> {
> delete w;
> }
> }
> }
BTW, as for your implementation, I could not figure out why you
collect the waiters number in condvar::m_refs and not directly in
waitset::m_refs. Then there comes a trick of transferring
condvar::m_refs to waitset::m_refs. Well, it should work, but I could
not understand why you needed to maintain two separate counters at all
and not worked directly on waitset::m_refs. Could you please clarify?
Thanks, Michael
> > void broadcast()
> > {
[...]
> > }
> BTW, as for your implementation, I could not figure out why you
> collect the waiters number in condvar::m_refs and not directly in
> waitset::m_refs. Then there comes a trick of transferring
> condvar::m_refs to waitset::m_refs. Well, it should work, but I could
> not understand why you needed to maintain two separate counters at all
> and not worked directly on waitset::m_refs. Could you please clarify?
I did it to avoid locking the `condvar::m_mutex' after a waiter wakes.
Although, I think I might be able to use a single reference count and still
avoid locking the `condvar::m_mutex'. Humm... Perhaps something like:
<pseudo-code sketch>
___________________________________________________________
struct condvar
{
struct waitset
{
HANDLE m_event; // = manual reset event; set to false
LONG m_refs; // = 1
};
waitset* m_waitset; // = NULL
CRITICAL_SECTION m_mutex;
bool timed_wait(LPCRITICAL_SECTION umutex, DWORD tout)
{
// grab a reference to the current waitset
EnterCriticalSection(&m_mutex);
waitset* w = m_waitset;
if (! w) w = m_waitset = new waitset();
InterlockedIncrement(&w->m_refs);
LeaveCriticalSection(&m_mutex);
// unlock user mutex and wait on the waitset we obtained
LeaveCriticalSection(umutex);
DWORD status = WaitForSingleObject(w->m_waitset, tout);
// decrement our waitsets refcount
if (! InterlockedDecrement(&w->m_refs))
{
delete w;
}
EnterCriticalSection(umutex);
// that's all folks! ;^)
return status == WAIT_OBJECT_0 ? true : false;
}
void wait(LPCRITICAL_SECTION umutex)
{
timed_wait(umutex, INFINITE);
}
void broadcast()
{
// swap a reference to the current waitset with NULL
EnterCriticalSection(&m_mutex);
waitset* w = m_waitset;
m_waitset = NULL;
LeaveCriticalSection(&m_mutex);
if (w)
{
// signal all waiters
SetEvent(w->m_event);
// decrement our waitsets refcount
if (! InterlockedDecrement(&w->m_refs))
{
delete w;
}
}
}
void signal()
{
broadcast();
}
};
_____________________________________________________________
What do you think?
I went ahead and implemented your algorithm in Relacy Race Detector 2.3:
http://cpt.pastebin.com/uhcAg5Km
http://groups.google.com/group/relacy
It's working great.
:^)
Looks good for me. Apart from decrementing refcount and deleting a
waitset in broadcast.
I would just wipe out the following part of your code:
> void broadcast()
> {
...
>
> // decrement our waitsets refcount
> if (! InterlockedDecrement(&w->m_refs))
> {
> delete w;
> }
...
> }
Thanks, Michael
Oh, it is just an 'academic' exercise, I do not mean to use it in a
real application. It has too many problems. I am working now on a,
hopefully, much better implementation (still with only a couple of
semaphores for it's still an 'academic' task for me).
BTW, what do you mean by "implemented it in Relacy"? You did not make
it a part of Relacy, did you? What then?
Thanks, Michael
What did you have in mind? I need to decrement the reference count here in
order to determine when a waitset can be safely destroyed/reused.
I probably should have written:
I implemented your condvar algorithm using the API's provided by Relacy.
Sorry for any confusion!
;^(
I meant in your last version of the code, you should decrement and
delete inside 'timed_wait' function only.
Otherwise you have N+1 InterlockedDecrements (N inside 'timed_wait and
one inside 'broadcast')
and may delete your waitset too soon, then access it again with last
decrement which leads to GPF or memory corruption.
Ain't I right?
Regards, Michael
(Late answer, sorry)
> > What did you have in mind? I need to decrement the reference count here
> > in
> > order to determine when a waitset can be safely destroyed/reused.
> I meant in your last version of the code, you should decrement and
> delete inside 'timed_wait' function only.
> Otherwise you have N+1 InterlockedDecrements (N inside 'timed_wait and
> one inside 'broadcast')
Take note of the following snippet:
_____________________________________________________
struct waitset
{
HANDLE m_event; // = manual reset event; set to false
LONG m_refs; // = 1
};
waitset* m_waitset; // = NULL
CRITICAL_SECTION m_mutex;
bool timed_wait(LPCRITICAL_SECTION umutex, DWORD tout)
{
// grab a reference to the current waitset
EnterCriticalSection(&m_mutex);
waitset* w = m_waitset;
if (! w) w = m_waitset = new waitset();
InterlockedIncrement(&w->m_refs);
LeaveCriticalSection(&m_mutex);
[...]
}
_____________________________________________________
Okay, a new waitset has always has a reference count of 1. When a waiter
_creates_ a new waitset, or references a current waitset, it always does an
atomic increment _within_ the critical section. Therefore if there are N
waiters, the reference count will always be 1 greater than N before a
broadcast occurs. Therefore, I think that the scenario you describe cannot
happen.
Let's say there are 3 waiters; the refcount will be 4. Now, 1 waiter times
out, thus rendering the refcount to 3. A broadcast occurs, thus rendering
the refcount to 2. Finally the last two waiters wake and perform decrements,
thus rendering the refcount to zero. This seems safe to me...
> and may delete your waitset too soon, then access it again with last
> decrement which leads to GPF or memory corruption.
> Ain't I right?
I am having trouble coming up with a scenario in which a waitset can be
destroyed too soon...
You are right.
Thanks, Michael