Chris M. Thomasson
Posted Jan 13, 2018, 6:10:38 PM
A simple LIFO algorithm... Any questions? ;^)
__________________________
// Short aliases for the std::memory_order constants. The code shown here
// uses only mb_relaxed, mb_acquire and mb_release; the rest are spare.
#define mb_relaxed std::memory_order_relaxed
#define mb_consume std::memory_order_consume
#define mb_acquire std::memory_order_acquire
#define mb_release std::memory_order_release
#define mb_acq_rel std::memory_order_acq_rel
#define mb_seq_cst std::memory_order_seq_cst
// Stand-alone fence: expands to std::atomic_thread_fence(mb).
#define mb_fence(mb) std::atomic_thread_fence(mb)
// simple single producer/multi-consumer queue
// Link record the queue chains together through m_next.
struct node
{
    node* m_next = nullptr; // a fresh node is not linked to anything

    node() {}
};
struct spmc_queue
{
std::atomic<node*> m_head;
spmc_queue() : m_head(nullptr) {}
// push a single node
void push(node* const n)
{
node* head = m_head.load(mb_relaxed);
for (;;)
{
n->m_next = head;
mb_fence(mb_release);
if (m_head.compare_exchange_weak(head, n, mb_relaxed))
{
break;
}
}
}
// try to flush all of our nodes
node* flush()
{
node* n = m_head.exchange(nullptr, mb_relaxed);
if (n)
{
mb_fence(mb_acquire);
}
return n;
}
};
__________________________
Here is an example program that uses it:
__________________________
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdio>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
// Short aliases for the std::memory_order constants. The program below
// uses only mb_relaxed, mb_acquire and mb_release; the rest are spare.
#define mb_relaxed std::memory_order_relaxed
#define mb_consume std::memory_order_consume
#define mb_acquire std::memory_order_acquire
#define mb_release std::memory_order_release
#define mb_acq_rel std::memory_order_acq_rel
#define mb_seq_cst std::memory_order_seq_cst
// Stand-alone fence: expands to std::atomic_thread_fence(mb).
#define mb_fence(mb) std::atomic_thread_fence(mb)
// Bookkeeping only: raised after every `new user_data`, lowered after every
// `delete`; main() asserts that it is back at zero once all threads joined.
static std::atomic<unsigned long> g_alloc_count{0};
// simple single producer/multi-consumer queue
// Link record the queue chains together through m_next; user_data derives
// from it.
struct node
{
    node* m_next; // next node in the chain, nullptr when unlinked

    node() : m_next{nullptr} {}
};
struct spmc_queue
{
std::atomic<node*> m_head;
spmc_queue() : m_head(nullptr) {}
// push a single node
void push(node* const n)
{
node* head = m_head.load(mb_relaxed);
for (;;)
{
n->m_next = head;
mb_fence(mb_release);
if (m_head.compare_exchange_weak(head, n, mb_relaxed))
{
break;
}
}
}
// try to flush all of our nodes
node* flush()
{
node* n = m_head.exchange(nullptr, mb_relaxed);
if (n)
{
mb_fence(mb_acquire);
}
return n;
}
};
// Blocking wrapper around spmc_queue::flush(): keeps flushing, yielding the
// thread after every empty result, and returns the first non-null chain.
node* spmc_queue_spin_lock_flush(
    spmc_queue& queue
) {
    node* batch = queue.flush();
    while (batch == nullptr)
    {
        std::this_thread::yield();
        batch = queue.flush();
    }
    return batch;
}
// Number of consumer threads main() starts; the producer pushes the same
// number of stop nodes after its data nodes.
#define CONSUMERS 7
// Number of data nodes the producer pushes before the stop nodes.
#define N 1000000
struct user_data : public node
{
int m_foo;
user_data(int foo) : m_foo(foo) {}
};
void producer_thread(
unsigned int id,
std::mutex& std_out_mutex,
spmc_queue& queue
){
{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("producer(%u)::queue(%p) - Entry\n", id,
(void*)&queue);
}
for (unsigned int i = 0; i < N; ++i)
{
user_data* ud = new user_data(i + 1); // allocate memory
g_alloc_count.fetch_add(1, mb_relaxed);
queue.push(ud);
if (! ((i + 1) % 1003))
{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("producer(%u)::queue(%p) - produced(%u)\n", id,
(void*)&queue, i + 1);
}
}
for (unsigned int i = 0; i < CONSUMERS; ++i)
{
user_data* ud = new user_data(0); // allocate memory
g_alloc_count.fetch_add(1, mb_relaxed);
queue.push(ud);
}
{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("producer(%u)::queue(%p) - Exit\n", id,
(void*)&queue);
}
}
void consumer_thread(
unsigned int id,
std::mutex& std_out_mutex,
spmc_queue& queue
){
{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("consumer(%u)::queue(%p) - Entry\n", id,
(void*)&queue);
}
{
for (unsigned long i = 0 ;; ++i)
{
// Wait for something...
user_data* ud = (user_data*)spmc_queue_spin_lock_flush(queue);
assert(ud); // make sure we have something!
int counter = 0;
while (ud)
{
node* ud_next = ud->m_next;
unsigned int foo = ud->m_foo;
delete ud; // reclaim memory
g_alloc_count.fetch_sub(1, mb_relaxed);
if (foo == 0)
{
// We have recieved a "stop" signal
counter++;
}
if (!((i + 1) % 1003))
{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("consumer(%u)::queue(%p) -
consumed(foo:%u)\n",
id, (void*)&queue, foo);
}
ud = (user_data*)ud_next;
}
std::this_thread::yield(); // just for spice...
while (counter > 1)
{
// Replay all of the excess stop signals
user_data* ud = new user_data(0); // allocate memory
g_alloc_count.fetch_add(1, mb_relaxed);
queue.push(ud);
--counter;
{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("consumer(%u)::queue(%p) -
replay(%u)*****************\n",
id, (void*)&queue, counter);
}
}
if (counter == 1)
{
// We are fin!
break;
}
}
}
{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("consumer(%u)::queue(%p) - Exit\n", id,
(void*)&queue);
}
}
// Starts CONSUMERS consumer threads and then one producer on a shared queue,
// joins them all, reports the allocation counter and waits for ENTER.
int main(void)
{
    {
        spmc_queue queue;
        std::thread consumers[CONSUMERS];
        std::mutex std_out_mutex;

        // Consumers get ids 1..CONSUMERS and are started before the producer.
        unsigned int next_id = 1;
        for (std::thread& slot : consumers)
        {
            slot = std::thread(
                consumer_thread,
                next_id,
                std::ref(std_out_mutex),
                std::ref(queue)
            );
            ++next_id;
        }

        // The producer uses id 0.
        std::thread producer(
            producer_thread,
            0,
            std::ref(std_out_mutex),
            std::ref(queue)
        );

        producer.join();
        for (std::thread& slot : consumers)
        {
            slot.join();
        }
    }

    // Every node that was allocated should have been deleted by now.
    std::printf("g_alloc_count:(%lu)\n", g_alloc_count.load(mb_relaxed));
    assert(g_alloc_count.load(mb_relaxed) == 0);

    std::printf("\nComplete, hit <ENTER> to exit...\n");
    std::fflush(stdout);
    std::getchar();
    return 0;
}
__________________________
Can anybody else run this?