Make it LIFO.
>
> <pseudo-code without cancellation or timeout logic>
Nah, a Real Man's condvar has cancellation and timeout logic (that's
what makes it interesting, so to say). ;-)
regards,
alexander.
It's only Posix that has that cancelation logic. You don't necessarily need
or want Posix for windows.
Joe Seigh
I doubt it is possible to make lock-based threading without cancellation
logics for waits (the posix one being a good one). In fact, ThreadCancel()
in WINAPI is marked with a saying like "DON'T EVER THINK TO USE IT, EVEN IF
YOU KNOW WHAT YOU ARE DOING" (that is to say, "it had been a mistake, but
we understood it only 10 years after we made it").
Giancarlo.
Oops, I was thinking of condvar destroy. Alexander faked me out since he's usually
brings that up. Cancelation "exception" handling is a different can of worms.
Joe Seigh
:)
The timeout logic should be very straight forward, I'll start with that...
Something like this:
<pseudo-code without cancellation logic>
#define PER_THREAD_WAITING 1
typedef struct per_thread_
{
volatile LONG refs;
int state;
struct per_thread_ *next;
struct per_thread_ *prev;
HANDLE waiter;
} per_thread_t;
typedef struct condvar_
{
volatile LONG waiters;
per_thread_t *front;
per_thread_t *back;
LONG count;
CRITICAL_SECTION mutex;
} condvar_t;
per_thread_t* per_thread_self( void )
{
per_thread_t *_this = pthread_getspecific( ... );
if ( ! _this )
{
_this = malloc( sizeof( *_this ) );
if ( ! _this ) { abort(); }
_this->state = 0;
_this->next = 0;
_this->prev = 0;
_this->refs = 1;
_this->waiter = CreateEvent( 0, FALSE, FALSE, 0 );
if ( ! _this->waiter ) { free( _this ); abort(); }
pthread_setspecific( ..., _this );
}
assert( ! _this->state && ! _this->next && ! _this->prev );
return _this;
}
void per_thread_release( per_thread_t *_this )
{
if ( ! InterlockedDecrementRelease
( &_this->refs ) )
{
if ( ! CloseHandle( _this->waitset ) ) { abort(); }
free( _this );
}
}
static void prv_per_thread_tlsdtor( void *tls )
{
if ( tls ) { per_thread_release( tls ); }
}
static void prv_condvar_pop( condvar_t *_this, per_thread_t *thread )
{
per_thread_t *next = thread->next;
per_thread_t *prev = thread->prev;
if ( ! prev && ! next )
{
_this->front = 0;
_this->back = 0;
}
else if ( ! prev && next )
{
next->prev = 0;
_this->front = next;
}
else if ( prev && ! next )
{
prev->next = 0;
_this->back = prev;
}
else
{
next->prev = prev;
prev->next = prev;
}
--_this->count;
thread->next = 0;
thread->prev = 0;
}
static int prv_condvar_timeout( condvar_t *_this, per_thread_t *thread )
{
int err = 0;
LONG count = 0;
EnterCriticalSection( &_this->mutex );
if ( thread->state )
{
prv_condvar_pop( _this, thread );
count = _this->count;
thread->state = 0;
err = ETIMEDOUT;
}
if ( err )
{
/* process cancels */
InterlockedExchange
( &_this->waiters,
count );
}
LeaveCriticalSection( &_this->mutex );
if ( err ) { per_thread_release( thread ); }
return err;
}
void condvar_init( condvar_t *_this )
{
_this->front = 0;
_this->back = 0;
_this->waiters = 0;
_this->count = 0;
InitalizeCriticalSection( &_this->mutex );
}
int condvar_destroy( condvar_t *_this )
{
int err = 0;
EnterCriticalSection( &_this->mutex );
if ( _this->front ) { err = EBUSY; }
LeaveCriticalSection( &_this->mutex );
if ( ! err )
{
DeleteCriticalSection( &_this->mutex );
if ( _this->count ) { abort(); }
}
return err;
}
int condvar_timedwait( condvar_t *_this, mutex_t *mutex, DWORD timeout )
{
int err = 0;
DWORD wait_ret;
per_thread_t *thread = per_thread_self();
InterlockedIncrementAcquire( &thread->refs );
EnterCriticalSection( &_this->mutex );
if ( ! _this->front )
{
_this->front = thread;
_this->back = thread;
}
else
{
_this->back->next = thread;
thread->prev = _this->back;
_this->back = thread;
}
++_this->count;
thread->state = PER_THREAD_WAITING;
InterlockedIncrementRelease( &_this->waiters );
LeaveCriticalSection( &_this->mutex );
LeaveCriticalSection( mutex );
wait_ret = WaitForSingleObject
( thread->waiter,
timeout );
if ( wait_ret != WAIT_OBJECT_0 )
{
err = prv_condvar_timeout( _this, thread );
if ( wait_ret != WAIT_TIMEOUT ) { abort(); }
}
EnterCriticalSection( mutex );
return err;
}
void condvar_signal( condvar_t *_this )
{
register LONG cmp, old = _this->waiters;
do
{
cmp = old;
old = InterlockedCompareExchangeAcquire
( &_this->waiters,
( old > 0 ) ? old - 1 : old,
cmp );
} while ( cmp != old );
if ( old )
{ /* slow-path */
per_thread_t *thread;
EnterCriticalSection( &_this->mutex );
thread = _this->front;
if ( thread )
{
prv_condvar_pop( _this, thread );
assert( thread->state == PER_THREAD_WAITING );
thread->state = 0;
}
LeaveCriticalSection( &_this->mutex );
if ( thread )
{
if ( ! SetEvent( thread->waiter ) ) { abort(); }
per_thread_release( thread );
}
}
}
void condvar_broadcast( condvar_t *_this )
{
if ( InterlockedExchange( &_this->waiters, 0 ) )
{ /* slow-path */
per_thread_t *thread, *next;
EnterCriticalSection( &_this->mutex );
thread = _this->front;
if ( thread )
{
per_thread_t *temp = thread;
_this->front = 0;
_this->back = 0;
_this->count = 0;
while ( thread )
{
assert( thread->state == PER_THREAD_WAITING );
thread->state = 0;
thread = thread->next;
}
thread = temp;
}
LeaveCriticalSection( &_this->mutex );
while ( thread )
{
next = thread->next;
assert( ! thread->state );
thread->next = 0;
thread->prev = 0;
if ( ! SetEvent( thread->waiter ) ) { abort(); }
per_thread_release( thread );
thread = next;
}
}
}
Cancellation logic is coming soon...
What do you think of my algo? Is it total crap? :O
My library uses something very similar...
;)
> static int prv_condvar_timeout( condvar_t *_this, per_thread_t *thread )
> {
> int err = 0;
> LONG count = 0;
>
> EnterCriticalSection( &_this->mutex );
>
> if ( thread->state )
> {
> prv_condvar_pop( _this, thread );
>
> count = _this->count;
> thread->state = 0;
>
> err = ETIMEDOUT;
> }
>
> if ( err )
> {
> /* process cancels */
> InterlockedExchange
> ( &_this->waiters,
> count );
> }
>
> LeaveCriticalSection( &_this->mutex );
>
> if ( err ) { per_thread_release( thread ); }
>
> return err;
> }
Heres the fixed code:
static int prv_condvar_timeout( condvar_t *_this, per_thread_t *thread )
{
int err = 0, wait = 0;
LONG count = 0;
EnterCriticalSection( &_this->mutex );
if ( thread->state )
{
prv_condvar_pop( _this, thread );
count = _this->count;
thread->state = 0;
err = ETIMEDOUT;
}
else { wait = 1; }
if ( err )
{
/* process cancels */
InterlockedExchange
( &_this->waiters,
count );
}
LeaveCriticalSection( &_this->mutex );
if ( wait )
{
/* we are signaled for sure, so we need to wait on that */
if ( WaitForSingleObject
( thread->waiter,
INFINITE ) != WAIT_OBJECT_0 )
{
abort();
}
}
if ( err ) { per_thread_release( thread ); }
return err;
}
Sorry!
> static void prv_condvar_pop( condvar_t *_this, per_thread_t *thread )
> {
>
> per_thread_t *next = thread->next;
> per_thread_t *prev = thread->prev;
>
> if ( ! prev && ! next )
> {
> _this->front = 0;
> _this->back = 0;
> }
>
> else if ( ! prev && next )
> {
> next->prev = 0;
> _this->front = next;
> }
>
> else if ( prev && ! next )
> {
> prev->next = 0;
> _this->back = prev;
> }
>
> else
> {
> next->prev = prev;
^^^^^^^^^^^^^^^^^^^^^^^^
Has to be:
next->prev = next;
DOH!!!
> prev->next = prev;
> }
>
> --_this->count;
>
> thread->next = 0;
> thread->prev = 0;
> }
=^)
What do you think of it Alex?
;)