low overhead mpsc and work-stealing

Pierre Habouzit

Nov 11, 2010, 8:05:28 PM
to Scalable Synchronization Algorithms
I'm using a work-stealing scheduler, with a per-thread bounded deque
that can be pushed/popped at the back by the local thread, and popped
from the front by thieves. Yes, this means that my tasks run in LIFO
order most of the time, and I'm fine with that.

When a task is pushed onto a full deque, the task is run immediately
instead of being queued, and when thieves cannot steal anything they
block on an eventcount.

In addition to that, I'd like to have serial queues of tasks that
ensure that:
- tasks are run in full FIFO order;
- at most one of their tasks runs at any given time.
Tasks from different serial queues can still run at the same time
without problem. The idea is that a queue protects a resource
(think file, GUI, ...).

My requirements are that:
- queues are registered at most once in the scheduler at any time;
- non-empty queues are always registered within the scheduler;
- empty queues are not registered in the scheduler (though it is
acceptable that, just after a queue becomes empty, it remains registered
when a race occurs, as long as it eventually finds its way out).

I'm using a setup very similar to the low-overhead mpsc queue, which
I've tweaked this way:

enum {
    QUEUE_EMPTY,
    QUEUE_NOT_EMPTY,
    QUEUE_RUNNING,
};

struct mpsc_queue_t {
    ... /* the usual stuff */
    volatile unsigned state;
    job_t run_queue;
};

struct mpsc_node_t {
    ... /* the usual stuff */
    job_t *job;
};

void serial_queue_push(mpsc_queue_t *q, job_t *job)
{
    mpsc_node_t *n = mpsc_node_alloc(); /* uses a per-thread cache */

    n->job = job;
    mpsc_queue_push(q, n); /* the underlying mpsc push */
    if (XCHG(&q->state, QUEUE_NOT_EMPTY) == QUEUE_EMPTY)
        schedule_job(&q->run_queue); /* points to queue_run */
}

void queue_run(job_t *job)
{
    mpsc_queue_t *q = container_of(job, mpsc_queue_t, run_queue);
    mpsc_node_t *n;

    do {
        XCHG(&q->state, QUEUE_RUNNING);

        while ((n = mpsc_queue_pop(q))) {
            job_t *job = n->job;

            mpsc_node_free(n); /* released into the cache first, so that
                                  it's hot if quickly reused */
            job_run(job);
        }
    } while (!CAS(&q->state, QUEUE_RUNNING, QUEUE_EMPTY));
}


I think this works, but I'm also pretty sure that it spoils the
underlying mpsc a lot, because I create contention on q->state for
everyone involved, which is ugly, not to mention the ugly loop in the
consumer. So, does anyone have an idea on how to improve this, or does
it look good enough?

Note: since the scheduler uses a LIFO, re-registering into the
scheduler from queue_run() would just do the same as the loop on the
CAS, because jobs run in LIFO order most of the time; it merely hides
the loop in the scheduler, and it still doesn't fix the contention due
to the additional XCHG in the _push operation :/

Dmitriy Vyukov

Nov 12, 2010, 3:42:14 AM
to Scalable Synchronization Algorithms
Hi Pierre,

Yes, that's way too much overhead for RUNNABLE/NONRUNNABLE detection.
It can be done with ~1 cycle of overhead for producers and 0 cycles of
overhead on the consumer's fast path. The idea is that we only need to
track state changes between RUNNABLE<->NONRUNNABLE; moreover, on the
producer's side it can be combined with the XCHG in enqueue(), while
on the consumer's side we need an additional RMW which is executed only
when the queue becomes empty. The key is that we can use the low bit of
the queue tail pointer as the RUNNABLE/NONRUNNABLE flag.

