Well, here is some crude and highly experimental C++11 code that avoids
CAS wrt a lock-free stack: no version numbers, no memory lifetime
issues. It does have a major caveat: it pops the whole lock-free stack
using a single atomic swap, so a consumer gets all of the nodes. The fun
part is that producers also use an atomic swap to insert nodes, so there
is an interesting window between a producer swapping itself into the
head and storing the link to the old head; a consumer that flushes
during that window has to wait for the link to appear. The head is a
single word, no version counter. It's a trade-off. Well, here is my
experimental and crude code, can you get it to run?
https://pastebin.com/raw/j41cPT9S
// Simple XCHG based Atomic Stack
// By: Chris M. Thomasson
#include <iostream>
#include <atomic>
#include <mutex>
#include <thread>
#include <functional>
#include <cassert>
// sorry about the macros...
#define THREADS 42
#define ITERS 100000
#define CT_MB_RLX std::memory_order_relaxed
#define CT_MB_ACQ std::memory_order_acquire
#define CT_MB_REL std::memory_order_release
// HACK! Humm... a sentinel that means "next link not written yet"
#define CT_WAIT ((ct_work*)(0xDEADBEEFU))
// Just to track all the dynamic allocations...
static std::atomic<unsigned long> g_allocs(0);
static std::mutex g_cout_mtx;
// A work node
struct ct_work
{
    std::atomic<ct_work*> m_next;
    std::thread::id m_data;

    ct_work(std::thread::id data) : m_next(nullptr), m_data(data) {}

    void process()
    {
        // Simulate just a tiny little bit of work...
        g_cout_mtx.lock();
        std::this_thread::yield();
        std::this_thread::yield();
        std::this_thread::yield();
        std::thread::id local = std::this_thread::get_id();
        if (m_data == local)
        {
            // std::cout << "processing local = " << m_data <<
            //    " from " << std::this_thread::get_id() << "\n";
        }
        else
        {
            std::cout << "processing foreign = " << m_data <<
                " from " << std::this_thread::get_id() << "\n";
        }
        std::this_thread::yield();
        std::this_thread::yield();
        std::this_thread::yield();
        g_cout_mtx.unlock();
    }

    // Wait until the producer has filled in our next link.
    ct_work* get_next() const
    {
        ct_work* w = nullptr;
        while ((w = m_next.load(CT_MB_RLX)) == CT_WAIT)
        {
            // we can spin, or even do other work right here...
            std::this_thread::yield();
        }
        return w;
    }
};
// Easy Stack, only uses XCHG
struct ct_estack
{
    std::atomic<ct_work*> m_head;

    ct_estack() : m_head(nullptr) {}

    void push(ct_work* n)
    {
        // Mark our next link as "not written yet"...
        n->m_next.store(CT_WAIT, CT_MB_RLX);

        // Swap ourselves into the head; the node is visible to a
        // consumer from this point on.
        ct_work* head = m_head.exchange(n, CT_MB_REL); // release

        // Now fill in the link to the old head.
        n->m_next.store(head, CT_MB_RLX);
    }

    // Grab the whole stack in one shot.
    ct_work* flush_try()
    {
        return m_head.exchange(nullptr, CT_MB_ACQ); // acquire
    }
};
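// To make the window concrete, here is one possible interleaving
// (my annotation, derived from push() and flush_try() above):
//
//   producer: n->m_next.store(CT_WAIT, CT_MB_RLX);
//   producer: m_head.exchange(n, CT_MB_REL);     // n is visible now...
//   consumer: flush_try() returns n              // ...but not linked yet!
//   consumer: get_next() sees CT_WAIT, spins/yields
//   producer: n->m_next.store(head, CT_MB_RLX);  // link finally shows up
//   consumer: get_next() returns the old head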
// Consume an Easy Stack...
void ct_consume(
    ct_estack& estack
) {
    ct_work* w = estack.flush_try();
    while (w)
    {
        // Process FIRST! This gives a producer stuck in the window
        // some time to store the real next link.
        w->process();

        // Now, we can gain the next pointer.
        ct_work* next = w->get_next();

        // Okay, we can delete the work.
        delete w;
        g_allocs.fetch_sub(1, CT_MB_RLX); // dec

        w = next;
    }
}
// Our shared state
struct ct_shared
{
    ct_estack m_estack;
};
// Produce some work...
void ct_produce(
    ct_estack& estack
) {
    ct_work* w = new ct_work(std::this_thread::get_id());
    g_allocs.fetch_add(1, CT_MB_RLX); // inc
    estack.push(w);
}
// Do some work...
void ct_worker(ct_shared& shared)
{
    for (unsigned int i = 0; i < ITERS; ++i)
    {
        ct_produce(shared.m_estack);
        ct_produce(shared.m_estack);
        ct_produce(shared.m_estack);
        std::this_thread::yield(); // little spice...
        ct_consume(shared.m_estack);
    }

    // Drain anything that is left over.
    ct_consume(shared.m_estack);
}
int main(void)
{
    {
        ct_shared shared;
        std::thread threads[THREADS];

        for (unsigned long i = 0; i < THREADS; ++i)
        {
            threads[i] = std::thread(ct_worker, std::ref(shared));
        }

        for (unsigned long i = 0; i < THREADS; ++i)
        {
            threads[i].join();
        }
    }

    if (g_allocs.load(CT_MB_RLX) != 0)
    {
        std::cout << "\n\nLEAKED!!!!\n";
    }

    std::cout << "\n\nFIN!\n";

    return 0;
}
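
Fwiw, it should build with something like this, assuming g++ or clang++
(the filename is just whatever you saved it as, say ct_estack.cpp):

g++ -std=c++11 -O2 -pthread ct_estack.cpp -o ct_estack

If all goes well it prints a bunch of "processing foreign = ..." lines,
then FIN! with no LEAKED!!!! message.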