FIFO Distributed Bakery Algorithm...

1,403 views
Skip to first unread message

Chris M. Thomasson

unread,
Oct 19, 2013, 5:43:05 PM10/19/13
to lock...@googlegroups.com
AFAICT, this is a pretty neat tweak to the following original 
awesome bakery algorithm:


Here is the tweak in the form of an algorithm that does not 
rely on CAS:


<pseudo code, membars aside for a moment>
______________________________________________
struct cell { uint32_t ver; double state; };

uint32_t head = 0;
uint32_t tail = 0;
cell cells[N]; // N must be a power of 2

void init() {
    for (uint32_t i = 0; i < N; ++i) cells[i].ver = i;
}

void producer(double state) {
    uint32_t ver = XADD(&head, 1);
    cell& c = cells[ver & (N - 1)];
    while (LOAD(&c.ver) != ver) backoff();
    c.state = state;
    STORE(&c.ver, ver + 1);
}

double consumer() {
    uint32_t ver = XADD(&tail, 1);
    cell& c = cells[ver & (N - 1)];
    while (LOAD(&c.ver) != ver + 1) backoff();
    double state = c.state;
    STORE(&c.ver, ver + N);
    return state;
}
______________________________________________




this works in Relacy, and can be mixed with a clever
distributed waitset to get around the spinwait on
the ticket...


If you are interested, I can post some more detailed 
information, and code.

Dmitriy V'jukov

unread,
Oct 27, 2013, 1:39:06 PM10/27/13
to lock...@googlegroups.com
Hi Chris!

This is an interesting modification to the algorithm.

I guess by the distributed waitset you mean using the cell.state
itself as wait event. It should work nice if coupled with a per cell
futex. However note that waiters can wrap around the array, it is
still distributed in this case, but is tricker to implement correctly.

Reflecting on reasons why originally I decided to use CAS on call.state.
Your XADD modification imposes strict FIFO order on waiters. This
means that a new coming consumer necessary blocks even if there are
available elements in the queue (but the elements are "reserved" for
older consumers).
There is similar design option in mutexes, think of FIFO ticket mutex.
But this is usually considered bad for performance. Usually you want
new coming (and still runnable) consumers to be able to consume
available elements (and continue running) even if there are older
blocked consumers. In my CAS-based algorithm this is the case.

I think it must be possible to combine e.g. CAS-based consumers and
XADD-based producers. This can be beneficial if you expect that
producers usually do not blocks (so you care more about fast-path
performance rather than blocking behavior).

Cheers!

Chris M. Thomasson

unread,
Oct 31, 2013, 7:40:38 PM10/31/13
to lock...@googlegroups.com
> On Sunday, October 27, 2013 10:39:06 AM UTC-7, Dmitry Vyukov wrote:
> On Sun, Oct 20, 2013 at 1:43 AM, Chris M. Thomasson <cri...@charter.net> wrote: 
> > AFAICT, this is a pretty neat tweak to the following original 
> > awesome bakery algorithm: 
> > 
> > 
> > Here is the tweak in the form of an algorithm that does not 
> rely on CAS: 
> [snip tweak]
> > this works in Relacy, and can be mixed with a clever 
> > distributed waitset to get around the spinwait on 
> > the ticket... 
> > 
> > 
> > If you are interested, I can post some more detailed 
> > information, and code. 


> Hi Chris! 

> This is an interesting modification to the algorithm. 

Thank you Dmitry. It's nice to hear from you!

:^)



> I guess by the distributed waitset you mean using the cell.state 
> itself as wait event. It should work nice if coupled with a per cell 
> futex. However note that waiters can wrap around the array, it is 
> still distributed in this case, but is tricker to implement correctly. 

I created a table of waitsets. A thread that needs to wait,
obtains a reference to the correct waitset by hashing the 
value it obtained from the XADD.



> Reflecting on reasons why originally I decided to use CAS on call.state. 
> Your XADD modification imposes strict FIFO order on waiters. 

Strict order indeed!  ;^)