Here is the code (and yes, here I still call the queue's tail 'head'):

template<typename T>
T XCHG(T volatile* dest, T value)
{
    return (T)_InterlockedExchange((long*)dest, (long)value);
}

template<typename T>
bool CAS(T volatile* dest, T cmp, T xchg)
{
    return cmp == (T)_InterlockedCompareExchange((long*)dest,
        (long)xchg, (long)cmp);
}

struct mpscq_node_t
{
    mpscq_node_t* volatile next;
    void* state;
};

struct mpscq_t
{
    mpscq_node_t* volatile head;
    mpscq_node_t* tail;
};

void mpscq_create(mpscq_t* self, mpscq_node_t* stub)
{
    stub->next = 0;
    self->tail = stub;
    // mark it as empty
    self->head = (mpscq_node_t*)((uintptr_t)stub | 1);
}

bool spscq_push(mpscq_t* self, mpscq_node_t* n)
{
    n->next = 0;
    // serialization-point wrt producers
    mpscq_node_t* prev = XCHG(&self->head, n);
    // only 2 AND instructions added on the fast-path
    bool was_empty = ((uintptr_t)prev & 1) != 0;
    prev = (mpscq_node_t*)((uintptr_t)prev & ~1);
    prev->next = n; // serialization-point wrt consumer
    return was_empty;
}

mpscq_node_t* spscq_pop(mpscq_t* self)
{
    mpscq_node_t* tail = self->tail;
l_retry:
    // fast-path is not modified
    mpscq_node_t* next = tail->next; // serialization-point wrt producers
    if (next)
    {
        self->tail = next;
        tail->state = next->state;
        return tail;
    }
    mpscq_node_t* head = self->head;
    // if head is marked as empty,
    // then the queue had not been scheduled in the first place
    assert(((uintptr_t)head & 1) == 0);
    if (tail != head)
    {
        // there is just a temporal gap -> wait for the producer
        // to update the 'next' link
        while (tail->next == 0)
            _mm_pause();
        goto l_retry;
    }
    else
    {
        // the queue seems to be really empty -> try to mark it as empty
        mpscq_node_t* xchg = (mpscq_node_t*)((uintptr_t)tail | 1);
        if (CAS(&self->head, tail, xchg))
            // if we successfully marked it as empty -> return
            // the following producer will re-schedule the queue for execution
            return 0;
        // a producer has enqueued a new item
        goto l_retry;
    }
}

int main()
{
    mpscq_t q;
    mpscq_create(&q, new mpscq_node_t);

    mpscq_node_t* n = 0;
    //n = spscq_pop(&q);

    spscq_push(&q, new mpscq_node_t);
    n = spscq_pop(&q);
    n = spscq_pop(&q);

    spscq_push(&q, new mpscq_node_t);
    spscq_push(&q, new mpscq_node_t);
    n = spscq_pop(&q);
    n = spscq_pop(&q);
    n = spscq_pop(&q);

    spscq_push(&q, new mpscq_node_t);
    spscq_push(&q, new mpscq_node_t);
    n = spscq_pop(&q);
    spscq_push(&q, new mpscq_node_t);
    n = spscq_pop(&q);
    n = spscq_pop(&q);
    n = spscq_pop(&q);
}

Now, whenever spscq_push() returns true, the thread has to put the
queue into the scheduler for execution. Once the queue is scheduled for
execution, a thread pops from it until pop returns 0.
The nice thing is that the consumer is not obliged to process all items
in a queue. It can process, for example, 100 items, then just return
the queue to the scheduler and switch to other starving queues.
Also, with regard to that _mm_pause() loop in pop(): instead of waiting
for the producer to restore the 'next' pointer, the consumer can decide
to just return the queue to the scheduler and handle some other queues;
by the time it comes back to the first queue, the 'next' link will most
likely already be restored. So no senseless waiting.


--
Dmitriy V'jukov

Pierre Habouzit

Nov 12, 2010, 7:30:25 AM
to Scalable Synchronization Algorithms
On Nov 12, 9:42 am, Dmitriy Vyukov <dvyu...@gmail.com> wrote:
> On Nov 12, 4:05 am, Pierre Habouzit <pierre.habou...@intersec-
> > I think this works, but I'm also pretty sure that it spoils the
> > underlying mpsc a lot, because I create contention on q->state for
> > everyone involved, which is ugly, not to mention the ugly loop in the
> > consumer. So, does anyone have an idea on how to improve this, or
> > does it look good enough?
>
> > Note: since the scheduler uses a LIFO, re-registering into the
> > scheduler from queue_run() would just do the same as the loop on the
> > CAS, because jobs run in LIFO order most of the time; it merely hides
> > the loop in the scheduler, and it still doesn't fix the contention due
> > to the additional XCHG in the _push operation :/
>
> Hi Pierre,
>
> Yes, that's way too much overhead for RUNNABLE/NONRUNNABLE detection.

I thought so :)

