Here is an example of using an SPMC (single-producer/multi-consumer)
queue. Consumers block by spin-waiting; this can be avoided through
the use of eventcounts, or even fast semaphores.
The test will complete; give it a minute or two. It does 10000000
iterations with 7 consumer threads.
The entire example code can be found here:
http://pastebin.com/raw/6nG6t422
Can you run this shi%?
Any thoughts, questions?
;^o
____________________________________
#include <cstdio>
#include <deque>
#include <condition_variable>
#include <mutex>
#include <memory>
#include <thread>
#include <atomic>
#include <algorithm>
#include <cassert>
// Short aliases for the std::memory_order values used in this file.
// Typed constexpr constants (instead of object-like macros) respect scope
// and type checking and cannot silently rewrite unrelated identifiers.
constexpr std::memory_order mb_relaxed = std::memory_order_relaxed;
constexpr std::memory_order mb_consume = std::memory_order_consume;
constexpr std::memory_order mb_acquire = std::memory_order_acquire;
constexpr std::memory_order mb_release = std::memory_order_release;
constexpr std::memory_order mb_acq_rel = std::memory_order_acq_rel;
constexpr std::memory_order mb_seq_cst = std::memory_order_seq_cst;

// Issue a standalone memory fence with ordering `mb`
// (thin wrapper over std::atomic_thread_fence).
inline void mb_fence(std::memory_order mb) noexcept
{
    std::atomic_thread_fence(mb);
}
// Sanity counter of live nodes: bumped on every node allocation and
// dropped on every delete; main() asserts it is back at zero at the end.
static std::atomic<unsigned long> g_alloc_count{0};
// Simple single-producer/multi-consumer queue.
//
// Intrusive list hook: payload types derive from `node` so the queue can
// chain them together without any extra allocation.
struct node
{
    // Link to the next node in the chain; nullptr terminates the chain.
    node* m_next = nullptr;
};
struct spmc_queue
{
std::atomic<node*> m_head;
spmc_queue() : m_head(nullptr) {}
// push a single node
void push(node* const n)
{
node* head = m_head.load(mb_relaxed);
for (;;)
{
n->m_next = head;
mb_fence(mb_release);
if (m_head.compare_exchange_weak(head, n, mb_relaxed))
{
break;
}
}
}
// try to flush all of our nodes
node* flush(node* const nhead = nullptr)
{
node* n = m_head.exchange(nhead, mb_relaxed);
if (n)
{
mb_fence(mb_acquire);
}
return n;
}
};
// Blocking adapter over spmc_queue::flush(): keeps trying to detach the
// queue, yielding the CPU between failed attempts, until a non-empty chain
// is obtained. Never returns nullptr.
node* spmc_queue_spin_lock_flush(
spmc_queue& queue
) {
    node* batch = queue.flush();
    while (! batch)
    {
        std::this_thread::yield();
        batch = queue.flush();
    }
    return batch;
}
// Test parameters: number of consumer threads, and number of payload items
// the producer pushes. Typed constexpr constants instead of macros, so they
// are scoped (a macro named `N` would rewrite every later `N` token) and
// compare cleanly against the unsigned loop counters that use them.
constexpr unsigned int CONSUMERS = 7;
constexpr unsigned int N = 10000000;
// Payload node pushed through the queue. m_foo carries the produced value;
// producer_thread and consumer_thread treat the value 0 as a "stop" signal.
struct user_data : public node
{
    int m_foo;

    // explicit: an int should never implicitly convert into a queue node.
    explicit user_data(int foo) : m_foo(foo) {}
};
void producer_thread(
unsigned int id,
std::mutex& std_out_mutex,
spmc_queue& queue
){
{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("producer(%u)::queue(%p) - Entry\n", id,
(void*)&queue);
}
for (unsigned int i = 0; i < N; ++i)
{
user_data* ud = new user_data(i + 1); // allocate memory
g_alloc_count.fetch_add(1, mb_relaxed);
queue.push(ud);
if (! ((i + 1) % 1003))
{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("producer(%u)::queue(%p) - produced(%u)\n", id,
(void*)&queue, i + 1);
}
}
for (unsigned int i = 0; i < CONSUMERS; ++i)
{
user_data* ud = new user_data(0); // allocate memory
g_alloc_count.fetch_add(1, mb_relaxed);
queue.push(ud);
}
{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("producer(%u)::queue(%p) - Exit\n", id, (void*)&queue);
}
}
void consumer_thread(
unsigned int id,
std::mutex& std_out_mutex,
spmc_queue& queue
){
{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("consumer(%u)::queue(%p) - Entry\n", id,
(void*)&queue);
}
{
for (unsigned long i = 0 ;; ++i)
{
// Wait for something...
user_data* ud = (user_data*)spmc_queue_spin_lock_flush(queue);
assert(ud); // make sure we have something!
int counter = 0;
while (ud)
{
node* ud_next = ud->m_next;
unsigned int foo = ud->m_foo;
delete ud; // reclaim memory
g_alloc_count.fetch_sub(1, mb_relaxed);
if (foo == 0)
{
// We have recieved a "stop" signal
counter++;
}
if (!((i + 1) % 1003))
{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("consumer(%u)::queue(%p) -
consumed(foo:%u)\n",
id, (void*)&queue, foo);
}
ud = (user_data*)ud_next;
}
std::this_thread::yield(); // just for spice...
while (counter > 1)
{
// Replay all of the excess stop signals
user_data* ud = new user_data(0); // allocate memory
g_alloc_count.fetch_add(1, mb_relaxed);
queue.push(ud);
--counter;
{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("consumer(%u)::queue(%p) - replay(%u)
*****************\n",
id, (void*)&queue, counter);
}
}
if (counter == 1)
{
// We are fin!
break;
}
}
}
{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("consumer(%u)::queue(%p) - Exit\n", id, (void*)&queue);
}
}
int main(void)
{
{
spmc_queue queue;
std::thread consumers[CONSUMERS];
std::mutex std_out_mutex;
for (unsigned int i = 0; i < CONSUMERS; ++i)
{
consumers[i] = std::thread(
consumer_thread,
i + 1,
std::ref(std_out_mutex),
std::ref(queue)
);
}
std::thread producer(
producer_thread,
0,
std::ref(std_out_mutex),
std::ref(queue)
);
producer.join();
for (unsigned int i = 0; i < CONSUMERS; ++i)
{
consumers[i].join();
}
}
std::printf("g_alloc_count:(%lu)\n", g_alloc_count.load(mb_relaxed));
assert(g_alloc_count.load(mb_relaxed) == 0);
std::printf("\nComplete, hit <ENTER> to exit...\n");
std::fflush(stdout);
std::getchar();
return 0;
}
____________________________________
There it is, the whole pile. ;^)
It should compile!