> This 
> means that a new coming consumer necessary blocks even if there are 
> available elements in the queue (but the elements are "reserved" for 
> older consumers). 

> There is similar design option in mutexes, think of FIFO ticket mutex. 

Exactly right.



> But this is usually considered bad for performance. Usually you want 
> new coming (and still runnable) consumers to be able to consume 
> available elements (and continue running) even if there are older 
> blocked consumers. In my CAS-based algorithm this is the case. 

Yes it is. 



> I think it must be possible to combine e.g. CAS-based consumers and 
> XADD-based producers. This can be beneficial if you expect that 
> producers usually do not blocks (so you care more about fast-path 
> performance rather than blocking behavior). 

AFAICT, my tweak should be 100% compatible with the 
existing algorithm as-is. IMVHO, it could be a fairly
beneficial addition to the existing API. It opens
up a new way to think about handling contention wrt
using the queue as a whole.

Here is a test of the tweak that should go ahead and
compile with Relacy:
___________________________________________________________________
/*
Bounded Multi-Producer/Multi-Consumer FIFO Queue

Distributed Waitset Algorithm

By Christopher Michael Thomasson, 


Based Off Of, Dmitry Vyukov's Excellent Algorithm:



This program is free software: you can redistribute it and/or modify 
it under the terms of the GNU General Public License as published by 
the Free Software Foundation, either version 3 of the License, or 
(at your option) any later version. 


This program is distributed in the hope that it will be useful, 
but WITHOUT ANY WARRANTY; without even the implied warranty of 
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
GNU General Public License for more details. 


You should have received a copy of the GNU General Public License 
along with this program.  If not, see <http://www.gnu.org/licenses/>. 
_______________________________________________________________________*/




//#define RL_DEBUGBREAK_ON_ASSERT
//#define RL_MSVC_OUTPUT
//#define RL_FORCE_SEQ_CST
//#define RL_GC


#include <relacy/relacy_std.hpp>
#include <cstdio>


#define mb_relaxed std::memory_order_relaxed
#define mb_consume std::memory_order_consume
#define mb_acquire std::memory_order_acquire
#define mb_release std::memory_order_release
#define mb_acq_rel std::memory_order_acq_rel
#define mb_seq_cst std::memory_order_seq_cst


#define mb_relaxed_fence() std::atomic_thread_fence(mb_relaxed)
#define mb_consume_fence() std::atomic_thread_fence(mb_consume)
#define mb_acquire_fence() std::atomic_thread_fence(mb_acquire)
#define mb_release_fence() std::atomic_thread_fence(mb_release)
#define mb_acq_rel_fence() std::atomic_thread_fence(mb_acq_rel)
#define mb_seq_cst_fence() std::atomic_thread_fence(mb_seq_cst)




class waitset
{
    std::mutex m_mutex;
    std::condition_variable m_cond;
    std::atomic<bool> m_waitbit;
    VAR_T(unsigned) m_waiters;
 
 
 
public:
    waitset()
    :   m_waitbit(false),
        m_waiters(0)
    {
       
    }
 
 
    ~waitset()
    {
        bool waitbit = m_waitbit.load(mb_relaxed);
 
        unsigned waiters = VAR(m_waiters);
 
        RL_ASSERT(! waitbit && ! waiters);
    }
 
 
 
private:
    void prv_signal(bool waitbit, bool broadcast)
    {
        if (! waitbit) return;
 
        m_mutex.lock($);
 
        unsigned waiters = VAR(m_waiters);
 
        if (waiters < 2 || broadcast)
        {
            m_waitbit.store(false, mb_relaxed);
        }
 
        m_mutex.unlock($);
 
        if (waiters)
        {
            if (! broadcast)
            {
                m_cond.notify_one($);
            }
 
            else
            {
                m_cond.notify_all($);
            }
        }
    }
 
 
 
public:
    unsigned wait_begin()
    {
        m_mutex.lock($);
 
        m_waitbit.store(true, mb_relaxed);
 
        mb_seq_cst_fence();

        return 0;
    }
 
 
    bool wait_try_begin(unsigned& key)
    {
        if (! m_mutex.try_lock($)) return false;
 
        m_waitbit.store(true, mb_relaxed);
 
        mb_seq_cst_fence();
 
        return true;
    }
 
 
    void wait_cancel(unsigned key)
    {
        unsigned waiters = VAR(m_waiters);
 
        if (! waiters)
        {
            m_waitbit.store(false, mb_relaxed);
        }
 
        m_mutex.unlock($);
    }
 
 
    void wait_commit(unsigned key)
    {
        ++VAR(m_waiters);
 
        m_cond.wait(m_mutex, $);
 
        if (! --VAR(m_waiters))
        {
            m_waitbit.store(false, mb_relaxed);
        }
 
        m_mutex.unlock($);
    }
 
 
 
public:
    void signal()
    {
        mb_seq_cst_fence();
 
        bool waitbit = m_waitbit.load(std::memory_order_relaxed);
 
        prv_signal(waitbit, false);
    }
 
 
    void broadcast()
    {
        mb_seq_cst_fence();
 
        bool waitbit = m_waitbit.load(std::memory_order_relaxed);
 
        prv_signal(waitbit, true);
    }
};





