What's the latest state-of-the art for EventCounts?

118 views
Skip to first unread message

Andy Venikov

unread,
Apr 29, 2011, 6:53:57 PM4/29/11
to
There are several implementations out there.
Notably by Joe Seigh as part of his library.
Also several posts by Dmitry and Chris.
I also saw one by Rani Sharoni.

Is there a consensus what's currently the best algorithm?

Thanks,
Andy.

Andy Venikov

unread,
May 2, 2011, 10:47:35 AM5/2/11
to

Since the platform I care about is Linux, I think I could just use
futexes. I mean, futex is just an implementation of an event count.
Of course it's much more than that, but it can easily translate
itself to an event count. No?

Thanks,
Andy.

Andy Venikov

unread,
May 5, 2011, 12:01:34 PM5/5/11
to

What I meant was something like this.
Assume we have a lock-free queue "Queue".

Queue queue;
int ec = 0; //1 means waiting

//consumer(s):
while (true)
{
SomeData data;
while (!queue.try_deque(&data)) //it's also a full barrier
{
ec = 1; //atomic write
//place StoreLoad barrier here
if (!queue.try_deque(&data))
{
futex(FUTEX_WAIT, &ec, 1);
}
else
{
break;
}
}
process(data);
}

//producer(s):
queue.enqueue(data); //full barrier
int ecValue = ec; //atomic read
if (0 != ecValue)
{
ecValue = atomic_exchange(&ec, 0);
}
if (0 != ecValue)
{
futex(FUTEX_WAKE, &ec);
}

Of course the consumer could be optimized by not entering the wait right
away but rather trying to dequeue a few more times with a cpu_relax
thrown in the mix, but you get the idea.
I'm more concerned with the correctness.
This algorithm can allow one consumer to unnecessarily wake one extra
waiter that might go immediately back to sleep, but that won't break
correctness. (The producer may signal a wake even though the item was
picked out of the queue by a non-sleeping thread).
Otherwise it looks correct to me.
Any thoughts?

Thanks,
Andy.

Chris M. Thomasson

unread,
May 5, 2011, 10:29:02 PM5/5/11
to
"Andy Venikov" <swojch...@gmail.com> wrote in message
news:ipuhl0$tvm$1...@dont-email.me...
[...]

> What I meant was something like this.
> Assume we have a lock-free queue "Queue".

[...]

> Of course the consumer could be optimized by not entering the wait right
> away but rather trying to dequeue a few more times with a cpu_relax thrown
> in the mix, but you get the idea.
> I'm more concerned with the correctness.
> This algorithm can allow one consumer to unnecessarily wake one extra
> waiter that might go immediately back to sleep, but that won't break
> correctness. (The producer may signal a wake even though the item was
> picked out of the queue by a non-sleeping thread).
> Otherwise it looks correct to me.
> Any thoughts?

I have not programmed threads for a couple of month's now, so please forgive
me if I am way off base here... ;^)

I do not think that algorithm will work _unless_ the call to
`futex(FUTEX_WAKE, &ec)' wakes _all_ of the current waiters. I think you
have to explicitly specify the number of waiters:

http://www.kernel.org/doc/man-pages/online/pages/man2/futex.2.html

So a call like:
_________________________________________________
#define futex_broadcast(f) \
futex((f), FUTEX_WAKE, INT_MAX, NULL, NULL, 0)


futex_broadcast(&ec);
_________________________________________________

might be in order... Humm... What do you think? I am thinking about a
scenario in which 3 threads hit an empty queue and wait. Now the state is
three waiter threads; empty queue with an `ec' value of 1. Then another
thread produces something, see's `ec' as 1 and atomically resets it and then
releases a single waiter that dequeues said item. Then three more producer
threads come along and enqueue items but observes `ec' as 0. Now you have 3
items in the queue, and 2 "stalled" waiters... Humm...

Need to think some more on this.


Andy Venikov

unread,
May 6, 2011, 2:24:04 PM5/6/11
to
On 05/05/2011 10:29 PM, Chris M. Thomasson wrote:
> "Andy Venikov"<swojch...@gmail.com> wrote in message

<snip>

>
> I have not programmed threads for a couple of month's now, so please forgive
> me if I am way off base here... ;^)

Welcome back :-)
I've started to miss the great discussions between you and Dmitry.


>
> I do not think that algorithm will work _unless_ the call to

> `futex(FUTEX_WAKE,&ec)' wakes _all_ of the current waiters. I think you


