Google Groups no longer supports new Usenet posts or subscriptions. Historical content remains viewable.
Dismiss

Experimental Bounded Bakery Queue...

14 views
Skip to first unread message

Chris M. Thomasson

unread,
Mar 6, 2019, 4:37:13 AM3/6/19
to
This is a bounded MPMC queue spinlock that I developed based on the
following awesome algorihtm:

http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue

read _all_ of the following for some further context:

https://groups.google.com/d/topic/lock-free/acjQ3-89abE/discussion

It only uses fetch_add, and atomic loads and stores to manage the queue.
No CAS! Pretty neat. ;^)

It can be outfitted with eventcounts to get rid of the spinning...


Fwiw, the following is a Relacy test unit: Will port this to pure C++ in
a day or two:
_____________________________
/*

Experimental Bakery Queue Spinlock by Chris M. Thomasson

Based on the following CAS based original awesome version by:

http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue

read all of the following for further context:

https://groups.google.com/d/topic/lock-free/acjQ3-89abE/discussion

My tweak does not use CAS...:

<pseudo code, membars aside for a moment>
______________________________________________
struct cell { uint32_t ver; double state; };

uint32_t head = 0;
uint32_t tail = 0;
cell cells[N]; // N must be a power of 2

void init() {
for (uint32_t i = 0; i < N; ++i) cells[i].ver = i;
}

void producer(double state) {
uint32_t ver = XADD(&head, 1);
cell& c = cells[ver & (N - 1)];
while (LOAD(&c.ver) != ver) backoff();
c.state = state;
STORE(&c.ver, ver + 1);
}

double consumer() {
uint32_t ver = XADD(&tail, 1);
cell& c = cells[ver & (N - 1)];
while (LOAD(&c.ver) != ver + 1) backoff();
double state = c.state;
STORE(&c.ver, ver + N);
return state;
}
_______________________________________________*/


//#define RL_DEBUGBREAK_ON_ASSERT
//#define RL_MSVC_OUTPUT
//#define RL_FORCE_SEQ_CST
//#define RL_GC


#include <relacy/relacy_std.hpp>
#include <iostream>


// Some global vars directing the show...
// PRODUCERS must equal CONSUMERS for this test
#define PRODUCERS 4
#define CONSUMERS 4
#define THREADS (PRODUCERS + CONSUMERS)
#define ITERS 3

#define DEPTH 3


struct ct_mpmcq_spin
{
struct cell
{
std::atomic<unsigned long> m_ticket;
VAR_T(long) m_udata;
};


std::atomic<unsigned long> m_head;
std::atomic<unsigned long> m_tail;
cell m_cells[DEPTH];


ct_mpmcq_spin() : m_head(0), m_tail(0)
{
for (unsigned long i = 0; i < DEPTH; ++i)
{
m_cells[i].m_ticket.store(i, std::memory_order_relaxed);
}
}


void push_wait(long udata)
{
unsigned long head = m_head.fetch_add(1,
std::memory_order_acquire);
std::size_t head_idx = head % DEPTH;

cell& head_cell = m_cells[head_idx];

// spin on our ticket...
while (head_cell.m_ticket.load(std::memory_order_acquire) != head)
{
// backoff...
rl::yield(1, $);
}

// write, copy, ect...
VAR(head_cell.m_udata) = udata;

// commit...
head_cell.m_ticket.store(head + 1, std::memory_order_release);
}


long pop_wait()
{
unsigned long tail = m_tail.fetch_add(1,
std::memory_order_acquire);
std::size_t tail_idx = tail % DEPTH;

cell& tail_cell = m_cells[tail_idx];

// spin on our ticket...
while (tail_cell.m_ticket.load(std::memory_order_acquire) !=
tail + 1)
{
// backoff...
rl::yield(1, $);
}

// write, copy, ect...
long udata = VAR(tail_cell.m_udata);

// commit...
tail_cell.m_ticket.store(tail + DEPTH, std::memory_order_release);

return udata;
}
};



// Relacy Stack Test...
struct ct_mpmcq_spin_test_tweak
: rl::test_suite<ct_mpmcq_spin_test_tweak, THREADS>
{
ct_mpmcq_spin g_mpmc_queue;

ct_mpmcq_spin_test_tweak() {}

void before()
{

}

void after()
{

}

// reader
void consumer(unsigned int tidx)
{
for (unsigned int i = 0; i < ITERS; ++i)
{
g_mpmc_queue.push_wait(tidx);
}
}


// writer
void producer(unsigned int tidx)
{
for (unsigned int i = 0; i < ITERS; ++i)
{
unsigned long udata = g_mpmc_queue.pop_wait();

RL_ASSERT(udata != tidx);
}
}


void thread(unsigned int tidx)
{
if (tidx < PRODUCERS)
{
producer(tidx);
}

else
{
consumer(tidx);
}
}
};



// Test away... Or fly? Humm...
int main()
{
{
rl::test_params p;

p.iteration_count = 10000000;
//p.execution_depth_limit = 33333;
//p.search_type = rl::sched_bound;
//p.search_type = rl::fair_full_search_scheduler_type;
//p.search_type = rl::fair_context_bound_scheduler_type;

rl::simulate<ct_mpmcq_spin_test_tweak>(p);
}

return 0;
}
_____________________________

Chris M. Thomasson

unread,
Mar 6, 2019, 4:41:13 AM3/6/19
to
On 3/6/2019 1:36 AM, Chris M. Thomasson wrote:

[...]
>     void push_wait(long udata)
>     {
>         unsigned long head = m_head.fetch_add(1,
> std::memory_order_acquire);

^^^^^^^^^^^^^^^^^^^^^^^^^^

Actually, the memory barrier above can be relaxed.


>         std::size_t head_idx = head % DEPTH;
>
>         cell& head_cell = m_cells[head_idx];
>
>         // spin on our ticket...
>         while (head_cell.m_ticket.load(std::memory_order_acquire) != head)
>         {
>             // backoff...
>             rl::yield(1, $);
>         }
>
>         // write, copy, ect...
>         VAR(head_cell.m_udata) = udata;
>
>         // commit...
>         head_cell.m_ticket.store(head + 1, std::memory_order_release);
>     }
>



>
>     long pop_wait()
>     {
>         unsigned long tail = m_tail.fetch_add(1,
> std::memory_order_acquire);
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Ditto. These have no need for anything more than relaxed order.

[...]
0 new messages