// T_depth && T_wdepth MUST be a power of 2!
template<typename T, unsigned T_depth, unsigned T_wdepth> 
struct mpmc_bounded_queue
{
    struct cell_type
    {
        std::atomic<unsigned> m_state;
        VAR_T(T) m_object;
    };


    std::atomic<unsigned> m_head;
    std::atomic<unsigned> m_tail;
    waitset m_waitset[T_wdepth];
    cell_type m_buffer[T_depth];


    mpmc_bounded_queue() : m_head(0), m_tail(0)
    {
        // initialize version numbers.
        for (unsigned i = 0; i < T_depth; ++i)
        {
            m_buffer[i].m_state.store(i, mb_relaxed);
        }
    }


    void push(T const& obj)
    {
        // obtain our head version and cell.
        unsigned idx = m_head.fetch_add(1, mb_relaxed);
        cell_type& cell = m_buffer[idx & (T_depth - 1U)];
        waitset& wset = m_waitset[idx & (T_wdepth - 1U)];

        // wait for it...
        while (cell.m_state.load(mb_relaxed) != idx)
        {
            unsigned key = wset.wait_begin();

            if (cell.m_state.load(mb_relaxed) == idx)
            {
                wset.wait_cancel(key);
                break;
            }

            wset.wait_commit(key);
        }

        mb_acquire_fence();

        // GOT IT! Okay, write to the object.
        VAR(cell.m_object) = obj;

        // We are done; allow a consumer to consume.
        mb_release_fence();
        cell.m_state.store(idx + 1, mb_relaxed);

        wset.broadcast();
    }


    void pop(T& obj)
    {
        // obtain our tail version and cell.
        unsigned idx = m_tail.fetch_add(1, mb_relaxed);
        cell_type& cell = m_buffer[idx & (T_depth - 1U)];
        waitset& wset = m_waitset[idx & (T_wdepth - 1U)];

        // wait for it...
        while (cell.m_state.load(mb_relaxed) != idx + 1U)
        {
            unsigned key = wset.wait_begin();

            if (cell.m_state.load(mb_relaxed) == idx + 1U)
            {
                wset.wait_cancel(key);
                break;
            }

            wset.wait_commit(key);
        }

        mb_acquire_fence();
        
        // GOT IT! Okay, read from the object.
        obj = VAR(cell.m_object);

        // We are done; allow a producer to produce.
        mb_release_fence();
        cell.m_state.store(idx + T_depth, mb_relaxed);

        wset.broadcast();
    }
};




#define PRODUCERS 2
#define CONSUMERS 3
#define THREADS (PRODUCERS + CONSUMERS)
#define ITERS 7
#define BUFFER_SIZE (8)
#define WAITSET_SIZE (4)

