This allows a thread to get around the ABA problem in a poor man's RCU
style. The C++ code uses the Relacy Race Detector; it eases my mind to run
it as a unit test before I code it in pure C++11. It uses simple per-thread
locks and a little quiescence detection scheme to keep things kosher.
https://groups.google.com/forum/#!topic/lock-free/ryIAaCod7OQ
https://pastebin.com/raw/FpnvTqvM
_________________________
// Simple Per-Thread Mutex Based Proxy Collector
// For Academic and Experimental things...
// Beginner, Moderate?
// In Good Ol' Relacy! Nice as always.
//_______________________________________________
//#define RL_DEBUGBREAK_ON_ASSERT
//#define RL_MSVC_OUTPUT
//#define RL_FORCE_SEQ_CST
//#define RL_GC
#include <relacy/relacy_std.hpp>
#include <vector>
#include <algorithm>
#include <iostream>
// Simple macro based redirection of the verbose std memory orders and
// fences, so the ordering used at each atomic site stays short to read.
#define CT_MB_ACQ std::memory_order_acquire
#define CT_MB_REL std::memory_order_release
#define CT_MB_RLX std::memory_order_relaxed
#define CT_MB_ACQ_REL std::memory_order_acq_rel
// Standalone fence with the given memory order.
#define CT_MB(mp_0) std::atomic_thread_fence(mp_0)
// Some global vars directing the show...
// WORKERS: threads that push/pop user nodes.
#define WORKERS 4
// REAPERS: threads that free collected nodes. NOTE(review): an older hint
// here said "set it to one for now...", but the value is 2; confirm intent.
#define REAPERS 2
// ITERS: push/pop iterations performed by each worker.
#define ITERS 5
#define THREADS (WORKERS + REAPERS)
// A User Node.
// Element of the workers' lock-free stack; the same m_next link is reused
// when the node sits on the ptex collected list and the reapers' pending
// lists.
struct unode
{
    // Link to the next node. Accessed with relaxed loads/stores; ordering
    // comes from the explicit fences at the push/pop/collect/reap sites.
    std::atomic<unode*> m_next;

    // Payload; the test stores the pushing thread's index here.
    VAR_T(unsigned long) m_data;

