Looking for advice on lock-free workqueue


Miral Mirality

Dec 5, 2013, 3:30:46 AM
to lock...@googlegroups.com
I've recently been experimenting with Boost.Asio and its thread-reactor model (where a pool of worker threads is created and jobs can be posted to the pool and then executed on any of the workers).  I've built a prototype alternate implementation that requires no locking (other than what the OS does itself), and it seems to be working fairly well.
 
But there are some cases where work that gets submitted from multiple threads must not be allowed to execute concurrently.  Asio's solution for this is strands, which are basically a kind of lock internally.  I'm trying to implement something similar in as close to lock-free fashion as possible.
 
My latest attempt is along these lines:
 
// sketched as members of a strand-like class:
class strand
{
    ThreadPool& pool;
    dmitry_mpsc_q<WorkItem> q;
    atomic<bool> wip{false};
 
public:
    // can be called concurrently from any thread
    void post(const WorkItem& item)
    {
        q.enqueue(item);
        if (!wip.exchange(true, memory_order_seq_cst))
        {
            pool.post(bind(&strand::do_work, this));  // executes asynchronously on a worker thread
        }
    }
 
private:
    // can be run on any worker thread in the pool, but only one at a time
    void do_work()
    {
        WorkItem item;
        while (q.dequeue(item))
        {
            process(item);
        }
        wip.store(false, memory_order_seq_cst);
    }
};
 
The goal is to guarantee that do_work() will be executed after any item is posted, but only on one thread of the pool at a time.  If further items are queued, they will either be picked up by the next cycle of the existing do_work() or will kick off a new worker if the previous do_work() has already fallen out of its loop.  Obviously the above code doesn't work, but it fails differently than I expected: do_work() can see an empty queue, despite only being started after an item was queued.  (I was expecting it to deadlock later instead, with items in the queue but no worker running, due to a race on wip.)
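For what it's worth, one well-known way to close the race on wip without sleeping is "clear, re-check, reclaim": after storing false, look at the queue again, and only exit if it is really empty or a racing post() has already taken the flag back.  Below is a minimal sketch of just that protocol -- all the names are invented, a locked std::deque stands in for the MPSC queue, and a detached std::thread stands in for pool.post(), so this deliberately sidesteps the real queue's behaviour and shows only the flag handling:

```cpp
#include <atomic>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Sketch of the wip-flag protocol with a "clear, re-check, reclaim" exit
// path instead of a bare store(false).  Names are invented; a locked
// std::deque stands in for the MPSC queue and a detached thread stands in
// for pool.post().
class recheck_strand
{
    std::mutex m;
    std::deque<std::function<void()>> q;
    std::atomic<bool> wip{false};

    bool try_dequeue(std::function<void()>& item)
    {
        std::lock_guard<std::mutex> lock(m);
        if (q.empty()) return false;
        item = std::move(q.front());
        q.pop_front();
        return true;
    }

    bool has_items()
    {
        std::lock_guard<std::mutex> lock(m);
        return !q.empty();
    }

    void do_work()
    {
        std::function<void()> item;
        for (;;)
        {
            while (try_dequeue(item))
                item();
            wip.store(false, std::memory_order_seq_cst);
            // A producer may have enqueued after our last failed dequeue
            // but before the store above; its exchange saw true, so no new
            // worker was posted.  Re-check, and only exit if the queue is
            // really empty or someone else has reclaimed the flag.
            if (!has_items())
                return;
            if (wip.exchange(true, std::memory_order_seq_cst))
                return;  // a racing post() claimed the flag and dispatched
        }
    }

public:
    void post(std::function<void()> item)
    {
        {
            std::lock_guard<std::mutex> lock(m);
            q.push_back(std::move(item));
        }
        if (!wip.exchange(true, std::memory_order_seq_cst))
            std::thread([this] { do_work(); }).detach();
    }
};
```

Note that with the real MPSC queue a has_items() check would additionally have to tolerate an enqueue still being in flight; the mutex in this sketch hides that problem.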
 
The queue involved is Dmitry Vyukov's unbounded MPSC queue; I've tested my implementation of it with Relacy (RRD) and I'm fairly confident it's safe.  I've also tried a couple of different MPMC queue implementations, and they produce similar results.
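If I understand that queue correctly, the empty reads may come from its two-step publish: a producer stalled between the head exchange and the next-pointer store temporarily hides its own item and everything enqueued behind it, so the consumer can see "empty" even though a later producer has fully finished its enqueue (and gone on to post a worker).  Here's a simplified sketch of my understanding of the non-intrusive variant (no destructor or node reclamation, just enough to show the window):

```cpp
#include <atomic>

// Simplified sketch of the unbounded MPSC queue (non-intrusive variant,
// no cleanup) -- just enough to show the two-step publish.
template <typename T>
class mpsc_q
{
    struct node
    {
        std::atomic<node*> next{nullptr};
        T value{};
    };

    std::atomic<node*> head;  // producers swap themselves in here
    node* tail;               // consumer-only

public:
    mpsc_q()
    {
        node* stub = new node;
        head.store(stub, std::memory_order_relaxed);
        tail = stub;
    }

    void enqueue(const T& v)
    {
        node* n = new node;
        n->value = v;
        node* prev = head.exchange(n, std::memory_order_acq_rel);  // (1)
        // Between (1) and (2) the chain is broken at prev: the consumer
        // cannot reach n (or anything enqueued after it) and will report
        // the queue empty, even though (1) has already taken effect.
        prev->next.store(n, std::memory_order_release);            // (2)
    }

    bool dequeue(T& out)
    {
        node* next = tail->next.load(std::memory_order_acquire);
        if (!next)
            return false;  // empty -- or a producer is stalled between (1) and (2)
        out = next->value;
        delete tail;       // retire the old node (initially the stub)
        tail = next;
        return true;
    }
};
```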
 
I have made a version that does work (also tested with RRD), but it uses an atomic<int> for wip, incrementing it after each enqueue (and posting do_work if the previous value was 0) and decrementing it on each successful dequeue.  The part that "fixes" the above issue is that it also loops and retries (with a Sleep) if the queue appears to be empty while wip > 0.  (I'm not sure how that's even possible, given that wip is incremented after the enqueue and both are atomic variables -- but it has been observed in the RRD test.  Clearly I must be misunderstanding something about atomics.)
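In code, that working version looks roughly like this -- again with invented names, a locked std::deque standing in for the real queue, and a detached std::thread standing in for pool.post(), so only the counter protocol is the point.  One detail worth noting: the worker exits only when its *own* decrement takes the counter to zero, which is what preserves the at-most-one-worker guarantee against a racing 0 -> 1 post():

```cpp
#include <atomic>
#include <chrono>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Rough sketch of the counting version.  Names are invented; a locked
// std::deque stands in for the MPSC queue and a detached thread stands in
// for pool.post().
class counting_strand
{
    std::mutex m;
    std::deque<std::function<void()>> q;
    std::atomic<int> wip{0};

    bool try_dequeue(std::function<void()>& item)
    {
        std::lock_guard<std::mutex> lock(m);
        if (q.empty()) return false;
        item = std::move(q.front());
        q.pop_front();
        return true;
    }

    void do_work()
    {
        std::function<void()> item;
        for (;;)
        {
            if (try_dequeue(item))
            {
                item();
                // Exit only when our decrement takes the counter to zero;
                // the next 0 -> 1 post() dispatches a fresh worker, so at
                // most one do_work() runs at a time.
                if (wip.fetch_sub(1, std::memory_order_acq_rel) == 1)
                    return;
            }
            else
            {
                // Counter says work exists but the queue looks empty: with
                // the real MPSC queue this is the in-flight-enqueue window
                // (with the locked deque it never fires), so back off and
                // retry instead of exiting.
                std::this_thread::sleep_for(std::chrono::microseconds(50));
            }
        }
    }

public:
    void post(std::function<void()> item)
    {
        {
            std::lock_guard<std::mutex> lock(m);
            q.push_back(std::move(item));
        }
        // Increment *after* the enqueue; only the 0 -> 1 transition posts
        // a worker.
        if (wip.fetch_add(1, std::memory_order_acq_rel) == 0)
            std::thread([this] { do_work(); }).detach();
    }
};
```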
 
I don't really like the extra loop-and-sleep behaviour, though, and I was hoping to eliminate it with something closer to the code above.  Any tips, or am I just dreaming (i.e. should I just use a mutex)?