struct mpmc_bounded_queue_test
:   rl::test_suite<mpmc_bounded_queue_test, THREADS>
{
    mpmc_bounded_queue<unsigned, BUFFER_SIZE, WAITSET_SIZE> g_queue;
    unsigned g_test_term_producers; // test termination only!
    unsigned g_test_term_consumers; // test termination only!

 
    void before()
    {
        g_test_term_producers = PRODUCERS;
        g_test_term_consumers = (PRODUCERS * ITERS) + CONSUMERS;
    }


    void after()
    {
        RL_ASSERT(! g_test_term_consumers && 
                  ! g_test_term_producers);
    }


    void thread_producer(unsigned int tidx)
    {
        for (unsigned i = 0; i < ITERS; ++i)
        {
            g_queue.push(((tidx + 1) << 8U) + i);
        }

        if (! --g_test_term_producers)
        {
            for (unsigned i = 0; i < CONSUMERS; ++i)
            {
                g_queue.push(666666);
            }
        }
    }


    void thread_consumer(unsigned int tidx)
    {
        for (;;)
        {
            unsigned v;
            g_queue.pop(v);
            --g_test_term_consumers;

            if (v == 666666)
            {
                break;
            }
        }
    }


    void thread(unsigned int tidx)
    {   
        if (tidx < PRODUCERS)
        {
            thread_producer(tidx);
        }

        else
        {
            thread_consumer(tidx);
        }
    }
};




int main()
{
    {
        rl::test_params p;

       // p.iteration_count = 1000000;
        //p.execution_depth_limit = 33333;
        //p.search_type = rl::sched_bound;
        //p.search_type = rl::fair_full_search_scheduler_type;
        p.search_type = rl::fair_context_bound_scheduler_type;

        rl::simulate<mpmc_bounded_queue_test>(p);
    }

    std::puts("\nTest Complete!\n");

    std::getchar();

    return 0;
}
___________________________________________________________________



You can find the code here as well:




Well, what do you think of this?



...

BTW, thank you for creating the simply excellent group!

:^)

Dmitriy V'jukov

unread,
Nov 1, 2013, 1:19:13 PM11/1/13
to lock...@googlegroups.com
I see, you use broadcast to resolve this.


>> > Reflecting on reasons why originally I decided to use CAS on call.state.
>> > Your XADD modification imposes strict FIFO order on waiters.
>>
>> Strict order indeed! ;^)
>>
>>
>>
>> > This
>> > means that a new coming consumer necessary blocks even if there are
>> > available elements in the queue (but the elements are "reserved" for
>> > older consumers).
>>
>> > There is similar design option in mutexes, think of FIFO ticket mutex.
>>
>> Exactly right.
>>
>>
>>
>> > But this is usually considered bad for performance. Usually you want
>> > new coming (and still runnable) consumers to be able to consume
>> > available elements (and continue running) even if there are older
>> > blocked consumers. In my CAS-based algorithm this is the case.
>>
>> Yes it is.
>>
>>
>>
>> > I think it must be possible to combine e.g. CAS-based consumers and
>> > XADD-based producers. This can be beneficial if you expect that
>> > producers usually do not blocks (so you care more about fast-path
>> > performance rather than blocking behavior).

>> AFAICT, my tweak should be 100% compatible with the
>> existing algorithm as-is. IMVHO, it could be a fairly
>> beneficial addition to the existing API. It opens
>> up a new way to think about handling contention wrt
>> using the queue as a whole.


Good point.
I guess single XADD for MPMC if far superior than most algorithms out
there, and is basically the best one can get for a centralized queue.
I see people still implement MS node-based queue with PDR, and that's
2 CAS loops + indirections + memory allocations + PDR acquire/release
overheads.... veeeeeery slow :)