    // Members are initialized in declaration order (m_next, then m_data);
    // the initializer list now matches that order (no -Wreorder warning).
    // explicit: an unsigned long should never silently become a node.
    explicit unode(unsigned long data, unode* next = nullptr)
        : m_next(next), m_data(data) {}
};
// The global ptex state.
// Holds one per_thread slot per test thread (indexed by rl::thread_index()
// in get_ref(), so T_threads must cover every thread that calls it) and a
// lock-free LIFO, m_collected, of nodes that workers are finished with and
// that reapers take out with reap() to free later.
template<std::size_t T_threads>
struct ptex
{
// Per-thread state. m_quiesce is held for the whole acquire()..release()
// region, so any other thread that manages to lock (or try_lock) it knows
// the owner is not inside a region at that moment.
struct per_thread
{
std::mutex m_quiesce; // essential for this scheme
// Version counters; nothing else in the visible code reads them.
VAR_T(unsigned long) m_ver_local; // for future use
std::atomic<unsigned long> m_ver_reap; // for future use
per_thread() : m_ver_local(0), m_ver_reap(0)
{
//std::cout << "ptex::per_thread::per_thread()\n";
}
~per_thread() throw()
{
//std::cout << "ptex::per_thread::~per_thread()\n";
}
// A thread enters a region: it takes its own m_quiesce and keeps it until
// release(). The acquire fence is in addition to the mutex's own ordering.
// NOTE(review): nver is the pre-increment value of m_ver_local, which is
// exactly what the previous release() stored into m_ver_reap (both start
// at 0), so this store never changes m_ver_reap. If m_ver_reap is meant to
// expose "inside a region" (e.g. an odd version), it probably should store
// nver + 1 -- confirm before relying on it.
void acquire()
{
unsigned long nver = VAR(m_ver_local)++;
m_quiesce.lock($);
m_ver_reap.store(nver, CT_MB_RLX);
CT_MB(CT_MB_ACQ);
}
// A thread exits a region: the release fence orders the region's accesses
// first, then the version is bumped and m_quiesce is dropped so other
// threads can observe quiescence. Together with acquire() this advances
// m_ver_local by 2 per region.
void release()
{
CT_MB(CT_MB_REL);
unsigned long ver = VAR(m_ver_local);
m_ver_reap.store(ver + 1, CT_MB_RLX);
m_quiesce.unlock($);
VAR(m_ver_local) = ver + 1;
}
// Blocks until the owning thread is outside its region (the lock only
// succeeds while the mutex is free), then releases it immediately.
// Must not be called by the owning thread from inside acquire()/release().
void quiesce_brute()
{
// Ensure quiescence
m_quiesce.lock($);
m_quiesce.unlock($);
}
};
per_thread m_threads[T_threads];
// Lock-free LIFO of collected nodes, linked through unode::m_next.
std::atomic<unode*> m_collected;
ptex() : m_collected(nullptr)
{
//std::cout << "ptex::ptex()\n";
}
// Every collected node must have been reaped (and freed) by now.
~ptex() throw()
{
RL_ASSERT(!m_collected.load(CT_MB_RLX));
//std::cout << "ptex::~ptex()\n";
}
// Gain a ref to the calling thread's per-thread struct.
per_thread& get_ref() throw()
{
return m_threads[rl::thread_index()];
}
// delete everything...
// Deletes every node of the m_next-linked list starting at n (n may be
// nullptr). Not called anywhere in the visible code.
void dump(unode* n)
{
while (n)
{
unode* next = n->m_next.load(CT_MB_RLX);
delete n;
n = next;
}
}
// We are finished, collect the node...
// Pushes n onto m_collected. The release fence before each CAS attempt
// pairs with the acquire fence in reap(), publishing n's contents to the
// thread that later takes the list.
void collect(unode* n) throw()
{
unode* head = m_collected.load(CT_MB_RLX);
do
{
n->m_next.store(head, CT_MB_RLX);
CT_MB(CT_MB_REL); // release
} while (!m_collected.compare_exchange_weak(
head, n, CT_MB_RLX));
}
// Flush the stack
// Atomically takes the whole collected list, leaving m_collected empty.
// Returns nullptr when nothing was collected; the acquire fence is only
// issued when a list was actually taken.
unode* reap() throw()
{
unode* head = m_collected.exchange(nullptr, CT_MB_RLX);
if (head) CT_MB(CT_MB_ACQ); // acquire
return head;
}
// Wait for a quiesce
// Locks and unlocks every thread's m_quiesce in index order, i.e. waits
// until each thread has been seen outside its region once. Must not be
// called from inside an acquire()/release() region (the caller would try
// to lock its own, already held, std::mutex).
void quiesce_brute() throw()
{
for (std::size_t i = 0; i < T_threads; ++i)
{
per_thread& pt = m_threads[i];
//pt.quiesce();
pt.quiesce_brute();
}
}
};
// Relacy Multex Test...
// WORKERS threads push nodes onto a lock-free stack and pop them back, doing
// the pop inside a per-thread acquire()/release() region; popped nodes are
// handed to g_ptex.collect(). REAPERS threads free those nodes only after a
// full quiescence cycle, which is what keeps the pop's m_next read safe.
struct ct_multex_test
    : rl::test_suite<ct_multex_test, THREADS>
{
    typedef ptex<THREADS> ptex_t;

    ptex_t g_ptex; // Global ptex
    std::atomic<unode*> g_worker_stack; // Global User Stack

    // Number of workers still running; reapers keep reaping until it hits 0.
    // Several workers decrement it while the reapers poll it, so it must be
    // atomic: as a plain integer it is a data race in C++11 (concurrent
    // decrements can lose updates and leave the reapers spinning forever),
    // and Relacy does not instrument plain members, so the simulation would
    // never report that race.
    std::atomic<unsigned long> g_contrived_shutdown;

    void before()
    {
        g_worker_stack.store(nullptr, CT_MB_RLX);
        g_contrived_shutdown.store(WORKERS, CT_MB_RLX);
    }

    void after()
    {
        // Each worker iteration pushes one node and then pops one node, so
        // the user stack must be empty once every worker has finished.
        RL_ASSERT(!g_worker_stack.load(CT_MB_RLX));
    }

    // Add a user item to the stack (lock-free push). The release fence
    // before each CAS publishes *n, including m_data, to whoever pops it.
    void worker_push(unode* n)
    {
        unode* head = g_worker_stack.load(CT_MB_RLX);
        do
        {
            n->m_next.store(head, CT_MB_RLX);
            CT_MB(CT_MB_REL); // release
        } while (!g_worker_stack.compare_exchange_weak(head, n,
            CT_MB_RLX));
    }

    // Remove a user item from the stack; returns nullptr when it is empty.
    // Must run inside acquire()/release(): head->m_next is read from a node
    // another thread may pop and collect concurrently, and only the region
    // stops the reapers from freeing that node underneath us.
    unode* worker_pop_try()
    {
        unode* head = g_worker_stack.load(CT_MB_RLX);
        unode* next = nullptr;
        do
        {
            if (!head) break;
            CT_MB(CT_MB_ACQ); // acquire
            next = head->m_next.load(CT_MB_RLX); // critical load! ;^o
        } while (!g_worker_stack.compare_exchange_weak(head, next,
            CT_MB_RLX));
        return head;
    }

    // A User Worker Example. The main point!
    void worker()
    {
        ptex_t::per_thread& self = g_ptex.get_ref();

        for (unsigned long i = 0; i < ITERS; ++i)
        {
            // Push a node tagged with our thread index.
            unode* n = new unode(rl::thread_index());
            worker_push(n);

            // Pop a node (not necessarily ours), retrying until one is found.
            unode* nx = nullptr;
            for (;;)
            {
                self.acquire(); // Per thread acquire
                nx = worker_pop_try(); // In critical region
                if (nx)
                {
                    unsigned long data = VAR(nx->m_data);
                    // Only workers push, and they tag nodes with their own
                    // thread index, which is always in [REAPERS, THREADS).
                    RL_ASSERT(data >= REAPERS && data < THREADS);
                }
                self.release(); // Per thread release
                if (nx) break;
                rl::yield(1, $);
            }

            // We are fin with nx; a reaper frees it after quiescence.
            g_ptex.collect(nx);
        }

        // Remove our worker. Release ordering publishes every collect()
        // above to the reaper that observes the counter reach zero.
        g_contrived_shutdown.fetch_sub(1, CT_MB_REL);
    }

    // Holds user items in a stack (single-threaded, owned by one reaper).
    // Copies are shallow: copying only duplicates the head/tail pointers.
    struct pending_reap
    {
        unode* m_head;
        unode* m_tail;

        pending_reap() : m_head(nullptr), m_tail(nullptr) {}

        // Prepend the already-linked chain head..tail.
        void push(unode* head, unode* tail)
        {
            if (!m_head)
            {
                m_head = head;
                m_tail = tail;
            }
            else
            {
                tail->m_next.store(m_head, CT_MB_RLX);
                m_head = head;
            }
        }

        // Delete every held node and reset to empty.
        void dump()
        {
            unode* n = m_head;
            m_head = m_tail = nullptr;
            while (n)
            {
                unode* next = n->m_next.load(CT_MB_RLX);
                delete n;
                n = next;
            }
        }
    };

    // Tracks a per_thread quiescence
    struct thread_quiesce_node
    {
        thread_quiesce_node* m_next;
        ptex_t::per_thread* m_thread;

        explicit thread_quiesce_node(ptex_t::per_thread* thread) :
            m_next(nullptr), m_thread(thread) {}
    };

    // Tracks a reaper's own view of quiescence. m_head holds threads not yet
    // seen outside their region in the current cycle; m_ready holds the
    // ones already seen. Owns all of its nodes, hence non-copyable (a copy
    // would delete every node twice).
    struct thread_quiesce
    {
        thread_quiesce_node* m_head;
        thread_quiesce_node* m_ready;
        ptex_t& m_ptex;

        explicit thread_quiesce(ptex_t& ptex) : m_head(nullptr),
            m_ready(nullptr), m_ptex(ptex)
        {
            for (std::size_t i = 0; i < THREADS; ++i)
            {
                thread_quiesce_node* n =
                    new thread_quiesce_node(&ptex.m_threads[i]);
                n->m_next = m_head;
                m_head = n;
            }
        }

        thread_quiesce(const thread_quiesce&) = delete;
        thread_quiesce& operator=(const thread_quiesce&) = delete;

        ~thread_quiesce()
        {
            thread_quiesce_node* n = m_head;
            // Dump the head
            while (n)
            {
                thread_quiesce_node* next = n->m_next;
                delete n;
                n = next;
            }
            // Dump ready... We are at dtor anyway.
            n = m_ready;
            while (n)
            {
                thread_quiesce_node* next = n->m_next;
                delete n;
                n = next;
            }
        }

        // Run a quiescence detection step. Threads whose m_quiesce can be
        // try_lock'ed are outside their region and move to m_ready; the
        // rest are retried on the next call. Returns true (and starts a new
        // cycle) once every thread has been seen outside its region at
        // least once since the previous true.
        bool quiesce()
        {
            thread_quiesce_node* not_ready = nullptr;
            thread_quiesce_node* n = m_head;
            while (n)
            {
                thread_quiesce_node* next = n->m_next;
                if (!n->m_thread->m_quiesce.try_lock($))
                {
                    // NOT ready for the cycle...
                    n->m_next = not_ready;
                    not_ready = n;
                }
                else
                {
                    // PERFECT! We can run full steam ahead. :^)
                    n->m_thread->m_quiesce.unlock($);
                    n->m_next = m_ready;
                    m_ready = n;
                }
                n = next;
            }
            m_head = not_ready;
            if (m_head) return false;
            // BINGO: TARGET ACQUIRED!
            m_head = m_ready;
            m_ready = nullptr;
            return true;
        }
    };

    // Move everything collected so far onto `reap`; returns the head of the
    // chain that was taken (nullptr if nothing had been collected).
    unode* reap_penders(pending_reap& reap)
    {
        unode* fhead = g_ptex.reap();
        if (fhead)
        {
            // Find tail
            unode* ftail = fhead;
            unode* next = ftail->m_next.load(CT_MB_RLX);
            while (next)
            {
                ftail = next;
                next = next->m_next.load(CT_MB_RLX);
            }
            reap.push(fhead, ftail);
        }
        return fhead;
    }

    // Actually deletes user items. Nodes reaped before one complete
    // quiescence cycle move from `fresh` to `old`, and are deleted after
    // the next complete cycle (the good ol' two step), so any region that
    // could still see them has ended by then.
    void reaper()
    {
        pending_reap fresh;
        pending_reap old;
        thread_quiesce tquiesce(g_ptex);

        // Acquire pairs with the workers' release fetch_sub: once we see 0,
        // every worker's collect() happens-before the final drain below.
        while (g_contrived_shutdown.load(CT_MB_ACQ) != 0)
        {
            reap_penders(fresh);
            if (tquiesce.quiesce())
            {
                pending_reap dumpster = old;
                old = fresh;
                fresh.m_head = fresh.m_tail = nullptr;
                dumpster.dump();
            }
            rl::yield(1, $);
        }

        // Clean up, for we are fin.
        reap_penders(fresh);
        g_ptex.quiesce_brute(); // wait for a quiescence point
        // Flush... Drain sound... ;^)
        fresh.dump();
        old.dump();
    }

    // Direct the work to the correct threads: the first REAPERS thread
    // indices reap, the remaining ones are workers.
    void thread(unsigned int tidx)
    {
        if (tidx < REAPERS)
        {
            reaper();
        }
        else
        {
            worker();
        }
    }
};
// Test away... Or fly? Humm...
int main()
{
{
rl::test_params p;
p.iteration_count = 5000000;
//p.execution_depth_limit = 33333;
// p.search_type = rl::sched_bound;
//p.search_type = rl::fair_full_search_scheduler_type;
//p.search_type = rl::fair_context_bound_scheduler_type;
rl::simulate<ct_multex_test>(p);
}
return 0;
}
_________________________
Any interest?