> It can be done with ~1 cycle of overhead for producers and 0 cycles of
> overhead on the consumer's fast path. The idea is that we only need to
> track state changes between RUNNABLE<->NONRUNNABLE; moreover, on the
> producer's side it can be combined with the XCHG in enqueue(), while
> on the consumer's side we need an additional RMW which is executed only
> when the queue becomes empty. The key is that we can use the low bit of
> the queue tail pointer as the RUNNABLE/NONRUNNABLE flag.

God I feel so stupid, I tried to use the low bits of my pointers but
failed to see how :)

> bool spscq_push(mpscq_t* self, mpscq_node_t* n)
> {
>     n->next = 0;
>     // serialization-point wrt producers
>     mpscq_node_t* prev = XCHG(&self->head, n);
>     // only 2 AND instructions on fast-path added
>     bool was_empty = ((uintptr_t)prev & 1) != 0;

Okay, small nitpick, you don't need the != 0 test at all ;)

>     prev = (mpscq_node_t*)((uintptr_t)prev & ~1);
>     prev->next = n; // serialization-point wrt consumer
>     return was_empty;
>
> }



> Now, whenever spscq_push() returns true, the thread has to put the
> queue into the scheduler for execution. Once the queue is scheduled for
> execution, a thread pops from it until pop returns 0.
> The nice thing is that the consumer is not obliged to process all items
> in a queue. It can process, for example, 100 items, then just return
> the queue to the scheduler and switch to other starving queues.
> Also, with regard to that _mm_pause() loop in pop(): instead of waiting
> for the producer to restore the 'next' pointer, the consumer can decide
> to just return the queue to the scheduler and handle some other queues;
> by the time it comes back to the first queue, the 'next' link will most
> likely already be restored. So no senseless waiting.

Thanks a lot, this was exactly the kind of thing I was sure was
possible but kept missing!

Saman Barghi

Mar 10, 2016, 12:49:37 PM
to Scalable Synchronization Algorithms
Hi Dmitriy, 

I got here from your blog post. Just wanted to point out that since the queue needs at least one item to stay in it in order to work properly,
the last item stays in the queue until another item arrives. Trying to pop the last item returns 0 although there is 1 item left in the queue. In your blog post
you have a stub member to replace the last item, but here the stub item is popped too. Check with this code:

int main()
{
    const int MAX = 20;
    mpscq_t q;
    mpscq_create(&q, new mpscq_node_t);

    mpscq_node_t* n = 0;
    //n = spscq_pop(&q);

    mpscq_node_t* nodes[MAX];
    for(int i =0; i < MAX; i++){
        nodes[i] = new mpscq_node_t;
        nodes[i]->next = 0;
        spscq_push(&q, nodes[i]);
        printf("%p, ", nodes[i]);
    }
    printf("\n");

    for(int i =0; i < MAX+5; i++)
    {
        n = spscq_pop(&q);
        if(n !=0)
            printf("%p, ", n);
        else
            printf("\nNothing to pop!\n");
    }
}


Thanks,
Saman

Dmitry Vyukov

Mar 10, 2016, 12:54:22 PM
to lock...@googlegroups.com
Hi Saman,

Please post the full source code and point to the exact thing that
should not happen. I've lost all context on this by now.

Thanks

Saman Barghi

Mar 10, 2016, 4:56:02 PM
to Scalable Synchronization Algorithms
Dmitry,

Here is a version of the code you posted in this thread, modified for gcc. I also changed the main function to print out what is pushed to and popped from the queue:

================================================================================
#include <cstdint>
#include <cassert>
#include <unistd.h>
#include <cstdio>

template<typename T>
T XCHG(T volatile* dest, T value)
{
    T t = (T)__sync_lock_test_and_set((long*)dest, (long)value);
    __sync_synchronize();
    return t;
}

template<typename T>
bool CAS(T volatile* dest, T cmp, T xchg)
{
    // note: __sync_val_compare_and_swap takes (dest, oldval, newval)
    return cmp == (T)__sync_val_compare_and_swap((long*)dest, (long)cmp, (long)xchg);
}