Btw, I am thinking about implementing mostly lock-free queues for Go
language (golang.org). And I think it absolutely must be based on this
queue design. The really challenging part is select statements:
http://golang.org/ref/spec#Select_statements
It basically allows you to wait on an arbitrary set of queues, you can
wait for receive, send (or even both), the participating queues can be
asynchronous (buffered) or synchronous (rendezvous between producer
and consumer). There are slightly different semantics for asynchronous
and synchronous queues -- for synchronous queues producer chooses the
consumer to synchronize with and directly hand off the message; for
asynchronous queues producer puts the message into the queue and wakes
one of the consumers, so that a new coming consumer can consume the
message ahead of the blocked one (the same issue that we discussed
above).
And on top of that, some goroutines (threads) can execute selects on
arbitrary overlapping sets of queues, while other goroutines execute
standalone send/recv on some queues.
And at this point this becomes really-really tricky.

Rob Pike has a paper on implementation of select statements for Newsqueak:
ftp://cs.bell-labs.com/cm/cs/who/rsc/thread/newsquimpl.pdf
He said that it was tricky. And that is single-thread implementation.

Current Go implementation is "straightforward" mutex-based. But it's
still quite tricky (search for selgen handling).

I've made several attempts on this (the code is not working and dirty):
https://codereview.appspot.com/4841045/diff/13001/src/pkg/runtime/chan.c
https://codereview.appspot.com/12544043/diff/3001/src/pkg/runtime/chan.c
They all failed. I think I know how to put it all together now, but
just had not have time for that yet.
You are welcome!

Chris M. Thomasson

unread,
Nov 4, 2013, 10:17:41 PM11/4/13
to lock...@googlegroups.com
On Friday, November 1, 2013 10:19:13 AM UTC-7, Dmitry Vyukov wrote:
On Fri, Nov 1, 2013 at 3:40 AM, Chris M. Thomasson <cri...@charter.net> wrote: 
>> > On Sunday, October 27, 2013 10:39:06 AM UTC-7, Dmitry Vyukov wrote: 
>> > On Sun, Oct 20, 2013 at 1:43 AM, Chris M. Thomasson <cri...@charter.net
>> > wrote: 
>> > > AFAICT, this is a pretty neat tweak to the following original 
>> > > awesome bakery algorithm: 
[...]
> >> > I guess by the distributed waitset you mean using the cell.state 
> >> > itself as wait event. It should work nice if coupled with a per cell 
> >> > futex. However note that waiters can wrap around the array, it is 
> >> > still distributed in this case, but is tricker to implement correctly. 
> >> 
> >> I created a table of waitsets. A thread that needs to wait, 
> >> obtains a reference to the correct waitset by hashing the 
> >> value it obtained from the XADD. 

> I see, you use broadcast to resolve this.

;^)


[...]


> >> AFAICT, my tweak should be 100% compatible with the 
> >> existing algorithm as-is. IMVHO, it could be a fairly 
> >> beneficial addition to the existing API. It opens 
> >> up a new way to think about handling contention wrt 
> >> using the queue as a whole. 


> Good point. 
> I guess single XADD for MPMC if far superior than most algorithms out 
> there, and is basically the best one can get for a centralized queue. 

AFAICT, combining the XADD and CMPXCHG versions should give 
the best of both worlds. The thing I like about XADD is the fact that
there is no loop wrt mutating the counter. So far, I have not seen 
another MPMC queue get away with using a single XADD for push 
and pop. Of course this only makes sense when the XADD is
comprised of a single atomic RMW. It looses its luster when one
needs to emulate XADD with LL/SC or CAS...


For instance, I got pissed off when the SPARC dropped support for
the SWAP instruction. Now, one needs to emulate SWAP using a
CAS-loop!

;^/



> I see people still implement MS node-based queue with PDR, and that's 
> 2 CAS loops + indirections + memory allocations + PDR acquire/release 
> overheads.... veeeeeery slow :) 

Compared to this bounded queue, they are slower than a snail
hiking up a mountain of salt!

;^)


[snip Go] 

> Current Go implementation is "straightforward" mutex-based. But it's 
> still quite tricky (search for selgen handling). 

> I've made several attempts on this (the code is not working and dirty): 
> They all failed. I think I know how to put it all together now, but 
> just had not have time for that yet. 

Wow. I will take a look at that.

Thank you for all of the information!

[...]


:^)
Reply all
Reply to author
Forward
0 new messages