
Single producer, "one-shot" work queue implementation, no lock on queue.


JC
Jan 16, 2009, 2:48:37 PM

I have a bug in a work queue implementation I'm trying, and I can't
think of a good solution.

I have a list of work items that need to be processed periodically,
and the list can be constructed completely before processing begins.
There is a single thread that builds the list, and multiple worker
threads that perform the work concurrently while the single thread
waits for them all to finish.

It's not a continuous work queue; the producer thread adds items, says
"go!", the worker threads do it, and then the producer thread obtains
the results and continues on its way. I think I can take advantage of
the fact that the producer never queues items while the worker threads
are running to avoid locking the work list. I've implemented this with
a semaphore and an event on Windows. Here is the implementation:

=== BEGIN CODE ===

#include <windows.h>
#include <vector>
#include <cassert>


class BasicWorkList {

public:

    struct WorkItem { virtual void execute () = 0; };

    BasicWorkList () {
        complete_ = CreateEvent(NULL, FALSE, FALSE, NULL);
        worksem_ = CreateSemaphore(NULL, 0, 0x7FFFFFFFL, NULL);
    }

    ~BasicWorkList () {
        CloseHandle(worksem_);
        CloseHandle(complete_);
    }

    void addItem (WorkItem *item) {
        items_.push_back(item);
    }

    void beginWork () {
        assert(!items_.empty());
        index_ = -1;
        ReleaseSemaphore(worksem_, (LONG)items_.size(), NULL);
    }

    void waitComplete () {
        WaitForSingleObject(complete_, INFINITE);
    }

    void executeNext () {
        WaitForSingleObject(worksem_, INFINITE);
        LONG index = InterlockedIncrement(&index_);
        items_[index]->execute();
        if (index == (LONG)(items_.size() - 1)) { // last item
            items_.clear();
            SetEvent(complete_);
        }
    }

private:

    std::vector<WorkItem *> items_;
    volatile LONG index_;
    HANDLE worksem_;
    HANDLE complete_;

};

=== END CODE ===

Where the producer thread might be something like this (you get the
idea):

void producerthread () {
    while (true) {
        worklist.addItem(work1); // queue items
        worklist.addItem(work2);
        worklist.addItem(work3);
        worklist.beginWork();    // go
        worklist.waitComplete();
        // process results
    }
}

And the worker threads are basically:

void workerthread () {
    while (true) {
        worklist.executeNext();
    }
}


The bug that I'm running into is that the completion event is signaled
when the last executed work item completes, but this doesn't
necessarily mean that all work items have completed. For example, if
the second to last work item executed takes longer to complete and
finishes after the last item, waitComplete() returns before that long
work item is finished.

Is there some way I can make this work? I want to wait for all threads
to finish executing, except I'm having trouble making it work because
a thread that doesn't dequeue the last item doesn't know that there
are no more items remaining when it's finished... it just blocks on
the semaphore. So I can't have, say, an event for each worker thread.

I've done some benchmarks and performance seems to be good, with very
little overhead per work item (on my machine, on the order of 20 usec
per work item compared to 5 usec when just executing work items
directly from the producer thread, where the work items themselves are
on the order of 5000-6000 usec). So I'd like to use this
implementation, except it has that bug.

Thanks!
Jason

JC
Jan 16, 2009, 6:05:42 PM

On Jan 16, 2:48 pm, JC <jason.cipri...@gmail.com> wrote:
> The bug that I'm running into is that the completion event is signaled
> when the last executed work item completes, but this doesn't
> necessarily mean that all work items have completed. For example, if
> the second to last work item executed takes longer to complete and
> finishes after the last item, waitComplete() returns before that long
> work item is finished.
>
> Is there some way I can make this work? I want to wait for all threads
> to finish executing, except I'm having trouble making it work because
> a thread that doesn't dequeue the last item doesn't know that there
> are no more items remaining when it's finished... it just blocks on
> the semaphore. So I can't have, say, an event for each worker thread.

I think I figured it out; I added a count of unfinished items, and the
last thread to complete an item clears the list and sets the event:

=== BEGIN CODE ===

#include <windows.h>
#include <vector>
#include <cassert>

using namespace std;


class BasicWorkList {

public:

    struct WorkItem { virtual void execute () = 0; };

    BasicWorkList () {
        complete_ = CreateEvent(NULL, FALSE, FALSE, NULL);
        worksem_ = CreateSemaphore(NULL, 0, 0x7FFFFFFFL, NULL);
    }

    ~BasicWorkList () {
        CloseHandle(worksem_);
        CloseHandle(complete_);
    }

    void addItem (WorkItem *item) {
        items_.push_back(item);
    }

    void beginWork () {
        assert(!items_.empty());
        index_ = -1;
        unfinished_ = (LONG)items_.size();
        ReleaseSemaphore(worksem_, (LONG)items_.size(), NULL);
    }

    void waitComplete () {
        WaitForSingleObject(complete_, INFINITE);
    }

    void executeNext () {
        WaitForSingleObject(worksem_, INFINITE);
        LONG index = InterlockedIncrement(&index_);
        items_[index]->execute();

        if (!InterlockedDecrement(&unfinished_)) { // last item complete?
            assert(index == (items_.size() - 1)); // work list is empty.
            items_.clear();
            SetEvent(complete_);
        }
    }

private:

    std::vector<WorkItem *> items_;
    volatile LONG index_;
    volatile LONG unfinished_;
    HANDLE worksem_;
    HANDLE complete_;

};

=== END CODE ===

This seems to be working well except it more than doubled the overhead
to 45 usec on average compared to about 20 usec before (compared to 5
usec for just calling execute() directly). I guess that's negligible
compared to the length of the work items but it would be nice to speed
it up. There are 2 interlocked increments and a semaphore wait per
work item; I can't think of a way to do it with less.

Jason

David Barrett-Lennard
Jan 16, 2009, 10:18:16 PM

I think you need to remove that assertion in executeNext().


> This seems to be working well except it more than doubled the overhead
> to 45 usec on average compared to about 20 usec before

A per-task overhead of 45 usec seems high to me.


> (compared to 5
> usec for just calling execute() directly).

I don't understand that. On a modern machine a function call has an
overhead that's closer to 5nsec than 5usec.


> I guess that's negligible
> compared to the length of the work items but it would be nice to speed
> it up. There are 2 interlocked increments and a semaphore wait per
> work item; I can't think of a way to do it with less.

The thread pool I implemented uses a mutex (aka windows critical
section) to protect the queue of pending tasks, and my worker threads
use an inner loop where they pop items and execute them until the
queue is empty. Only then do they break out of the inner loop and go
back to sleep by waiting on an event in the outer loop.
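
(For illustration, a minimal sketch of the kind of worker loop being
described, not David's actual code; it assumes a CRITICAL_SECTION-protected
std::deque and an auto-reset wake event, both initialized at startup:)

#include <windows.h>
#include <deque>

struct Task { virtual void Execute() = 0; };

// Assume InitializeCriticalSection(&g_lock) and CreateEvent(NULL, FALSE,
// FALSE, NULL) for g_wake are done before any worker starts.
static std::deque<Task*> g_queue;   // pending tasks
static CRITICAL_SECTION g_lock;     // protects g_queue
static HANDLE g_wake;               // auto-reset event, signaled when work is queued

void worker_thread() {
    for (;;) {
        WaitForSingleObject(g_wake, INFINITE);   // outer loop: sleep until signaled
        for (;;) {                               // inner loop: pop and execute until empty
            EnterCriticalSection(&g_lock);
            Task* t = NULL;
            if (!g_queue.empty()) {
                t = g_queue.front();
                g_queue.pop_front();
            }
            LeaveCriticalSection(&g_lock);
            if (!t) break;                       // queue drained: go back to sleep
            t->Execute();
        }
    }
}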

I avoid waking up all threads when the queue transitions to non-
empty. I also use an optimisation where addItem sometimes gives the
task directly to a sleeping worker thread rather than via the queue.

As you have done, I provide a facility to wait until tasks have
completed. However I group tasks according to the caller to allow
multiple clients to concurrently post tasks then wait (only) on the
execution of the tasks that they posted. For example, one client may
be posting tasks while another client is waiting on the tasks that it
posted.

I think there is an implicit difficulty with measuring a per-task
overhead. When the execute method does nothing at all, a given worker
thread tends to rapidly execute the items until the queue is empty and
goes back to sleep. Therefore the test has a tendency to wake up
sleeping threads all the time, increasing the overheads!

In any case the measured overhead is about 3 usec on a dual Athlon
4600+ machine. I haven't compared performance to any other thread
pool implementations, so I have no idea whether that's good or bad.

JC
Jan 17, 2009, 1:53:44 AM

On Jan 16, 10:18 pm, David Barrett-Lennard <davi...@iinet.net.au>
wrote:

> On Jan 17, 8:05 am, JC <jason.cipri...@gmail.com> wrote:
> I think you need to remove that assertion in executeNext().

It's a macro that was removed in the release build that I tested with;
it doesn't affect the timing (I did remove it to double-check).

> > This seems to be working well except it more than doubled the overhead
> > to 45 usec on average compared to about 20 usec before
>
> A per-task overhead of 45 usec seems high to me.
>
> > (compared to 5
> > usec for just calling execute() directly).
>
> I don't understand that. On a modern machine a function call has an
> overhead that's closer to 5nsec than 5usec.

Let me double-check the math here... all right, my benchmarks were
flawed because the sample size was too low (they may be flawed for
other reasons) and because QueryPerformanceCounter() has a good bit of
overhead. I've redone the test program using a much higher sample size
(1000000 instead of 200) and using rdtsc (following the guidelines here
http://cs.smu.ca/~jamuir/rdtscpm1.pdf with some modifications).

All right, now that the benchmarks are arguably less crappy, I am
seeing around 33nsec for the plain function call (actually, it's a
call to a virtual function via a pointer to an object, but just a
regular function showed about the same results). And I'm consistently
seeing about 15usec for the queued item.

What I said about the extra InterlockedDecrement adding extra overhead
above was wrong; with or without it, it's still around 15usec. I'm
including the call to addItem() as part of the overhead, and this is
the worst case, where there's only a single item in the queue.

All of my timings may be off by the same scale, because I'm grabbing
the CPU frequency from the Windows registry which can't necessarily be
trusted (using Intel 2.16GHz T2600 Core Duo, reported frequency is
2,161,000,000).
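
(For a rough sanity check: at the reported 2.161 GHz, 15 usec is about
15e-6 * 2.161e9 = roughly 32,400 cycles, and 33 nsec is roughly 71 cycles,
so even if the registry frequency is somewhat off, the relative comparison
between the two numbers stands.)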

> > I guess that's negligible
> > compared to the length of the work items but it would be nice to speed
> > it up. There are 2 interlocked increments and a semaphore wait per
> > work item; I can't think of a way to do it with less.
>
> The thread pool I implemented uses a mutex (aka windows critical
> section) to protect the queue of pending tasks, and my worker threads
> use an inner loop where they pop items and execute them until the
> queue is empty. Only then do they break out of the inner loop and go
> back to sleep by waiting on an event in the outer loop.
>
> I avoid waking up all threads when the queue transitions to non-
> empty. I also use an optimisation where addItem sometimes gives the
> task directly to a sleeping worker thread rather than via the queue.
>
> As you have done, I provide a facility to wait until tasks have
> completed. However I group tasks according to the caller to allow
> multiple clients to concurrently post tasks then wait (only) on the
> execution of the tasks that they posted. For example, one client may
> be posting tasks while another client is waiting on the tasks that it
> posted.

This sounds like a good implementation, but our use cases are a little
different. I don't need the grouping, and in fact for the most part I
know ahead of time exactly how many items will be in the work queue
each time this runs, so I can probably get away with just resetting
the indexes instead of clearing the queue and re-adding the items now
that I think about it. I also know that the queue won't be growing
while work items are being processed -- I can take advantage of that
by not locking the queue, but it also means added complexity if I try
to make addItem give tasks directly to worker threads (I'm assuming...
but I don't really know that optimization).

> I think there is an implicit difficulty with measuring a per-task
> overhead. When the execute method does nothing at all, a given worker
> thread tends to rapidly execute the items until the queue is empty and
> goes back to sleep. Therefore the test has a tendency to wake up
> sleeping threads all the time, increasing the overheads!

That's a good point. I tried to measure the worst case, though, with
one worker thread and one item in the queue. So the sleeping thread is
always woken up, and that's included in the measurement as well.

> In any case the measured overhead is about 3 usec on a dual Athlon
> 4600+ machine. I haven't compared performance to any other thread
> pool implementations, so I have no idea whether that's good or bad.

Does that include the time it takes to add the work item to the queue?
I don't know if it's good or bad either but it's certainly a lot
better than mine, the CPU is a bit better than mine as well. After
fixing my benchmarks, I'm seeing 15usec for queue + wakeup + execute +
wait on completion. My timing might include more than yours, but it
still makes me wonder if I'm taking the wrong approach.

I'd be happy to post the test program, although it's a bit long and
messy. The basic timing is done like this (pseudo-ish code):

struct Timing : public WorkItem {
    __int64 start;
    __int64 fstart;
    __int64 fdone;
    __int64 done;
    void execute () { // called by worker thread
        fstart = rdtsc; // <-- processing start time
        // <-- work load
        fdone = rdtsc;  // <-- processing end time
    }
};

for (n = 0; n < 1000000; ++n) {
    Timing t;
    t.start = rdtsc;  // <-- start time
    work.addItem(&t);
    work.beginWork();
    work.waitComplete();
    t.done = rdtsc;   // <-- end time
    timings.push_back(t);
}

I've tried a few different things for "work load", all producing
identical results, ranging from nothing at all, to 1000 calls to sin()
on a random number (what I'm currently using; I verified that the calls
were not optimized away), to Sleep() calls.

I define "overhead" as the average ((done - start) - (fdone - fstart))
over 1,000,000 iterations. The rdtsc's are preceded with cpuid to
prevent instruction reordering, and the overhead of the cpuid + rdtsc
calls is measured when the program starts (I've verified that the
measurements, in cycle counts, are repeatable with no variation). The
major unknown is CPU cycles per second, but the relative times are
still valid.
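
(As a concrete illustration of that measurement, not the actual test
program: with the MSVC intrinsics from <intrin.h>, the serialized
time-stamp read might look roughly like this:)

#include <intrin.h>

// cpuid is a serializing instruction, so earlier instructions cannot be
// reordered past the subsequent time-stamp read.
static unsigned __int64 serialized_rdtsc() {
    int info[4];
    __cpuid(info, 0);
    return __rdtsc();
}

// Per-iteration overhead, in cycles, as defined above:
//   overhead = (done - start) - (fdone - fstart)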


Jason

David Barrett-Lennard
Jan 17, 2009, 8:59:37 PM

On Jan 17, 3:53 pm, JC <jason.cipri...@gmail.com> wrote:
> On Jan 16, 10:18 pm, David Barrett-Lennard <davi...@iinet.net.au>
> wrote:
>
> > On Jan 17, 8:05 am, JC <jason.cipri...@gmail.com> wrote:
> > I think you need to remove that assertion in executeNext().
>
> It's a macro that was removed in the release build that I tested with;
> it doesn't affect the timing (I did remove it to double-check).

I think the assertion is *incorrect*.

> > > This seems to be working well except it more than doubled the overhead
> > > to 45 usec on average compared to about 20 usec before
>
> > A per-task overhead of 45 usec seems high to me.
>
> > > (compared to 5
> > > usec for just calling execute() directly).
>
> > I don't understand that. On a modern machine a function call has an
> > overhead that's closer to 5nsec than 5usec.
>
> Let me double check the math here... all right, my benchmarks were
> flawed because the sample set size was too low (they may be flawed for
> other reasons) and because QueryPerformanceCounter() has a good bit of
> overhead, I've redone the test program using a much higher sample size

> (1000000 instead of 200) and using rdtsc (following guidelines here http://cs.smu.ca/~jamuir/rdtscpm1.pdf with some modifications).

IMO a more general-purpose thread pool is a good idea because it
easily supports your particular use case but can also be applied to
many other problems.

I achieve grouping capability using a "task executer" (which is
distinct from the thread pool) as follows:

struct Task
{
    virtual void Execute() = 0;
};

struct TaskExecuter
{
    virtual void Close() = 0;
    virtual void Post(Task* task) = 0;
    virtual void Wait() = 0;
};

struct ThreadPool
{
    virtual void Close() = 0;
    virtual TaskExecuter* CreateTaskExecuter() = 0;
};

ThreadPool* CreateThreadPool();
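
(A hypothetical usage sketch of the interface above, to make the grouping
concrete; t1 and t2 are assumed to be concrete Task objects created
elsewhere:)

void client(ThreadPool* pool, Task* t1, Task* t2) {
    TaskExecuter* ex = pool->CreateTaskExecuter();
    ex->Post(t1);
    ex->Post(t2);
    ex->Wait();   // blocks only until t1 and t2 have executed,
                  // not until other clients' tasks have
    ex->Close();
}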

I only measured overhead as done-start using an empty execute method.

JC
Jan 18, 2009, 6:57:47 PM

On Jan 18, 1:59 am, David Barrett-Lennard <davi...@iinet.net.au> wrote:
> On Jan 17, 3:53 pm, JC <jason.cipri...@gmail.com> wrote:
>
> > On Jan 16, 10:18 pm, David Barrett-Lennard <davi...@iinet.net.au>
> > wrote:
>
> > > On Jan 17, 8:05 am, JC <jason.cipri...@gmail.com> wrote:
> > > I think you need to remove that assertion in executeNext().
>
> > It's a macro that was removed in the release build that I tested with;
> > it doesn't affect the timing (I did remove it to double-check).
>
> I think the assertion is *incorrect*.

You are right; it is. It was correct in the first revision.

In fact, I just spent about 2 hours debugging a mysterious and
frustrating crash somewhere in that function (confusing the heck out
of VS's debugger), and it turned out to be that very assertion
failing. I had ignored the assert() as irrelevant the entire time. So
feel free to pitch in a big fat "I told you so", heh. I just now
solved the problem and then came back here to check this thread; too
bad I didn't read it earlier!


Thanks,
Jason

David Barrett-Lennard
Jan 18, 2009, 8:17:27 PM

I find VS sometimes gets confused with assertions because it displays
a dialog which pumps the message queue. I find it better to simply
make the assertion break immediately in the debugger without
displaying a dialog. This can be achieved by using an assertion macro
that doesn't make the call to _CrtDbgReport before calling
_CrtDbgBreak (the latter simply does an int 3).
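
(A minimal sketch of that kind of macro, with a hypothetical name:)

#include <crtdbg.h>

// Break straight into the debugger on failure via _CrtDbgBreak (an int 3),
// skipping the _CrtDbgReport dialog that pumps the message queue.
#define BREAK_ASSERT(expr) \
    do { if (!(expr)) { _CrtDbgBreak(); } } while (0)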

Chris M. Thomasson
Jan 18, 2009, 11:54:05 PM

"JC" <jason.c...@gmail.com> wrote in message
news:88488007-8c4a-4959...@j39g2000yqn.googlegroups.com...

>I have a bug in a work queue implementation I'm trying, and I can't
> think of a good solution.
> [...]

Humm... Here is a pseudo-code sketch of how I might implement your queuing
scheme. This should solve your problem of the producer not knowing if
workers are completely finished working, and it might scale better:
___________________________________________________________________
#define CONSUMERS 4


struct work {
    virtual void on_work() = 0;
};


struct consumer {
    non_thread_safe_queue<work*> work;
    event process;
};


struct producer {
    atomic_word completed; // = 0
    event complete;
};


static consumer g_consumers[CONSUMERS];
static producer g_producer;


void producer_thread() {
    unsigned i = 0, w;

    for (;;) {
        ATOMIC_STORE(&g_producer.completed, CONSUMERS);

        // generate some work
        g_consumers[(++i) % CONSUMERS].work.push(new work);
        g_consumers[(++i) % CONSUMERS].work.push(new work);
        g_consumers[(++i) % CONSUMERS].work.push(new work);
        g_consumers[(++i) % CONSUMERS].work.push(new work);
        g_consumers[(++i) % CONSUMERS].work.push(new work);

        for (w = 0; w < CONSUMERS; ++w) {
            g_consumers[w].process.signal();
        }

        g_producer.complete.wait();
    }
}


void consumer_threads(size_t id) {
    consumer* const self = &g_consumers[id];
    work* w;

    for (;;) {
        self->process.wait();

        while ((w = self->work.pop())) {
            w->on_work();
        }

        if (! ATOMIC_DEC(&g_producer.completed)) {
            g_producer.complete.signal();
        }
    }
}
___________________________________________________________________


Each consumer has a private non-thread-safe queue. The queue is owned by the
producer until the producer signals the consumer that the queue belongs to.
Once the signal is received, the queue is owned by the consumer. The consumer
transfers ownership of its queue back to the producer by decrementing the
`producer::completed' reference count. When it drops to zero, the producer
resumes its execution.

David Barrett-Lennard
Jan 19, 2009, 1:27:43 AM

On Jan 19, 1:54 pm, "Chris M. Thomasson" <n...@spam.invalid> wrote:
> "JC" <jason.cipri...@gmail.com> wrote in message

I like the idea of private non-thread-safe queues and imagine it can
offer better performance than a global queue. However in some
applications the consumers may be given very different workloads so
some consumers become idle while there are still pending tasks for
other consumers.

I wonder whether it would be practical to allow a thread that's run
out of tasks to implicitly request them from another thread? I'm
thinking that a thread that has run out of tasks pushes itself onto a
stack of sleeping threads. A thread that's sitting in the while loop
(processing tasks from its private queue) also tests whether the stack
of sleeping threads is non-empty. If so then it pops a sleeping thread
from the stack, transfers half of its remaining tasks to the sleeping
thread's queue, then signals it to awake.

Chris M. Thomasson
Jan 19, 2009, 4:41:19 PM

"David Barrett-Lennard" <dav...@iinet.net.au> wrote in message
news:55fbf57c-2e12-4892...@t26g2000prh.googlegroups.com...

> On Jan 19, 1:54 pm, "Chris M. Thomasson" <n...@spam.invalid> wrote:
>> "JC" <jason.cipri...@gmail.com> wrote in message
>>
>> news:88488007-8c4a-4959...@j39g2000yqn.googlegroups.com...
>>
>> >I have a bug in a work queue implementation I'm trying, and I can't
>> > think of a good solution.
>> > [...]
>>
>> Humm... Here is a pseudo-code sketch of how I might implement your
>> queuing
>> scheme. This should solve your problem of the producer not knowing if
>> workers are completely finished working, and it might scale better:
>> ___________________________________________________________________
>> [...]

>> ___________________________________________________________________
>>
>> Each consumer has a private non-thread-safe queue. The queue is owned by
>> the
>> producer until it signals the consumer to which the queue is an integral
>> part of. Once the signal is received, the queue is now owned by the
>> consumer. The consumer transfers the ownership of its queue back to the
>> producer by decrementing the refcount `producer::completed' counter. When
>> it
>> drops to zero, the producer resumes it execution.
>
> I like the idea of private non-thread-safe queues and imagine it can
> offer better performance than a global queue. However in some
> applications the consumers may be given very different workloads so
> some consumers become idle while there are still pending tasks for
> other consumers.

Indeed.


> I wonder whether it would be practical to allow a thread that's run
> out of tasks to implicitly request them from another thread?

Here is a brief outline of a very crude algorithm I was tinkering around
with:

http://groups.google.com/group/comp.programming.threads/browse_frm/thread/6019e377e4aa73ec

It uses a work-requesting idea instead of work stealing. There are some
issues that need to be resolved, but I think it can be made to work.


> I'm
> thinking that a thread that has run out of tasks pushes itself onto a
> stack of sleeping threads. A thread that's sitting in the while loop
> (processing tasks from its private queue) also tests whether the stack
> of sleeping threads is non-empty. If so then it pops a sleeping thread
> from the stack, transfers half of its remaining tasks to the sleeping
> thread's queue, then signals it to awake.

That should work fine. You could still keep the overall design of a
non-thread-safe queue per thread. Humm, here is a crude sketch of code:
_____________________________________________________________________
#define CONSUMERS 4


struct work {
    virtual void on_work() = 0;
};


struct consumer {
    consumer* next;

    non_thread_safe_queue<work*> work;
    event process;
};


struct producer {
    atomic_word completed; // = 0
    event complete;
};


static consumer g_consumers[CONSUMERS];
static producer g_producer;

static intrusive_atomic_stack<consumer*> g_waiters;


void producer_thread() {
    for (;;) {
        consumer* c = g_waiters.flush();
        if (c) {
            size_t n = 0;
            consumer* w;

            w = c;

            // generate work
            while (w) {
                ++n;
                w->work.push(new work);
                w = w->next;
            }

            ATOMIC_STORE(&g_producer.completed, n);

            w = c;

            // signal work
            while (w) {
                w->process.signal();
                w = w->next;
            }

            g_producer.complete.wait();

        } else {
            // backoff; do something else...
        }
    }
}


void consumer_threads(size_t id) {
    consumer* const self = &g_consumers[id];
    work* w;

    for (;;) {
        g_waiters.push(self);
        self->process.wait();

        while ((w = self->work.pop())) {
            w->on_work();

            if (! g_waiters.empty() && ! self->work.empty()) {
                consumer* other = g_waiters.pop();
                if (other) {
                    ATOMIC_INC(&g_producer.completed);
                    other->work.transfer_half(self->work);
                    other->process.signal();
                }
            }
        }

        if (! ATOMIC_DEC(&g_producer.completed)) {
            g_producer.complete.signal();
        }
    }
}
_____________________________________________________________________


That should distribute the work fairly well. Humm... Any thoughts?

Chris M. Thomasson
Jan 20, 2009, 1:36:25 AM

"Chris M. Thomasson" <n...@spam.invalid> wrote in message
news:Jr6dl.42389$g23....@newsfe01.iad...
[...]

>
> That should distribute the work fairly well. Humm... Any thoughts?

Still, it cannot be as efficient as a work-stealing per-thread deque because
the stealing is performed by cpus with no work. Heck, even a 100% lock-based
per-thread deque is super simple and scales pretty darn well. There are
numerous ways to optimize a lock-based work-stealing deque. Humm... I know
that asymmetric algorithms can come into play. The "dedicated" thread can be
the CPU which owns the deque, and the foreign consumers will always be CPUs
with "nothing" to do anyway, so they don't mind a little slow-path action;
whatever...


http://groups.google.com/group/comp.programming.threads/browse_frm/thread/22b2736484af3ca6
(read all!)


If local access to the deque were analogous to the read side of that
asymmetric scheme, well, why not make the slow path ("Stealing, of course!")
the writer... It should make a perfectly fine work-stealing deque. The
collection which makes up the deque can be intrusive and require no
non-blocking memory management scheme. The deque can be a 100%
linked-pointer-based unbounded data structure. Humm... need to think.

Chris M. Thomasson
Jan 20, 2009, 1:39:59 AM

"Chris M. Thomasson" <n...@spam.invalid> wrote in message
news:Zhedl.115123$ln7.1...@newsfe04.iad...


This might work:


#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>


struct node {
    struct node* next;
    struct node* prev;
};


struct wsdeque {
    struct node* volatile head;
    struct node* volatile tail;
    unsigned long int volatile lcount;
    unsigned long int volatile rcount;
    pthread_mutex_t mutex;
};


#define WSDEQUE_INITIALIZER { \
    NULL, \
    NULL, \
    0, \
    0, \
    PTHREAD_MUTEX_INITIALIZER \
}


static struct node*
wsdeque_push(
    struct wsdeque* const self,
    struct node* node
) {
    struct node* const head = self->head;

    node->prev = NULL;

    node->next = head;

    if (! head) {
        self->tail = node;
    } else {
        head->prev = node;
    }

    self->head = node;

    /* #StoreStore */

    ++self->lcount;

    return node;
}


static struct node*
wsdeque_pop(
    struct wsdeque* const self
) {
    struct node* head;

wsdeque_pop_retry:
    head = self->head;

    if (head) {
        int unlock = 0;

        if (self->lcount - self->rcount == 2) {
            pthread_mutex_lock(&self->mutex);

            if (self->lcount - self->rcount != 2 ||
                self->head != head) {
                pthread_mutex_unlock(&self->mutex);
                goto wsdeque_pop_retry;
            }

            unlock = 1;
        }

        if (! (self->head = head->next)) {
            self->tail = NULL;
        } else {
            self->head->prev = NULL;
        }

        --self->lcount;

        if (unlock) {
            pthread_mutex_unlock(&self->mutex);
        }
    }

    return head;
}


static struct node*
wsdeque_steal(
    struct wsdeque* const self
) {
    struct node* tail;

wsdeque_steal_retry:
    tail = self->tail;

    if (tail) {
        if (self->lcount - self->rcount < 2) {
            return NULL;
        }

        pthread_mutex_lock(&self->mutex);

        if (self->lcount - self->rcount < 2 ||
            self->tail != tail) {
            pthread_mutex_unlock(&self->mutex);
            goto wsdeque_steal_retry;
        }

        tail->prev->next = NULL;
        self->tail = tail->prev;

        ++self->rcount;

        pthread_mutex_unlock(&self->mutex);
    }

    return tail;
}


int main(void) {
    size_t i;
    struct node* n;
    static struct wsdeque wsdq = WSDEQUE_INITIALIZER;

    for (i = 0; i < 10; ++i) {
        n = malloc(sizeof(*n));
        printf("wsdeque_push %p\n", (void*)n);
        wsdeque_push(&wsdq, n);
    }

    while ((n = wsdeque_steal(&wsdq))) {
        printf("wsdeque_steal %p\n", (void*)n);
        free(n);
    }

    while ((n = wsdeque_pop(&wsdq))) {
        printf("wsdeque_pop %p\n", (void*)n);
        free(n);
    }


    /*------------------------------------------------------------*/
    puts("\n\n\n_________________________________________________"
         "______\npress <ENTER> to exit...");
    fflush(stdin);
    fflush(stdout);
    getchar();

    return 0;
}


Not sure yet.

Dmitriy V'jukov
Jan 20, 2009, 2:40:53 AM

On 20 Jan, 09:39, "Chris M. Thomasson" <n...@spam.invalid> wrote:

> static struct node*
> wsdeque_pop(
>  struct wsdeque* const self
> ) {
>   struct node* head;
>
> wsdeque_pop_retry:
>   head = self->head;
>
>   if (head) {
>     int unlock = 0;
>
>     if (self->lcount - self->rcount == 2) {

/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\

maybe if (self->lcount - self->rcount <= 2) ?

>       pthread_mutex_lock(&self->mutex);
>
>       if (self->lcount - self->rcount != 2 ||
>           self->head != head) {
>         pthread_mutex_unlock(&self->mutex);
>         goto wsdeque_pop_retry;
>       }
>
>       unlock = 1;
>     }
>
>     if (! (self->head = head->next)) {
>       self->tail = NULL;
>     } else {
>       self->head->prev = NULL;
>     }
>
>     --self->lcount;
>
>     if (unlock) {
>       pthread_mutex_unlock(&self->mutex);
>     }
>   }
>
>   return head;
>
> }


There still must be something like store -> #StoreLoad membar -> load ->
conditional branch in the pop() and steal(), no? Or are you counting on
consumption of the last-but-one work item acting like a StoreLoad
membar?
Otherwise I think the owner can take the fast path in pop() when consuming
the last work item and thus conflict with a stealer...

--
Dmitriy V'jukov

Dmitriy V'jukov
Jan 20, 2009, 12:38:13 PM

On Jan 20, 9:36 am, "Chris M. Thomasson" <n...@spam.invalid> wrote:

> Still, it cannot be as efficient as work stealing per-thread deque because
> the stealing is performed by cpus with no work. Heck, even a 100% lock-based
> per-thread deque is super simple and scales pretty darn good. There a
> numerous way to optimize a lock-based work stealing deque. Humm... I know
> that asymmetric algorihtms can come into play. The "dedicated" thread can be
> the CPU which owns the deque and the foreign consumers will always be CPUS
> with "nothing" to do anyway, so they don't mind a little slow-path action;
> whatever...
>

> http://groups.google.com/group/comp.programming.threads/browse_frm/th...


> (read all!)
>
> If local access of the deque was analogous to read access to the asymmetric
> read access, well, why not make the slow-path "Stealing of course!" the
> writer... It should make a perfectly fine work stealing deque. The
> collection which makes up the deque can be intrusive and require no
> non-blocking memory management scheme. The deque can be 100% linked pointer
> based unbounded data-structure. humm... Need to think.


What do you think about the following approach?
The deque is divided into 2 regions: "normal" and "reserved" (for the
owner). While in the normal region, a stealer can steal with the normal
symmetric protocol (both owner and stealer execute a store-load fence,
for example). But while in the reserved region, the stealer uses an
asymmetric protocol (i.e. the owner doesn't execute a store-load fence).
On every access the owner optionally adjusts the boundary between the
normal and reserved regions, trying to keep stealers in the normal
region. The policy for the boundary might use heuristics, something like:

if deque_size <= 8:
    boundary_low = boundary_high = deque_size (whole deque is normal),
    check_boundary_on_nth_call = 4 (check boundary every 4 accesses to deque)
else if deque_size <= number_of_threads*8:
    boundary_low = deque_size/4, boundary_high = deque_size*3/4,
    check_boundary_on_nth_call = deque_size/8
else if deque_size > number_of_threads*8:
    boundary_low = 64, boundary_high = 128,
    check_boundary_on_nth_call = deque_size/16

I.e. the owner tries to keep the boundary between a low watermark and a
high watermark, and checks all these sizes not on every call but only on
every check_boundary_on_nth_call-th call.
The idea behind this is that stealers will be in the normal region in the
common case, so stealers can use a lightweight nonblocking protocol for
stealing (only 1 store-load membar), and at the same time the owner can
use no synchronization at all (because it works mainly in the reserved
region).
When I was playing with a similar deque (w/o the asymmetric protocol,
actually), I was able to achieve some 70 cycles per complete send/receive
cycle (including memory management for tasks, etc). However, taking into
account the fact that mfence is significantly cheaper in the latest Intel
processors (i7), probably it isn't worth the complexity now... And
probably the most simple and straightforward approach with a single
mfence per push()/pop() can actually be faster...
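
(To make the heuristic concrete, here is roughly how that boundary policy
could be written out; the names are hypothetical and the real structure
would also carry the deque itself:)

#include <cstddef>
using std::size_t;

struct region_policy {
    size_t boundary_low;
    size_t boundary_high;
    size_t check_boundary_on_nth_call;
};

static region_policy choose_policy(size_t deque_size, size_t number_of_threads) {
    region_policy p;
    if (deque_size <= 8) {
        // whole deque is "normal"
        p.boundary_low = p.boundary_high = deque_size;
        p.check_boundary_on_nth_call = 4;
    } else if (deque_size <= number_of_threads * 8) {
        p.boundary_low = deque_size / 4;
        p.boundary_high = deque_size * 3 / 4;
        p.check_boundary_on_nth_call = deque_size / 8;
    } else {
        p.boundary_low = 64;
        p.boundary_high = 128;
        p.check_boundary_on_nth_call = deque_size / 16;
    }
    return p;
}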


--
Dmitriy V'jukov

Chris M. Thomasson
Jan 21, 2009, 3:29:58 PM

"Dmitriy V'jukov" <dvy...@gmail.com> wrote in message
news:86fbc2bb-a1c9-4a21...@k9g2000vbl.googlegroups.com...

On 20 Jan, 09:39, "Chris M. Thomasson" <n...@spam.invalid> wrote:

> [...]

> There still must be something like store -> #StoreLoad membar -> load ->
> conditional branch in the pop() and steal(), no? Or you are counting
> that consumption of the last but one work item acts like StoreLoad
> membar?
> Otherwise I think owner can take fast-path in pop() when consuming
> last work item and thus conflict with stealer...

I need to make some bug fixes; however, I am trying to use the count to
separate head and tail such that the local cpu can pop, and a steal will
only occur if it's N nodes away from the local cpu's head. I try to
separate nodes by 2. So, a stealer cannot steal anything if the local cpu
has one work item. If it has two or more, well, a foreign cpu can try and
steal. I have no time right now to elaborate; sorry about the nonsense.
