Google Groups no longer supports new Usenet posts or subscriptions. Historical content remains viewable.
Dismiss

simple atomic lifo...

28 views
Skip to first unread message

Chris M. Thomasson

unread,
Jan 13, 2018, 6:10:38 PM
to
A simple LIFO algorithm... Any questions? ;^)
__________________________
// Short aliases for the std::memory_order constants (requires <atomic>).
// These are typed constants rather than macros so they obey normal scoping
// rules and cannot silently rewrite unrelated identifiers.
constexpr std::memory_order mb_relaxed = std::memory_order_relaxed;
constexpr std::memory_order mb_consume = std::memory_order_consume;
constexpr std::memory_order mb_acquire = std::memory_order_acquire;
constexpr std::memory_order mb_release = std::memory_order_release;
constexpr std::memory_order mb_acq_rel = std::memory_order_acq_rel;
constexpr std::memory_order mb_seq_cst = std::memory_order_seq_cst;

// Issues a stand-alone thread fence with the requested ordering.
inline void mb_fence(std::memory_order mb) noexcept
{
    std::atomic_thread_fence(mb);
}


// Link element used by spmc_queue: pushed objects are chained through m_next.
struct node
{
    node* m_next; // following node in the chain, nullptr when there is none

    node() : m_next{nullptr} {} // a fresh node starts unlinked
};


struct spmc_queue
{
std::atomic<node*> m_head;

spmc_queue() : m_head(nullptr) {}

// push a single node
void push(node* const n)
{
node* head = m_head.load(mb_relaxed);

for (;;)
{
n->m_next = head;
mb_fence(mb_release);

if (m_head.compare_exchange_weak(head, n, mb_relaxed))
{
break;
}
}
}

// try to flush all of our nodes
node* flush()
{
node* n = m_head.exchange(nullptr, mb_relaxed);

if (n)
{
mb_fence(mb_acquire);
}

return n;
}
};
__________________________




Here is an example program that uses it:
__________________________
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdio>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>



// Short aliases for the std::memory_order constants.
// These are typed constants rather than macros so they obey normal scoping
// rules and cannot silently rewrite unrelated identifiers.
constexpr std::memory_order mb_relaxed = std::memory_order_relaxed;
constexpr std::memory_order mb_consume = std::memory_order_consume;
constexpr std::memory_order mb_acquire = std::memory_order_acquire;
constexpr std::memory_order mb_release = std::memory_order_release;
constexpr std::memory_order mb_acq_rel = std::memory_order_acq_rel;
constexpr std::memory_order mb_seq_cst = std::memory_order_seq_cst;

// Issues a stand-alone thread fence with the requested ordering.
inline void mb_fence(std::memory_order mb) noexcept
{
    std::atomic_thread_fence(mb);
}


// Bookkeeping for the example: incremented for every node allocated and
// decremented for every node deleted, so it reads 0 once nothing is leaked.
static std::atomic<unsigned long> g_alloc_count{0};


// Link element used by spmc_queue: pushed objects are chained through m_next.
struct node
{
    node* m_next; // following node in the chain, nullptr when there is none

    node() : m_next{nullptr} {} // a fresh node starts unlinked
};


struct spmc_queue
{
std::atomic<node*> m_head;

spmc_queue() : m_head(nullptr) {}

// push a single node
void push(node* const n)
{
node* head = m_head.load(mb_relaxed);

for (;;)
{
n->m_next = head;
mb_fence(mb_release);

if (m_head.compare_exchange_weak(head, n, mb_relaxed))
{
break;
}
}
}

// try to flush all of our nodes
node* flush()
{
node* n = m_head.exchange(nullptr, mb_relaxed);

if (n)
{
mb_fence(mb_acquire);
}

return n;
}
};

// Blocking adapter around spmc_queue::flush(): keeps retrying, yielding the
// CPU between attempts, until a flush hands back at least one node.
// Never returns nullptr.
node* spmc_queue_spin_lock_flush(
    spmc_queue& queue
) {
    node* batch = queue.flush();

    while (batch == nullptr)
    {
        std::this_thread::yield();
        batch = queue.flush();
    }

    return batch;
}