struct mpscq_node_t
{
    mpscq_node_t* volatile next;
    void* state;
};

struct mpscq_t
{
    mpscq_node_t* volatile head;
    mpscq_node_t* tail;
};

void mpscq_create(mpscq_t* self, mpscq_node_t* stub)
{
    stub->next = 0;
    self->tail = stub;
    // mark it as empty
    self->head = (mpscq_node_t*)((uintptr_t)stub | 1);
}

bool spscq_push(mpscq_t* self, mpscq_node_t* n)
{
    n->next = 0;
    // serialization-point wrt producers
    mpscq_node_t* prev = XCHG(&self->head, n);
    bool was_empty = ((uintptr_t)prev & 1) != 0;
    prev = (mpscq_node_t*)((uintptr_t)prev & ~1);
    prev->next = n; // serialization-point wrt consumer
    return was_empty;
}

mpscq_node_t* spscq_pop(mpscq_t* self)
{
    mpscq_node_t* tail = self->tail;
l_retry:
    mpscq_node_t* next = tail->next; // serialization-point wrt producers
    if (next)
    {
        self->tail = next;
        tail->state = next->state;
        return tail; //This should be replaced with return next;
    }
    mpscq_node_t* head = self->head;
    // if head is marked as empty,
    // then the queue had not be scheduled in the first place
    assert(((uintptr_t)head & 1) == 0);
    if (tail != head)
    {
        // there is just a temporal gap -> wait for the producer to update 'next' link
        while (tail->next == 0)
            usleep(10);
        goto l_retry;
    }
    else
    {
        // the queue seems to be really empty -> try to mark it as empty
        mpscq_node_t* xchg = (mpscq_node_t*)((uintptr_t)tail | 1);
        if (CAS(&self->head, tail, xchg))
            // if we succesfully marked it as empty -> return
            // the following producer will re-schedule the queue for execution
            return 0;
        // producer had enqueued new item
        goto l_retry;
    }
}

int main()
{
    const int MAX = 20;
    mpscq_t q;
    mpscq_create(&q, new mpscq_node_t);

    mpscq_node_t* n = 0;
    //n = spscq_pop(&q);

    printf("Push: ");
    mpscq_node_t* nodes[MAX];
    for(int i =0; i < MAX; i++){
        nodes[i] = new mpscq_node_t;
        nodes[i]->next = 0;
        spscq_push(&q, nodes[i]);
        printf("%p, ", nodes[i]);
    }
    printf("\nPoP: ");

    for(int i =0; i < MAX+5; i++)
    {
        n = spscq_pop(&q);
        if(n !=0)
            printf("%p, ", n);
        else
            printf("\nNothing to pop!\n");
    }
}

================================================================================

And here is the output: 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Push: 0x91e030, 0x91e050, 0x91e070, 0x91e090, 0x91e0b0, 0x91e0d0, 0x91e0f0, 0x91e110, 0x91e130, 0x91e150, 0x91e170, 0x91e190, 0x91e1b0, 0x91e1d0, 0x91e1f0, 0x91e210, 0x91e230, 0x91e250, 0x91e270, 0x91e290
PoP:   0x91e010, 0x91e030, 0x91e050, 0x91e070, 0x91e090, 0x91e0b0, 0x91e0d0, 0x91e0f0, 0x91e110, 0x91e130, 0x91e150, 0x91e170, 0x91e190, 0x91e1b0, 0x91e1d0, 0x91e1f0, 0x91e210, 0x91e230, 0x91e250, 0x91e270
Nothing to pop!

Nothing to pop!

Nothing to pop!

Nothing to pop!

Nothing to pop!

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The stub item is popped and the last item is never popped out of the queue. I just went over the code again: replacing the highlighted line above (return tail;) with (return next;) would solve the problem.

Cheers,
Saman



Dmitry Vyukov

Mar 16, 2016, 7:33:00 AM
to lock...@googlegroups.com
You need to use the state field of the nodes to hold state. A node
itself is merely a holder for state. Here is a fixed main function:

