As far as I can tell, my tweak should be 100% compatible with the
existing algorithm as-is. In my humble opinion, it could be a fairly
beneficial addition to the existing API: it opens up a new way to
think about handling contention with respect to using the queue as a
whole.
Here is a test of the tweak that should go ahead and
compile with Relacy:
___________________________________________________________________
/*
Bounded Multi-Producer/Multi-Consumer FIFO Queue
Distributed Waitset Algorithm
By Christopher Michael Thomasson,
Based on Dmitry Vyukov's excellent bounded MPMC queue algorithm.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
_______________________________________________________________________*/
// Optional Relacy configuration switches (left disabled).
//#define RL_DEBUGBREAK_ON_ASSERT
//#define RL_MSVC_OUTPUT
//#define RL_FORCE_SEQ_CST
//#define RL_GC
#include <relacy/relacy_std.hpp>
#include <cstdio>
// Shorthand aliases for the std::memory_order constants, and for
// stand-alone thread fences at each ordering level.
#define mb_relaxed std::memory_order_relaxed
#define mb_consume std::memory_order_consume
#define mb_acquire std::memory_order_acquire
#define mb_release std::memory_order_release
#define mb_acq_rel std::memory_order_acq_rel
#define mb_seq_cst std::memory_order_seq_cst
#define mb_relaxed_fence() std::atomic_thread_fence(mb_relaxed)
#define mb_consume_fence() std::atomic_thread_fence(mb_consume)
#define mb_acquire_fence() std::atomic_thread_fence(mb_acquire)
#define mb_release_fence() std::atomic_thread_fence(mb_release)
#define mb_acq_rel_fence() std::atomic_thread_fence(mb_acq_rel)
#define mb_seq_cst_fence() std::atomic_thread_fence(mb_seq_cst)
// Blocking waitset built from a mutex + condition variable, with a
// relaxed atomic "wait bit" that lets signalers skip locking entirely
// when no waiter can possibly be present.
//
// Usage protocol (two-phase wait):
//   key = wait_begin();              // publish wait bit, take the lock
//   if (predicate) wait_cancel(key); // predicate already true -> abort
//   else           wait_commit(key); // actually block on the condvar
// A signaler mutates the predicate, then calls signal()/broadcast().
// The seq_cst fences in wait_begin() and signal()/broadcast() pair up
// so that either the signaler observes the wait bit or the waiter
// observes the new predicate value on its re-check (no lost wakeup).
class waitset
{
std::mutex m_mutex;
std::condition_variable m_cond;
std::atomic<bool> m_waitbit; // hint: "a waiter may exist" (fast-path filter)
VAR_T(unsigned) m_waiters; // threads blocked in wait_commit(); guarded by m_mutex
public:
waitset()
: m_waitbit(false),
m_waiters(0)
{
}
// Sanity check: no wait bit and no waiters may remain at destruction.
~waitset()
{
bool waitbit = m_waitbit.load(mb_relaxed);
unsigned waiters = VAR(m_waiters);
RL_ASSERT(! waitbit && ! waiters);
}
private:
// Wake waiter(s) if the wait bit was observed set.  Clears the bit
// under the lock when this wakeup satisfies every current waiter
// (fewer than two waiters, or a broadcast).  Notifies outside the
// lock so a woken thread does not immediately block on the mutex.
void prv_signal(bool waitbit, bool broadcast)
{
if (! waitbit) return; // fast path: nobody is (possibly) waiting
m_mutex.lock($);
unsigned waiters = VAR(m_waiters);
if (waiters < 2 || broadcast)
{
m_waitbit.store(false, mb_relaxed);
}
m_mutex.unlock($);
if (waiters)
{
if (! broadcast)
{
m_cond.notify_one($);
}
else
{
m_cond.notify_all($);
}
}
}
public:
// Phase 1 of a wait: lock the mutex and publish the wait bit.  The
// seq_cst fence pairs with the one in signal()/broadcast().  Returns
// an opaque key (currently always 0, and ignored by the other
// phase-2 calls) to pass to wait_cancel()/wait_commit().
unsigned wait_begin()
{
m_mutex.lock($);
m_waitbit.store(true, mb_relaxed);
mb_seq_cst_fence();
return 0;
}
// Non-blocking variant of wait_begin(); returns false if the mutex
// could not be acquired.  NOTE(review): `key` is never written here,
// so the caller's key stays uninitialized — harmless while keys are
// ignored, but worth confirming against the intended API.
bool wait_try_begin(unsigned& key)
{
if (! m_mutex.try_lock($)) return false;
m_waitbit.store(true, mb_relaxed);
mb_seq_cst_fence();
return true;
}
// Abort a begun wait (predicate turned true before blocking).  The
// wait bit is cleared only when no other thread is still blocked;
// releases the lock taken by wait_begin().
void wait_cancel(unsigned key)
{
unsigned waiters = VAR(m_waiters);
if (! waiters)
{
m_waitbit.store(false, mb_relaxed);
}
m_mutex.unlock($);
}
// Phase 2: block on the condition variable.  The waiter count lets
// prv_signal() decide when the wait bit may be cleared; the last
// waiter out clears it.  Releases the lock taken by wait_begin().
// There is deliberately no predicate re-check loop here — spurious
// wakeups are tolerated because callers re-test in their own loop.
void wait_commit(unsigned key)
{
++VAR(m_waiters);
m_cond.wait(m_mutex, $);
if (! --VAR(m_waiters))
{
m_waitbit.store(false, mb_relaxed);
}
m_mutex.unlock($);
}
public:
// Wake at most one waiter.  The fence orders the caller's prior
// predicate mutation before the wait-bit load (pairs with the fence
// in wait_begin()).
void signal()
{
mb_seq_cst_fence();
bool waitbit = m_waitbit.load(std::memory_order_relaxed);
prv_signal(waitbit, false);
}
// Wake all waiters; same fence pairing as signal().
void broadcast()
{
mb_seq_cst_fence();
bool waitbit = m_waitbit.load(std::memory_order_relaxed);
prv_signal(waitbit, true);
}
};
// Bounded multi-producer/multi-consumer FIFO: Dmitry Vyukov's bounded
// queue (per-cell sequence numbers) extended with a "distributed
// waitset" — a stripe of T_wdepth waitsets indexed by ticket — so that
// threads block instead of spinning when their cell is not yet ready.
// T_depth && T_wdepth MUST be a power of 2!
template<typename T, unsigned T_depth, unsigned T_wdepth>
struct mpmc_bounded_queue
{
struct cell_type
{
std::atomic<unsigned> m_state; // cell's current sequence number (see ctor)
VAR_T(T) m_object; // stored element (Relacy-tracked plain data)
};
std::atomic<unsigned> m_head; // producer ticket counter
std::atomic<unsigned> m_tail; // consumer ticket counter
// One waitset per (idx & (T_wdepth - 1)) stripe: spreads wait/notify
// contention across T_wdepth mutex/condvar pairs.
waitset m_waitset[T_wdepth];
cell_type m_buffer[T_depth];
mpmc_bounded_queue() : m_head(0), m_tail(0)
{
// initialize version numbers.
// Cell i starts in state i: "ready for the producer holding ticket i".
for (unsigned i = 0; i < T_depth; ++i)
{
m_buffer[i].m_state.store(i, mb_relaxed);
}
}
// Blocking push: take a producer ticket, wait (on the ticket's
// waitset stripe) until the target cell's state equals the ticket,
// write the object, then advance the state to idx + 1 to hand the
// cell to the consumer holding that ticket.
void push(T const& obj)
{
// obtain our head version and cell.
unsigned idx = m_head.fetch_add(1, mb_relaxed);
cell_type& cell = m_buffer[idx & (T_depth - 1U)];
waitset& wset = m_waitset[idx & (T_wdepth - 1U)];
// wait for it...
// Two-phase wait: publish intent (wait_begin), re-check the cell,
// then either cancel (cell already ours) or commit (block).
while (cell.m_state.load(mb_relaxed) != idx)
{
unsigned key = wset.wait_begin();
if (cell.m_state.load(mb_relaxed) == idx)
{
wset.wait_cancel(key);
break;
}
wset.wait_commit(key);
}
mb_acquire_fence(); // acquire the cell before touching m_object
// GOT IT! Okay, write to the object.
VAR(cell.m_object) = obj;
// We are done; allow a consumer to consume.
mb_release_fence(); // publish m_object before the state store
cell.m_state.store(idx + 1, mb_relaxed);
wset.broadcast(); // cheap no-op when the stripe's wait bit is clear
}
// Blocking pop: mirror of push().  Waits until the cell state equals
// idx + 1 (produced), reads the object, then advances the state to
// idx + T_depth so the producer one full lap ahead may reuse the cell.
void pop(T& obj)
{
// obtain our tail version and cell.
unsigned idx = m_tail.fetch_add(1, mb_relaxed);
cell_type& cell = m_buffer[idx & (T_depth - 1U)];
waitset& wset = m_waitset[idx & (T_wdepth - 1U)];
// wait for it...
while (cell.m_state.load(mb_relaxed) != idx + 1U)
{
unsigned key = wset.wait_begin();
if (cell.m_state.load(mb_relaxed) == idx + 1U)
{
wset.wait_cancel(key);
break;
}
wset.wait_commit(key);
}
mb_acquire_fence(); // acquire the cell before reading m_object
// GOT IT! Okay, read from the object.
obj = VAR(cell.m_object);
// We are done; allow a producer to produce.
mb_release_fence(); // publish before releasing the cell
cell.m_state.store(idx + T_depth, mb_relaxed);
wset.broadcast();
}
};
// Test configuration.  BUFFER_SIZE and WAITSET_SIZE must both be
// powers of 2 (required by mpmc_bounded_queue's index masking).
#define PRODUCERS 2
#define CONSUMERS 3
#define THREADS (PRODUCERS + CONSUMERS)
#define ITERS 7
#define BUFFER_SIZE (8)
#define WAITSET_SIZE (4)
// Relacy test driver: PRODUCERS producer threads each push ITERS tagged
// values; the last producer to finish pushes one 666666 sentinel per
// consumer so that every consumer eventually terminates.
struct mpmc_bounded_queue_test
: rl::test_suite<mpmc_bounded_queue_test, THREADS>
{
mpmc_bounded_queue<unsigned, BUFFER_SIZE, WAITSET_SIZE> g_queue;
// NOTE(review): these termination counters are plain (non-VAR)
// variables mutated by multiple threads; presumably deliberate so
// Relacy does not treat them as contended state — confirm.
unsigned g_test_term_producers; // test termination only!
unsigned g_test_term_consumers; // test termination only!
// Reset counters before each simulated execution.
void before()
{
g_test_term_producers = PRODUCERS;
g_test_term_consumers = (PRODUCERS * ITERS) + CONSUMERS;
}
// After each execution: all PRODUCERS*ITERS values plus the CONSUMERS
// sentinels were consumed, and every producer ran to completion.
void after()
{
RL_ASSERT(! g_test_term_consumers &&
! g_test_term_producers);
}
// Pushes ITERS values tagged with (tidx + 1) in the high byte; the
// last producer to finish enqueues one sentinel per consumer.
void thread_producer(unsigned int tidx)
{
for (unsigned i = 0; i < ITERS; ++i)
{
g_queue.push(((tidx + 1) << 8U) + i);
}
if (! --g_test_term_producers)
{
for (unsigned i = 0; i < CONSUMERS; ++i)
{
g_queue.push(666666);
}
}
}
// Pops until the 666666 sentinel is seen.
void thread_consumer(unsigned int tidx)
{
for (;;)
{
unsigned v;
g_queue.pop(v);
--g_test_term_consumers;
if (v == 666666)
{
break;
}
}
}
// Thread entry: the first PRODUCERS indices are producers, the rest
// are consumers.
void thread(unsigned int tidx)
{
if (tidx < PRODUCERS)
{
thread_producer(tidx);
}
else
{
thread_consumer(tidx);
}
}
};
int main()
{
{
rl::test_params p;
// p.iteration_count = 1000000;
//p.execution_depth_limit = 33333;
//p.search_type = rl::sched_bound;
//p.search_type = rl::fair_full_search_scheduler_type;
p.search_type = rl::fair_context_bound_scheduler_type;
rl::simulate<mpmc_bounded_queue_test>(p);
}
std::puts("\nTest Complete!\n");
std::getchar();
return 0;
}
___________________________________________________________________
You can find the code here as well:
Well, what do you think of this?
...
BTW, thank you for creating the simply excellent group!
:^)