usage of dmitry's mpsc queue


Ephi Sinowitz

Sep 16, 2011, 2:33:31 PM
to Scalable Synchronization Algorithms
Hi,
I implemented the queue http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
with a signaling mechanism for the consumer.
Do I need to ensure visibility somehow before setting the event since
I'm not spinning in the consumer? How?

Producers:
----------------
vyukov_queue.push(item);
SetEvent(m_queueEvent);

Consumer:
---------------
for (;;)
{
    WaitForSingleObject(m_queueEvent, INFINITE);
    while (void* item = vyukov_queue.pop())
    {
        // do work
    }
}

For reference here is the push()
void MPSC_Queue::push(void* item)
{
    item->next = NULL;
    void* old_head = InterlockedExchangePointer((void* volatile*)&m_head, item);

    // BLOCK POINT

    old_head->next = item; // Do I need to ensure this is visible to the consumer thread?
}

Any other critique or comments welcome as well :)
Thanks,
-ephi

Dmitriy Vyukov

Sep 21, 2011, 1:11:53 PM
to Scalable Synchronization Algorithms
Since you are doing signaling (SetEvent) after each push, I think you
don't need anything special for it to work.
However of course you need correct memory fences in the queue
algorithm itself, that is I think as follows:
void mpscq_push(mpscq_t* self, mpscq_node_t* n)
{
    n->next = 0;
    mpscq_node_t* prev = XCHG(&self->head, n); // ACQUIRE_RELEASE
    prev->next = n; // RELEASE
}

mpscq_node_t* mpscq_pop(mpscq_t* self)
{
    mpscq_node_t* tail = self->tail;
    mpscq_node_t* next = tail->next; // ACQUIRE
    if (next)
    {
        self->tail = next;
        tail->state = next->state;
        return tail;
    }
    return 0;
}

XCHG-RELEASE in push synchronizes-with XCHG-ACQUIRE in push executed
by the subsequent producer (that is, to properly hand off a node from one
producer to another in order to fill in 'next').
STORE-RELEASE in push synchronizes-with LOAD-ACQUIRE in pop (that is,
to properly hand off nodes from producers to the consumer).
However, that frequent signaling makes the whole algorithm sort of
senseless, because SetEvent is heavy and most likely involves
locking.
Ideally you want to signal only when there is a waiting consumer.
A simple CRITICAL_SECTION+CondVar will most likely be faster than this.

Ephi Sinowitz

Sep 21, 2011, 2:06:49 PM
to Scalable Synchronization Algorithms
But I am on x64. Shouldn't the STORE-RELEASE/LOAD-ACQUIRE come
automatically?
http://www.justsoftwaresolutions.co.uk/threading/intel-memory-ordering-and-c++-memory-model.html

> However, that frequent signaling makes the whole algorithm sort of
> senseless, because SetEvent is heavy and most likely involved locking.
> Ideally you want to signal only when there is a waiting consumer.
> Simple CRITICAL_SECTION+CondVar will be most likely faster than this.


OK. So like this?
Producers
---------------
push()
if (TryEnterCriticalSection(&cs)) WakeConditionVariable(&cond);
Consumer
---------------
EnterCriticalSection(&cs);
for (;;)
    SleepConditionVariableCS(&cond, &cs, INFINITE);
    while (void* item = vyukov_queue.pop())
    {
        // do work
    }

Ephi Sinowitz

Sep 21, 2011, 2:19:08 PM
to Scalable Synchronization Algorithms


On Sep 21, 2:06 pm, Ephi Sinowitz <esinowitz.pub...@gmail.com> wrote:
> But I am on x64. Shouldn't the STORE-RELEASE /LOAD-ACQUIRE come
> automatically. http://www.justsoftwaresolutions.co.uk/threading/intel-memory-orderin...


OK, I see where I'm wrong here. x86 just ensures ordering of reads against
other reads and of stores against other stores.
Bear with me here, I'm just learning :)

Dmitriy Vyukov

Sep 21, 2011, 2:35:46 PM
to Scalable Synchronization Algorithms
IA-32/Intel64 stores are store-release, loads are load-acquire, and
atomic RMW operations are all seq_cst.
So it will work IF you program in assembly/machine code. C/C++ does not
give you any of these guarantees.
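As a minimal sketch of how the same guarantee can be requested at the language level, assuming a C++11 compiler: the names payload, ready, producer, consumer and run_message_passing below are made up for illustration and do not come from the queue code. On x86-64 the release store and the acquire load typically compile to plain MOV instructions, but std::atomic also stops the compiler from reordering or caching the accesses, which ordinary variables do not.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

int payload = 0;                  // ordinary data, published through 'ready'
std::atomic<bool> ready{false};   // hypothetical flag used for the handoff

void producer() {
    payload = 42;                                   // plain store
    ready.store(true, std::memory_order_release);   // STORE-RELEASE publishes payload
}

int consumer() {
    // LOAD-ACQUIRE pairs with the release store above.
    while (!ready.load(std::memory_order_acquire))
        std::this_thread::yield();
    return payload;   // the write to payload happens-before this read
}

int run_message_passing() {
    int seen = 0;
    std::thread c([&] { seen = consumer(); });
    std::thread p(producer);
    p.join();
    c.join();
    assert(ready.load());   // the flag must be set once both threads are done
    return seen;
}
```

If ready were a plain bool, the compiler would be free to hoist the load out of the loop or to sink the store to payload below the store to the flag, regardless of what the hardware guarantees.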

Dmitriy Vyukov

Sep 21, 2011, 2:43:03 PM
to Scalable Synchronization Algorithms
On Sep 21, 11:06 am, Ephi Sinowitz <esinowitz.pub...@gmail.com> wrote:
> But I am on x64. Shouldn't the STORE-RELEASE /LOAD-ACQUIRE come
> automatically. http://www.justsoftwaresolutions.co.uk/threading/intel-memory-orderin...
>
> > However, that frequent signaling makes the whole algorithm sort of
> > senseless, because SetEvent is heavy and most likely involved locking.
> > Ideally you want to signal only when there is a waiting consumer.
> > Simple CRITICAL_SECTION+CondVar will be most likely faster than this.
>
> OK. So like this?
> Producers
> ---------------
> push()
> if (TryEnterCriticalSection(&cs)) WakeConditionVariable(&cond);
> Consumer
> ---------------
> EnterCriticalSection(&cs);
> for (;;)
>    SleepConditionVariableCS (&cond, &cs, INFINITE);
>     while (void* item = vyukov_queue.pop())
>     {
>           // do work
>      }


Good try, but condvars do not work that way.
Consider the following sequence of events:
1. Consumer checks that the queue is empty.
2. Producer puts an element into the queue.
3. Producer tries to lock the mutex and fails.
4. Consumer unlocks the mutex and blocks forever while there is an
element in the queue.

Search this group, or c.p.t, or Intel Software Network for
"eventcount". Perhaps Intel Software Network is the best place to
start, search for "eventcount gate" there.
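For comparison, here is a sketch of the conventional mutex + condition variable pattern that avoids the sequence above. It is written with std::mutex and std::condition_variable rather than CRITICAL_SECTION and CONDITION_VARIABLE, and the BlockingQueue class and run_blocking_queue function are hypothetical names for illustration. The key differences from the try-lock version are that the producer always takes the lock to publish the item, and the consumer re-checks the "queue not empty" predicate while holding the same lock, so steps 2 and 3 cannot slip in between the check and the wait.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

class BlockingQueue {
public:
    void push(int item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);   // blocking lock, not a try-lock
            items_.push_back(item);
        }
        cond_.notify_one();   // the item is already visible under the mutex
    }

    int pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate is evaluated under the lock, before and after each wait.
        cond_.wait(lock, [this] { return !items_.empty(); });
        assert(!items_.empty());   // guaranteed by the predicate
        int item = items_.front();
        items_.pop_front();
        return item;
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    std::deque<int> items_;
};

// One producer pushes 1..count, the calling thread consumes and sums them.
int run_blocking_queue(int count) {
    BlockingQueue q;
    std::thread producer([&] {
        for (int i = 1; i <= count; ++i) q.push(i);
    });
    int sum = 0;
    for (int i = 0; i < count; ++i) sum += q.pop();
    producer.join();
    return sum;
}
```

This gives up the lock-free push entirely; an eventcount is the way to keep the lock-free fast path and still block safely.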

Ephi Sinowitz

Sep 21, 2011, 3:11:50 PM
to Scalable Synchronization Algorithms
So what did you mean about using CRITICAL SECTION + CondVar? Together
with 30000 instead of INFINITE?

Dmitriy V'jukov

Sep 21, 2011, 3:21:09 PM
to lock...@googlegroups.com
On Wed, Sep 21, 2011 at 12:11 PM, Ephi Sinowitz <esinowit...@gmail.com> wrote:
> So what did you mean about using CRITICAL SECTION + CondVar? Together
> with 30000 instead of INFINITE?

I think it should work.


--
Dmitry Vyukov

All about lockfree/waitfree algorithms, multicore, scalability, parallel computing and related topics:
http://www.1024cores.net
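A rough sketch of the timeout idea, under several assumptions: an atomic counter named pending stands in for the lock-free queue, std::mutex and std::condition_variable stand in for CRITICAL_SECTION and the Win32 condition variable, and all function names are hypothetical. The producer may still skip the wake-up when its try-lock fails, but the consumer re-checks the queue after every timeout, so a missed signal costs at most one timeout period instead of blocking forever.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

std::atomic<int> pending{0};    // hypothetical stand-in for the lock-free queue
std::mutex cs;
std::condition_variable cond;

void produce_one() {
    pending.fetch_add(1, std::memory_order_release);   // stands in for push()
    std::unique_lock<std::mutex> lock(cs, std::try_to_lock);
    if (lock.owns_lock())
        cond.notify_one();   // skipped whenever the try-lock fails
}

int consume(int total, std::chrono::milliseconds timeout) {
    int consumed = 0;
    std::unique_lock<std::mutex> lock(cs);
    while (consumed < total) {
        // Returns on notify, on timeout, or spuriously. All three are fine
        // because the queue is drained after every return.
        cond.wait_for(lock, timeout);
        while (pending.load(std::memory_order_acquire) > 0) {
            pending.fetch_sub(1, std::memory_order_acq_rel);   // stands in for pop()
            ++consumed;                                        // "do work"
        }
    }
    assert(consumed == total);
    return consumed;
}

int run_timed_wait(int producers, int per_producer) {
    std::vector<std::thread> threads;
    for (int p = 0; p < producers; ++p)
        threads.emplace_back([per_producer] {
            for (int i = 0; i < per_producer; ++i) produce_one();
        });
    // 10 ms keeps the demo short; the thread above talks about 30000 ms.
    int consumed = consume(producers * per_producer, std::chrono::milliseconds(10));
    for (auto& t : threads) t.join();
    return consumed;
}
```

The trade-off is latency: with a 30000 ms timeout, an item whose wake-up was lost can sit in the queue for up to 30 seconds.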

Ephi Sinowitz

Sep 21, 2011, 3:29:08 PM
to Scalable Synchronization Algorithms


On Sep 21, 3:21 pm, "Dmitriy V'jukov" <dvyu...@gmail.com> wrote:
> On Wed, Sep 21, 2011 at 12:11 PM, Ephi Sinowitz
> <esinowitz.pub...@gmail.com>wrote:
>
>
>
> > So what did you mean about using CRITICAL SECTION + CondVar? Together
> > with 30000 instead of INFINITE?
>
> I think it should work.
>
> --
> Dmitry Vyukov
>
> All about lockfree/waitfree algorithms, multicore, scalability, parallel
> computing and related topics: http://www.1024cores.net

Thanks,
Very hard to reason about this stuff as opposed to good old mutex
based programming (although I should have caught that last one -
nothing to do with lock-free). I think I need a good grounding in the
basics first. I have "The Art of Multiprocessor Programming" on order.

-ephi

Ephi Sinowitz

Sep 21, 2011, 3:42:01 PM
to Scalable Synchronization Algorithms
So for the intrusive queue, do I need a barrier after every read and write
of next?
And I believe InterlockedExchangePointer should give me full seq_cst,
like so:
void mpscq_push(mpscq_t* self, mpscq_node_t* n)
{
    n->next = 0;
    mpscq_node_t* prev = InterlockedExchangePointer(&self->head, n); // (*)
    prev->next = n;
    _ReadWriteBarrier();
}

mpscq_node_t* mpscq_pop(mpscq_t* self)
{
    mpscq_node_t* tail = self->tail;
    mpscq_node_t* next = tail->next;
    _ReadWriteBarrier();
    if (tail == &self->stub)
    {
        if (0 == next)
            return 0;
        self->tail = next;
        tail = next;
        next = next->next;
        _ReadWriteBarrier();
    }
    if (next)
    {
        self->tail = next;
        return tail;
    }
    mpscq_node_t* head = self->head;
    if (tail != head)
        return 0;
    mpscq_push(self, &self->stub);
    next = tail->next;
    _ReadWriteBarrier();
    if (next)
    {
        self->tail = next;
        return tail;
    }
    return 0;
}
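
One possible way to express the orderings named earlier in the thread is sketched below. It uses C++11 std::atomic instead of _ReadWriteBarrier, which only restricts compiler reordering and emits no hardware fence. Note also that a release ordering has to take effect before the store it publishes, not after it. The value field and run_mpscq_demo are hypothetical additions for the demo, and the memory orders are an interpretation of the ACQUIRE/RELEASE comments above, not a reviewed implementation.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

struct mpscq_node_t {
    std::atomic<mpscq_node_t*> next{nullptr};
    int value = 0;   // hypothetical payload for the demo
};

struct mpscq_t {
    std::atomic<mpscq_node_t*> head;   // exchanged by producers
    mpscq_node_t* tail;                // touched only by the consumer
    mpscq_node_t stub;
    mpscq_t() : head(&stub), tail(&stub) {}
};

void mpscq_push(mpscq_t* self, mpscq_node_t* n) {
    n->next.store(nullptr, std::memory_order_relaxed);
    mpscq_node_t* prev = self->head.exchange(n, std::memory_order_acq_rel);
    prev->next.store(n, std::memory_order_release);   // publishes n to the consumer
}

mpscq_node_t* mpscq_pop(mpscq_t* self) {
    mpscq_node_t* tail = self->tail;
    mpscq_node_t* next = tail->next.load(std::memory_order_acquire);
    if (tail == &self->stub) {   // skip over the stub node
        if (!next) return nullptr;
        self->tail = next;
        tail = next;
        next = next->next.load(std::memory_order_acquire);
    }
    if (next) {
        self->tail = next;
        return tail;
    }
    if (tail != self->head.load(std::memory_order_acquire))
        return nullptr;   // a producer is between its exchange and its store to next
    mpscq_push(self, &self->stub);   // re-insert the stub so tail can be handed out
    next = tail->next.load(std::memory_order_acquire);
    if (next) {
        self->tail = next;
        return tail;
    }
    return nullptr;
}

// Four producers push 1000 nodes each; the calling thread pops and sums them.
long run_mpscq_demo() {
    const int kProducers = 4, kPerProducer = 1000;
    const int total = kProducers * kPerProducer;
    mpscq_t q;
    std::vector<mpscq_node_t> nodes(total);
    std::vector<std::thread> producers;
    for (int p = 0; p < kProducers; ++p)
        producers.emplace_back([&, p] {
            for (int i = 0; i < kPerProducer; ++i) {
                mpscq_node_t* n = &nodes[p * kPerProducer + i];
                n->value = i + 1;   // written before the node is published
                mpscq_push(&q, n);
            }
        });
    long sum = 0;
    for (int popped = 0; popped < total;) {
        if (mpscq_node_t* n = mpscq_pop(&q)) { sum += n->value; ++popped; }
        else std::this_thread::yield();
    }
    for (auto& t : producers) t.join();
    assert(mpscq_pop(&q) == nullptr);   // everything has been consumed
    return sum;
}
```

The demo spins with yield() when the queue is empty; how to block the consumer instead is the separate signaling question discussed above.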