> have to explicitly specify the number of waiters:
>
> http://www.kernel.org/doc/man-pages/online/pages/man2/futex.2.html
>
> So a call like:
> _________________________________________________
> #define futex_broadcast(f) \
> futex((f), FUTEX_WAKE, INT_MAX, NULL, NULL, 0)
>
>
> futex_broadcast(&ec);
> _________________________________________________
>
> might be in order... Humm... What do you think? I am thinking about a
> scenario in which 3 threads hit an empty queue and wait. Now the state is
> three waiter threads; empty queue with an `ec' value of 1. Then another
> thread produces something, see's `ec' as 1 and atomically resets it and then
> releases a single waiter that dequeues said item. Then three more producer
> threads come along and enqueue items but observes `ec' as 0. Now you have 3
> items in the queue, and 2 "stalled" waiters... Humm...
>
> Need to think some more on this.

Yup, you're right.
The reason for my error was that in my real problem there is just one
consumer, and the queue is MPSC. The consumer would actually not do
anything in terms of processing the data, but queue it off to others,
acting as a sort-of a dispatcher. So it would quickly exhaust the queue
if there was anything in it.
And right before posting I thought hmmm, looks like this could work for
multiple consumers as well.... Wrong!
I think in order to make this solution multi-consumer-friendly, we have
to play tricks with "ec" and make it a real counter to indicate the
amount of waiters. I'll think about it over the week-end.

Of course, it could be used as-is with "wake everybody" semantics, but
then we would get the "thundering herd" problem, which is usually not
a simple problem to solve either.

Thanks,
Andy.

Chris M. Thomasson

unread,
May 7, 2011, 7:49:13 PM5/7/11
to
"Andy Venikov" <swojch...@gmail.com> wrote in message
news:iq1ec6$aoo$1...@dont-email.me...
[...]

> I think in order to make this solution multi-consumer-friendly, we have
> to play tricks with "ec" and make it a real counter to indicate the
> amount of waiters. I'll think about it over the week-end.
>
> Of course, it could be used as-is with "wake everybody" semantics, but
> then we would get the "thundering herd" problem, which is usually not
> a simple problem to solve either.


Here is some pseudo-code for one of my "very simple" eventcount algorithms,
membars aside for a moment, that works with the upcoming C++ standard.
Modulo any bugs/typos, here it is... Typed directly in the newsreader,
lol...:
_______________________________________________________
struct eventcount
{
#define EC_WAITBIT 0x1U
#define EC_INCVAL 0x2U

std::atomic<unsigned> m_state; // = 0
unsigned m_waiters; // = 0
std::condition_variable m_cond;
std::mutex m_mutex;

void prv_signal(unsigned state, bool broadcast)
{
if (! (state & EC_WAITBIT)) return;

m_mutex.lock();

if (m_waiters)
{
unsigned xchg;

do
{
xchg = state + EC_INCVAL;

if (m_waiters == 1 || broadcast)
{
xchg &= ~EC_WAITBIT;
}

} while (! m_state.compare_exchange_weak(
state,
xchg));

if (broadcast)
{
m_cond.notify_all();
}

else
{
m_cond.notify_one();
}
}

m_mutex.unlock();
}

unsigned get()
{
return m_state.fetch_or(0x1U);
}

void signal()
{
unsigned state = m_state.load();

prv_signal(state, false);
}

void broadcast()
{
unsigned state = m_state.load();

prv_signal(state, true);
}

void wait(unsigned state1)
{
m_mutex.lock();

unsigned state2 = m_state.load();

if ((state1 & ~EC_WAITBIT) ==
(state2 & ~EC_WAITBIT))
{
++m_waiters;

m_cond.wait(m_mutex);

--m_waiters;
}

m_mutex.unlock();
}
};
_______________________________________________________


You can use it for a non-blocking queue algorithm like:
_______________________________________________________
static nbqueue g_queue;
static eventcount g_ecount;


void* consume()
{
void* state;

while (! (state = g_queue.try_pop()))
{
unsigned ecstate = g_ecount.get();
if ((state = g_queue.try_pop())) break;
g_ecount.wait(ecstate);
}

return state;
}


void produce(void* state)
{
g_queue.push(state);
g_ecount.signal();
}
_______________________________________________________


What do you think Andy?


Andy Venikov

unread,
May 9, 2011, 7:05:56 PM5/9/11
to
On 05/07/2011 07:49 PM, Chris M. Thomasson wrote:
<snip>


> Here is some pseudo-code for one of my "very simple" eventcount algorithms,
> membars aside for a moment, that works with the upcoming C++ standard.
> Modulo any bugs/typos, here it is... Typed directly in the newsreader,
> lol...:
> _______________________________________________________
> struct eventcount
> {
> #define EC_WAITBIT 0x1U
> #define EC_INCVAL 0x2U
>
>
>
> std::atomic<unsigned> m_state; // = 0
> unsigned m_waiters; // = 0
> std::condition_variable m_cond;
> std::mutex m_mutex;
>
>
>
> void prv_signal(unsigned state, bool broadcast)
> {

> if (! (state& EC_WAITBIT)) return;


>
> m_mutex.lock();
>
> if (m_waiters)
> {
> unsigned xchg;
>
> do
> {
> xchg = state + EC_INCVAL;
>
> if (m_waiters == 1 || broadcast)
> {

> xchg&= ~EC_WAITBIT;


> }
>
> } while (! m_state.compare_exchange_weak(
> state,
> xchg));
>
> if (broadcast)
> {
> m_cond.notify_all();
> }
>
> else
> {
> m_cond.notify_one();
> }
> }
>
> m_mutex.unlock();
> }
>
>
>
> unsigned get()
> {
> return m_state.fetch_or(0x1U);
> }
>
>
>
> void signal()
> {
> unsigned state = m_state.load();
>
> prv_signal(state, false);
> }
>
>
>
> void broadcast()
> {
> unsigned state = m_state.load();
>
> prv_signal(state, true);
> }
>
>
>
> void wait(unsigned state1)
> {
> m_mutex.lock();
>
> unsigned state2 = m_state.load();
>
> if ((state1& ~EC_WAITBIT) ==

> (state2& ~EC_WAITBIT))


> {
> ++m_waiters;
>
> m_cond.wait(m_mutex);
>
> --m_waiters;
> }
>
> m_mutex.unlock();
> }
> };
> _______________________________________________________
>
>
>
>
> You can use it for a non-blocking queue algorithm like:
> _______________________________________________________
> static nbqueue g_queue;
> static eventcount g_ecount;
>
>
> void* consume()
> {
> void* state;
>
> while (! (state = g_queue.try_pop()))
> {
> unsigned ecstate = g_ecount.get();
> if ((state = g_queue.try_pop())) break;
> g_ecount.wait(ecstate);
> }
>
> return state;
> }
>
>
> void produce(void* state)
> {
> g_queue.push(state);
> g_ecount.signal();
> }
> _______________________________________________________
>
>
>
>
> What do you think Andy?
>

I think I understand the idea.
But I think there's a possibility of a missed wake up here.
As I understand after a thread calls ec.get() it's not
supposed to miss an addition to the queue. Either it'll read
the added item off of the queue prior to waiting, or it will
get the wake up signal.
Let's see... Say thread1 is a consumer and thread2 is a producer.

Original state: queue is empty, m_state = 0;

Consumer:
g_queue.try_pop(); // false
ecstate = g_ecount.get(); //Sets wait bit to 1
//After this point the consumer is not supposed to miss wake ups
g_queue.try_pop(); //still false as the queue is still empty

Now producer:
g_queue.push(...); //One item is in the queue.
//It happens before consumer calls wait()
g_ecount.signal();
state = m_state.load(); //reads 0x1f
if (! (state & EC_WAITBIT)) // false since the wait bit is on
m_mutex.lock(); //takes the lock
if (m_waiters) // false since m_waiters is 0. Skip the body
m_mutex.unlock();
return;
//at this point m_state didn't get modified

Back to the consumer:
g_ecount.wait(0);
m_Mutex.lock(); //takes the lock
state2 = m_state.load(); // reads 0x1f
if (...) succedes since m_state wasn't modified
{
++ m_waiters;
m_cond.wait(m_mutex);// just missed the signal - now wait forever...


Did you type in all the code you've intended to type?
I think you probably meant to change m_state in prv_signal even if
m_waiters was 0. Because even if m_waiters is 0, it doesn't mean that
there aren't any waiters-to-be. So by modifying m_state you'll ensure
that any waiter-to-be will not wait but will retry popping the queue.

Is that right or did I miss something?

Andy.

Chris M. Thomasson

unread,
May 9, 2011, 7:28:19 PM5/9/11
to
"Andy Venikov" <swojch...@gmail.com> wrote in message
news:iq9s0l$dcs$1...@dont-email.me...

> On 05/07/2011 07:49 PM, Chris M. Thomasson wrote:
> <snip>
>
>
>> Here is some pseudo-code for one of my "very simple" eventcount
>> algorithms,
>> membars aside for a moment, that works with the upcoming C++ standard.
>> Modulo any bugs/typos, here it is... Typed directly in the newsreader,
>> lol...:
>> _______________________________________________________
[...]

>> What do you think Andy?
>>
>
> I think I understand the idea.
> But I think there's a possibility of a missed wake up here.
> As I understand after a thread calls ec.get() it's not
> supposed to miss an addition to the queue. Either it'll read
> the added item off of the queue prior to waiting, or it will
> get the wake up signal.
> Let's see... Say thread1 is a consumer and thread2 is a producer.
>
> Original state: queue is empty, m_state = 0;
>
> Consumer:
> g_queue.try_pop(); // false
> ecstate = g_ecount.get(); //Sets wait bit to 1
> //After this point the consumer is not supposed to miss wake ups
> g_queue.try_pop(); //still false as the queue is still empty

[...]


>
> Did you type in all the code you've intended to type?
> I think you probably meant to change m_state in prv_signal even if
> m_waiters was 0. Because even if m_waiters is 0, it doesn't mean that
> there aren't any waiters-to-be. So by modifying m_state you'll ensure
> that any waiter-to-be will not wait but will retry popping the queue.
>
> Is that right or did I miss something?

ARGHGHGH! I totally fuc%ed up the code. It should read as:
_______________________________________________________


void prv_signal(unsigned state, bool broadcast)
{

if (! (state & EC_WAITBIT)) return;

m_mutex.lock();

unsigned xchg;

do
{
xchg = state + EC_INCVAL;

if (m_waiters < 2 || broadcast)
{
xchg &= ~EC_WAITBIT;
}

} while (! m_state.compare_exchange_weak(
state,
xchg));

if (m_waiters)
{
if (broadcast)
{
m_cond.notify_all();
}

else
{
m_cond.notify_one();
}
}

m_mutex.unlock();
}
_______________________________________________________


Thank you VERY much for finding that nasty race-condition!

:^o


Also, I should change:
________________________________________________________
unsigned get()
{
return m_state.fetch_or(0x1U);
}

________________________________________________________

to:
________________________________________________________
unsigned get()
{
return m_state.fetch_or(EC_WAITBIT);
}

________________________________________________________


Thanks! ;^/


Andy Venikov

unread,
May 11, 2011, 7:28:34 PM5/11/11
to
On 05/09/2011 07:28 PM, Chris M. Thomasson wrote:

<snip>


> ARGHGHGH! I totally fuc%ed up the code. It should read as:
> _______________________________________________________
> void prv_signal(unsigned state, bool broadcast)
> {

> if (! (state& EC_WAITBIT)) return;


>
> m_mutex.lock();
>
> unsigned xchg;
>
> do
> {
> xchg = state + EC_INCVAL;
>
> if (m_waiters< 2 || broadcast)
> {

> xchg&= ~EC_WAITBIT;


> }
>
> } while (! m_state.compare_exchange_weak(
> state,
> xchg));
>
> if (m_waiters)
> {
> if (broadcast)
> {
> m_cond.notify_all();
> }
>
> else
> {
> m_cond.notify_one();
> }
> }
>
> m_mutex.unlock();
> }
> _______________________________________________________
>
>
> Thank you VERY much for finding that nasty race-condition!
>
> :^o
>
>
>
>
>
>
> Also, I should change:
> ________________________________________________________
> unsigned get()
> {
> return m_state.fetch_or(0x1U);
> }
>
> ________________________________________________________
>
>
>
> to:
> ________________________________________________________
> unsigned get()
> {
> return m_state.fetch_or(EC_WAITBIT);
> }
>
> ________________________________________________________
>
>
>
>
> Thanks! ;^/
>

Yeah, this looks much better.

There's some confusing bit manipulation code in your wait function.
Am I correct in assuming that if we change get() to:

return m_state.fetch_or(EC_WAITBIT) | EC_WAITBIT;

then in the wait() function there's no need for bit manipulation and it
could be written as:

void wait(unsigned state1)
{
m_mutex.lock();
unsigned state2 = m_state.load();

if (state1 == state2)
{ ... same here ... }
m_mutex.unlock();
}
?

I should probably also mention a possible (although highly
improbable) race-condition when m_state wraps-around.
(Effectively there's just 31 bits in m_state to use as a
generation-count).
What I mean is imagine that readers are much slower than
writers. Two readers are already waiting. The third reader
comes along but between it reading the queue and entering
wait(), there come 2 billion writers (maybe all in one
thread which actually makes it even more probable since there
won't be any contention on m_mutex). The m_state wraps around
and stays the same. Then the other two readers wake up, dequeue
their two items and go on their merry way. The third reader
finally enters wait. Sees that apparently m_state hasn't changed
and enters wait() even thought there are 2 billion - 2 items
in the queue by now.
I think it's a similar problem to how big a generation count for
ABA is big enough?
Although nowadays, I must admit, 2 billion doesn't sound like a very
safe number. I think if you use long long or some such then you're
all set. Although it's an interesting problem to ponder on.

I also thought more about the original code I posted (the one that
worked only for single consumer). I think that the event count variable
could be safely used to indicate the number of waiters.
I.e.:


//consumer(s):
while (true)
{
SomeData data;
while (!queue.try_deque(&data)) //it's also a full barrier
{

int n = atomic_fetch_add(&ec, 1) + 1;

if (!queue.try_deque(&data))
{
while (WOULD_BLOCK == futex(FUTEX_WAIT, &ec, n)
{
int nNew = ec; //atomic load
if (nNew < n)
break;
else
n = nNew;
}
atomic_decrement(&ec, 1);
}
else
{
atomic_decrement(&ec, 1);
break;
}
}
process(data);
}

//producer(s):
queue.enqueue(data); //full barrier
int ecValue = ec; //atomic read

//Assuming the CAS updates ecValue on failure:
while (!ecValue && !CAS(&ec, &ecValue, ecValue - 1))
{


}
if (0 != ecValue)
{
futex(FUTEX_WAKE, &ec);
}


Andy.

Chris M. Thomasson

unread,
Jun 9, 2011, 10:09:01 PM6/9/11
to
"Andy Venikov" <swojch...@gmail.com> wrote in message
news:iqf632$69h$1...@dont-email.me...

> On 05/09/2011 07:28 PM, Chris M. Thomasson wrote:
>
> <snip>
>
>
>> ARGHGHGH! I totally fuc%ed up the code. It should read as:
>> _______________________________________________________
[....]

>> _______________________________________________________
>>
>>
>> Thank you VERY much for finding that nasty race-condition!
>>
>> :^o
[...]

>> Thanks! ;^/
>>
>
> Yeah, this looks much better.

Hell yeah it does! ;^)


> There's some confusing bit manipulation code in your wait function.
> Am I correct in assuming that if we change get() to:
>
> return m_state.fetch_or(EC_WAITBIT) | EC_WAITBIT;
>
> then in the wait() function there's no need for bit manipulation and it
> could be written as:
>
> void wait(unsigned state1)
> {
> m_mutex.lock();
> unsigned state2 = m_state.load();
> if (state1 == state2)
> { ... same here ... }
> m_mutex.unlock();
> }
> ?

Sure; however it might cause some false wakeups. I just wanted to mask the
waitbit off before I compared anything.


> I should probably also mention a possible (although highly
> improbable) race-condition when m_state wraps-around.
> (Effectively there's just 31 bits in m_state to use as a
> generation-count).

[...]

Hell yeah. The algorithm will deadlock and totally bit the dust if this
condition rears it's ugly head! Yikes!

:^o


> - 2 items
> in the queue by now.
> I think it's a similar problem to how big a generation count for
> ABA is big enough?

It's a bit worse because the version count ABA solutions for CAS usually
compare the node pointer value as well. So the counter has to wrap AND the
node pointer value has to be exactly the same in order to trip the
race-condition. Here, in this eventcount there is no node pointer to
compare, just the version counter value. So, it's a bit worse than ABA wrt
lock-based collections (e.g., non-blocking stack)...


> Although nowadays, I must admit, 2 billion doesn't sound like a very
> safe number. I think if you use long long or some such then you're
> all set. Although it's an interesting problem to ponder on.

I think I have solved the problem using a very simple algorithm that I will
post at the end of this message. I don't use a eventcount, but rather a
single waiter bit and the predicate of the user algorithm itself. The
algorithm is pretty nice... Well, IMVHO of course!

:^)


> I also thought more about the original code I posted (the one that
> worked only for single consumer). I think that the event count variable
> could be safely used to indicate the number of waiters.
> I.e.:

[...]

Humm... Interesting. It should work. I need to take a closer look at it.


Anyway, here is what I am "thinking" about using now:
______________________________________________________________
class waitset
{
std::mutex m_mutex;
std::condition_variable m_cond;
std::atomic<bool> m_waitbit;
VAR_T(unsigned) m_waiters;

public:
waitset()
: m_waitbit(false),
m_waiters(0)
{

}


~waitset()
{
bool waitbit = m_waitbit.load(std::memory_order_relaxed);

unsigned waiters = VAR(m_waiters);

RL_ASSERT(! waitbit && ! waiters);
}

private:
void prv_signal(bool waitbit, bool broadcast)
{
if (! waitbit) return;

m_mutex.lock($);

unsigned waiters = VAR(m_waiters);

if (waiters < 2 || broadcast)
{
m_waitbit.store(false, std::memory_order_relaxed);
}

m_mutex.unlock($);

if (waiters)
{
if (! broadcast)
{
m_cond.notify_one($);
}

else
{
m_cond.notify_all($);
}
}
}

public:
void wait_begin()
{
m_mutex.lock($);

m_waitbit.store(true, std::memory_order_relaxed);

MEMBAR_SEQ_CST();
}


bool wait_try_begin()
{
if (! m_mutex.try_lock($)) return false;

m_waitbit.store(true, std::memory_order_relaxed);

MEMBAR_SEQ_CST();

return true;
}


void wait_cancel()
{
unsigned waiters = VAR(m_waiters);

if (! waiters)
{
m_waitbit.store(false, std::memory_order_relaxed);
}

m_mutex.unlock($);
}


void wait_commit()
{
MEMBAR_SEQ_CST();

++VAR(m_waiters);

m_cond.wait(m_mutex, $);

if (! --VAR(m_waiters))
{
m_waitbit.store(false, std::memory_order_relaxed);
}

m_mutex.unlock($);
}

public:
void signal()
{
MEMBAR_SEQ_CST();

bool waitbit = m_waitbit.load(std::memory_order_relaxed);

prv_signal(waitbit, false);
}


void broadcast()
{
MEMBAR_SEQ_CST();

bool waitbit = m_waitbit.load(std::memory_order_relaxed);

prv_signal(waitbit, true);
}
};
______________________________________________________________


I have coded up a simple test for it in Relacy Race Detector. Here is the
working code:

http://pastebin.com/q9R0sqT7


You can use it like:
______________________________________________________________
static nbqueue g_nbqueue;
static waitset g_waitset;


void* consumers()
{
void* state;

while (! (state = g_nbqueue.try_pop()))
{
g_waitset.wait_begin();

if ((state = g_nbqueue.try_pop()))
{
g_waitset.wait_cancel();

break;
}

g_waitset.wait_commit();
}

return state;
}


void producers(void* state)
{
g_nbqueue.push(state);
g_waitset.signal();
}
______________________________________________________________

Pretty simple. What do you think of it Andy?


Thanks!

:^)

Keith H Duggar

unread,
Jul 30, 2011, 10:15:12 PM7/30/11
to
Andy Venikov wrote:
> Since the platform I care about is Linux, I think I could just use
> futexes. I mean, futex is just an implementation of an event count.
> Of course it's much more than that, but it can easily translate
> itself to an event count. No?

I've been wondering about something similar lately. In particular,
consider Dmitry Vyukov's lockfree nearly waitfree SPSC queue here:

http://software.intel.com/en-us/articles/single-producer-single-consumer-queue/

Can that implementation be safely and efficiently made waitable in
Linux by adding the following few simple lines of futex code?

#include <unistd.h>
#include <linux/futex.h>
#include <sys/syscall.h>

inline void
futex_wait (
void * p
) {
syscall(SYS_futex,p,FUTEX_WAIT_PRIVATE,0,0,0,0) ;
}

inline void
futex_wake (
void * p
) {
syscall(SYS_futex,p,FUTEX_WAKE_PRIVATE,1,0,0,0) ;
}

class spsc_queue
{
...
public:
void wait ( ) { futex_wait(&tail_->next_) ; }
void wake ( ) { futex_wake(&tail_->next_) ; }
...
} ;

reusing tail_->next_ as the actual futex ;-) With usage as follows

// the single consumer
for(;;) {
SomeData data ;
while(!queue.deque(data)) {
queue.wait() ;
}
process(data) ;
}

// the single producer
queue.enqueue(data) ;
queue.wake() ;

Or is the simple "cheat" above too good to be true and/or entirely
broken? Your guidance would be greatly appreciated!

KHD

Reply all
Reply to author
Forward
0 new messages