This is a bounded MPMC queue spinlock that I developed based on the
following awesome algorithm:
http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
Read _all_ of the following for some further context:
https://groups.google.com/d/topic/lock-free/acjQ3-89abE/discussion
It only uses fetch_add, and atomic loads and stores to manage the queue.
No CAS! Pretty neat. ;^)
It can be outfitted with eventcounts to get rid of the spinning...
Fwiw, the following is a Relacy test unit. I will port this to pure C++ in
a day or two:
_____________________________
/*
Experimental Bakery Queue Spinlock by Chris M. Thomasson
Based on the following CAS based original awesome version by:
http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
read all of the following for further context:
https://groups.google.com/d/topic/lock-free/acjQ3-89abE/discussion
My tweak does not use CAS...:
<pseudo code, membars aside for a moment>
______________________________________________
struct cell { uint32_t ver; double state; };
uint32_t head = 0;
uint32_t tail = 0;
cell cells[N]; // N must be a power of 2
void init() {
for (uint32_t i = 0; i < N; ++i) cells[i].ver = i;
}
void producer(double state) {
uint32_t ver = XADD(&head, 1);
cell& c = cells[ver & (N - 1)];
while (LOAD(&c.ver) != ver) backoff();
c.state = state;
STORE(&c.ver, ver + 1);
}
double consumer() {
uint32_t ver = XADD(&tail, 1);
cell& c = cells[ver & (N - 1)];
while (LOAD(&c.ver) != ver + 1) backoff();
double state = c.state;
STORE(&c.ver, ver + N);
return state;
}
_______________________________________________*/
//#define RL_DEBUGBREAK_ON_ASSERT
//#define RL_MSVC_OUTPUT
//#define RL_FORCE_SEQ_CST
//#define RL_GC
#include <relacy/relacy_std.hpp>
#include <iostream>
// Some global vars directing the show...
// PRODUCERS must equal CONSUMERS for this test: every thread performs ITERS
// spinning queue operations, so the total number of pushes and pops has to
// balance.
// Thread indices below PRODUCERS take one role in thread(); the rest take
// the other.
#define PRODUCERS 4
#define CONSUMERS 4
// Thread count handed to rl::test_suite.
#define THREADS (PRODUCERS + CONSUMERS)
// Number of queue operations each thread performs.
#define ITERS 3
// Number of cells in the queue's cell array.
#define DEPTH 3
// Bounded MPMC queue whose slots are guarded by per-cell tickets.
//
// A pusher or popper draws a ticket with fetch_add, maps it onto a cell and
// spins until that cell's ticket says it is its turn. No CAS is used: only
// fetch_add, atomic loads and atomic stores.
//
// NOTE(review): the pseudo code this is derived from requires a power-of-two
// cell count and masks the index. Here the index is `ticket % DEPTH`, which
// only stays contiguous across counter wrap-around when DEPTH is a power of
// two -- confirm whether wrap-around matters for the intended use.
struct ct_mpmcq_spin
{
    // One slot: the ticket that is allowed to touch it next, plus the payload.
    struct cell
    {
        std::atomic<unsigned long> m_ticket;
        VAR_T(long) m_udata;
    };

    std::atomic<unsigned long> m_head; // next ticket handed out by push_wait
    std::atomic<unsigned long> m_tail; // next ticket handed out by pop_wait
    cell m_cells[DEPTH];

    // Both counters start at zero and cell i starts out expecting push
    // ticket i.
    ct_mpmcq_spin() : m_head(0), m_tail(0)
    {
        unsigned long slot_no = 0;

        while (slot_no < DEPTH)
        {
            m_cells[slot_no].m_ticket.store(slot_no, std::memory_order_relaxed);
            ++slot_no;
        }
    }

    // Stores `udata` in the queue, spinning until the target cell is free.
    void push_wait(long udata)
    {
        // draw our ticket and find the cell it maps to
        const unsigned long ticket =
            m_head.fetch_add(1, std::memory_order_acquire);
        cell& slot = m_cells[ticket % DEPTH];

        // wait for the cell to show exactly our ticket
        for (;;)
        {
            if (slot.m_ticket.load(std::memory_order_acquire) == ticket)
            {
                break;
            }

            // back off before looking again
            rl::yield(1, $);
        }

        // the cell is ours: fill in the payload
        VAR(slot.m_udata) = udata;

        // publish: ticket + 1 is what the matching pop_wait spins on
        slot.m_ticket.store(ticket + 1, std::memory_order_release);
    }

    // Removes and returns one item, spinning until the target cell has been
    // filled by the matching push_wait.
    long pop_wait()
    {
        // draw our ticket and find the cell it maps to
        const unsigned long ticket =
            m_tail.fetch_add(1, std::memory_order_acquire);
        cell& slot = m_cells[ticket % DEPTH];

        // the pusher holding the same ticket commits ticket + 1
        const unsigned long ready = ticket + 1;

        for (;;)
        {
            if (slot.m_ticket.load(std::memory_order_acquire) == ready)
            {
                break;
            }

            // back off before looking again
            rl::yield(1, $);
        }

        // take the payload out of the cell
        const long udata = VAR(slot.m_udata);

        // hand the cell to the pusher one full lap ahead of us
        slot.m_ticket.store(ticket + DEPTH, std::memory_order_release);

        return udata;
    }
};
// Relacy queue test: threads with an index below PRODUCERS push into the
// shared queue, and all remaining threads pop from it.
struct ct_mpmcq_spin_test_tweak
    : rl::test_suite<ct_mpmcq_spin_test_tweak, THREADS>
{
    // The queue under test, shared by every simulated thread.
    ct_mpmcq_spin g_mpmc_queue;

    ct_mpmcq_spin_test_tweak() {}

    // No per-iteration setup is needed.
    void before()
    {
    }

    // No per-iteration teardown is needed.
    void after()
    {
    }

    // writer: pushes its own thread index into the queue ITERS times.
    void producer(unsigned int tidx)
    {
        for (unsigned int i = 0; i < ITERS; ++i)
        {
            g_mpmc_queue.push_wait(static_cast<long>(tidx));
        }
    }

    // reader: pops ITERS items. Only producer threads push, and they push
    // their own index, so a popped value can never equal this thread's index.
    void consumer(unsigned int tidx)
    {
        for (unsigned int i = 0; i < ITERS; ++i)
        {
            const long udata = g_mpmc_queue.pop_wait();
            RL_ASSERT(udata != static_cast<long>(tidx));
        }
    }

    // Thread entry point: picks the role from the thread index.
    void thread(unsigned int tidx)
    {
        if (tidx < PRODUCERS)
        {
            producer(tidx);
        }
        else
        {
            consumer(tidx);
        }
    }
};
// Test away... Or fly? Humm...
// Runs the Relacy simulation of the queue test and reports the outcome
// through the process exit status (0 on success, 1 on failure).
int main()
{
    rl::test_params p;
    p.iteration_count = 10000000;
    // Optional knobs left disabled:
    //p.execution_depth_limit = 33333;
    //p.search_type = rl::sched_bound;
    //p.search_type = rl::fair_full_search_scheduler_type;
    //p.search_type = rl::fair_context_bound_scheduler_type;

    // Propagate the simulation result so a failing run does not look like
    // a pass to whoever launched the process.
    const bool passed = rl::simulate<ct_mpmcq_spin_test_tweak>(p);

    return passed ? 0 : 1;
}
_____________________________