// Example sizing. Typed constants instead of macros: a macro named `N` would
// textually replace every later identifier spelled N.
constexpr unsigned int CONSUMERS = 7;  // number of consumer threads started
constexpr unsigned int N = 1000000;    // number of payload nodes produced


struct user_data : public node
{
int m_foo;

user_data(int foo) : m_foo(foo) {}
};


void producer_thread(
unsigned int id,
std::mutex& std_out_mutex,
spmc_queue& queue
){
{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("producer(%u)::queue(%p) - Entry\n", id,
(void*)&queue);
}

for (unsigned int i = 0; i < N; ++i)
{
user_data* ud = new user_data(i + 1); // allocate memory
g_alloc_count.fetch_add(1, mb_relaxed);

queue.push(ud);

if (! ((i + 1) % 1003))
{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("producer(%u)::queue(%p) - produced(%u)\n", id,
(void*)&queue, i + 1);
}
}

for (unsigned int i = 0; i < CONSUMERS; ++i)
{
user_data* ud = new user_data(0); // allocate memory
g_alloc_count.fetch_add(1, mb_relaxed);

queue.push(ud);
}

{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("producer(%u)::queue(%p) - Exit\n", id,
(void*)&queue);
}
}


void consumer_thread(
unsigned int id,
std::mutex& std_out_mutex,
spmc_queue& queue
){
{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("consumer(%u)::queue(%p) - Entry\n", id,
(void*)&queue);
}

{
for (unsigned long i = 0 ;; ++i)
{
// Wait for something...
user_data* ud = (user_data*)spmc_queue_spin_lock_flush(queue);
assert(ud); // make sure we have something!

int counter = 0;

while (ud)
{
node* ud_next = ud->m_next;

unsigned int foo = ud->m_foo;
delete ud; // reclaim memory
g_alloc_count.fetch_sub(1, mb_relaxed);

if (foo == 0)
{
// We have recieved a "stop" signal
counter++;
}

if (!((i + 1) % 1003))
{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("consumer(%u)::queue(%p) -
consumed(foo:%u)\n",
id, (void*)&queue, foo);
}

ud = (user_data*)ud_next;
}

std::this_thread::yield(); // just for spice...

while (counter > 1)
{
// Replay all of the excess stop signals
user_data* ud = new user_data(0); // allocate memory
g_alloc_count.fetch_add(1, mb_relaxed);

queue.push(ud);
--counter;

{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("consumer(%u)::queue(%p) -
replay(%u)*****************\n",
id, (void*)&queue, counter);
}
}

if (counter == 1)
{
// We are fin!
break;
}
}
}

{
std::unique_lock<std::mutex> lock(std_out_mutex);
std::printf("consumer(%u)::queue(%p) - Exit\n", id,
(void*)&queue);
}
}



// Example driver: starts CONSUMERS consumer threads and one producer thread
// on a shared queue, joins them all, then reports the outstanding allocation
// count and waits for a key press before exiting.
int main(void)
{
    {
        spmc_queue queue;

        std::thread consumers[CONSUMERS];
        std::mutex std_out_mutex;

        // Consumers are numbered from 1; the producer uses id 0.
        unsigned int next_id = 1;
        for (std::thread& worker : consumers)
        {
            worker = std::thread(
                consumer_thread,
                next_id,
                std::ref(std_out_mutex),
                std::ref(queue)
            );
            ++next_id;
        }

        std::thread producer(
            producer_thread,
            0,
            std::ref(std_out_mutex),
            std::ref(queue)
        );

        producer.join();

        for (std::thread& worker : consumers)
        {
            worker.join();
        }
    }

    // Every node allocated by the threads should have been deleted by now.
    std::printf("g_alloc_count:(%lu)\n", g_alloc_count.load(mb_relaxed));
    assert(g_alloc_count.load(mb_relaxed) == 0);

    std::printf("\nComplete, hit <ENTER> to exit...\n");
    std::fflush(stdout);
    std::getchar();

    return 0;
}
__________________________


Can anybody else run this?
0 new messages