int main()
{
    const int MAX = 20;
    mpscq_t q;
    mpscq_create(&q, new mpscq_node_t);

    mpscq_node_t* n = 0;

    printf("Push: ");
    mpscq_node_t* nodes[MAX];
    for (int i = 0; i < MAX; i++) {
        nodes[i] = new mpscq_node_t;
        nodes[i]->state = (void*)(long)(i + 42);
        spscq_push(&q, nodes[i]);
        printf("%d, ", i + 42);
    }
    printf("\nPoP: ");

    for (int i = 0; i < MAX + 5; i++)
    {
        n = spscq_pop(&q);
        if (n != 0)
            printf("%d, ", (int)(long)n->state);
        else
            printf("\nNothing to pop!\n");
    }
}



--
Dmitry Vyukov

All about lockfree/waitfree algorithms, multicore, scalability,
parallel computing and related topics:
http://www.1024cores.net

Saman Barghi

Apr 4, 2016, 6:33:16 PM
to Scalable Synchronization Algorithms
That makes sense, but since I wanted the queue to be fully intrusive and required that no nodes be left in the queue after the last pop (to preserve the next pointer, as nodes can be used in other queues), I rewrote both your non-blocking and blocking versions. The blocking version uses the same idea of atomically changing the low bit of the tail to keep track of the queue's emptiness, and the stub node is inserted at the back of the queue whenever the head pointer catches up with it. Here is the pointer to my implementation:

Dmitry Vyukov

Apr 5, 2016, 2:34:26 AM
to lock...@googlegroups.com
On Tue, Apr 5, 2016 at 12:33 AM, Saman Barghi <sam...@gmail.com> wrote:
> That makes sense, but since I wanted the queue to be fully intrusive and
> required that no nodes left in the queue after the last pop (to preserve the
> next pointer as nodes can be used in other queues),

The queue does support this. A node returned from pop can be pushed to
other queues.

Saman Barghi

Apr 5, 2016, 1:17:00 PM
to lock...@googlegroups.com
On Tue, Apr 5, 2016 at 2:34 AM, Dmitry Vyukov <dvy...@gmail.com> wrote:
On Tue, Apr 5, 2016 at 12:33 AM, Saman Barghi <sam...@gmail.com> wrote:
> That makes sense, but since I wanted the queue to be fully intrusive and
> required that no nodes left in the queue after the last pop (to preserve the
> next pointer as nodes can be used in other queues),

The queue does support this. Node returned from Pop can be pushed to
other queues.
 
I mean using the queue without relying on passing the state on to the next node. I wanted the nodes themselves to hold the state and to be pushable directly onto other queues, instead of updating the state every time.
The non-blocking version is fine, but the blocking version does not let go of the very last node. This is the behaviour of the queue when pushing and popping 3 nodes:

push(N0) -> state 0
push(N1) -> state 1
push(N2) -> state 2

the queue looks like:
q1: stub->N0->N1->N2->0  (tail = stub, head = N2)

Now if I do 4 pops I get:

pop() -> stub -> state 0
pop() -> N0   -> state 1
pop() -> N1   -> state 2
pop() -> 0

Now I have all the states, but the queue looks like:
q1: N2->0  (tail = head = N2, low bit of head = 1)

Although I can pop all the states properly, I still have the last node stuck in the queue, and my requirement was to be able to use all nodes and push them onto other queues directly. I am dealing with lightweight threads and need to push them around; I want the object representing the thread to be used directly in the queue and act as a node, not to be attached to a node. All I changed in your code was to push the stub back at the end, so I can use the stub to keep track of the emptiness of the queue. So the above looks like this now:

push(N0)
push(N1)
push(N2)

the queue looks like:
q2: stub->N0->N1->N2->0  (tail = stub, head = N2)

Now when I do the first pop, I remove the stub and push it at the end of the queue:
pop() -> N0
the queue looks like:
q1: N1->N2->stub->0  (tail = N1, head = stub)

now the rest of the pops:
pop() -> N1
pop() -> N2
pop() -> 0

and the queue looks like this:

q1: stub->0  (tail = head = stub, low bit of head = 1)

I should have been clearer: your queue is fine and nodes can be used in other queues, but my requirements were different, so I had to change your code to meet the requirements I had in mind.

Thanks,
Saman


Benjamin Stadin

Jul 8, 2019, 6:49:36 AM
to Scalable Synchronization Algorithms
Hi Dmitry,

interesting implementation. Is your queue linearizable, i.e. can it be used as a FIFO?

Regards
Ben