
simple proxy gc scheme...


Chris M. Thomasson

Dec 18, 2018, 2:42:04 AM
This allows a thread to get around the ABA problem, in a poor man's RCU
style. The C++ code uses the Relacy Race Detector; it eases my mind to
run it as a unit test before I code it in pure C++11.

It uses simple per-thread locks and a little quiescence detection
scheme to keep things kosher.

https://groups.google.com/forum/#!topic/lock-free/ryIAaCod7OQ

https://pastebin.com/raw/FpnvTqvM
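
Roughly, the reader side and the deferral boil down to the following
shape in plain C++11. This is just a minimal sketch with illustrative
names (node, per_thread::region, defer); the full Relacy test is below:

_______________
#include <atomic>
#include <mutex>

struct node { std::atomic<node*> next{nullptr}; };

// One mutex per thread, held for the whole read-side region.
struct per_thread { std::mutex region; };

// A reader brackets its accesses with its own lock.
void reader(per_thread& self, std::atomic<node*>& shared)
{
    std::lock_guard<std::mutex> hold(self.region);
    node* n = shared.load(std::memory_order_acquire);
    // n is safe to dereference here: a reaper cannot reclaim it
    // until it has acquired and released self.region.
    (void)n;
}

// A retired node is deferred onto a global lock-free stack via CAS.
void defer(std::atomic<node*>& collected, node* n)
{
    node* head = collected.load(std::memory_order_relaxed);
    do {
        n->next.store(head, std::memory_order_relaxed);
    } while (!collected.compare_exchange_weak(
                 head, n, std::memory_order_release,
                 std::memory_order_relaxed));
}
_______________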

_________________________
// Simple Per-Thread Mutex Based Proxy Collector
// For Academic and Experimental things...
// Beginner, Moderate?
// In Good Ol' Relacy! Nice as always.
//_______________________________________________


//#define RL_DEBUGBREAK_ON_ASSERT
//#define RL_MSVC_OUTPUT
//#define RL_FORCE_SEQ_CST
//#define RL_GC


#include <relacy/relacy_std.hpp>
#include <vector>
#include <algorithm>
#include <iostream>


// Simple macro-based redirection of the verbose std membars.
#define CT_MB_ACQ std::memory_order_acquire
#define CT_MB_REL std::memory_order_release
#define CT_MB_RLX std::memory_order_relaxed
#define CT_MB_ACQ_REL std::memory_order_acq_rel
#define CT_MB(mp_0) std::atomic_thread_fence(mp_0)


// Some global vars directing the show...
#define WORKERS 4
#define REAPERS 2 // can be set to one for a single reaper...
#define ITERS 5
#define THREADS (WORKERS + REAPERS)


// A User Node
struct unode
{
    std::atomic<unode*> m_next;
    VAR_T(unsigned long) m_data;

    unode(unsigned long data, unode* next = nullptr)
        : m_next(next), m_data(data) {}
};


// The global ptex state
template<std::size_t T_threads>
struct ptex
{

    // Per-thread state
    struct per_thread
    {
        std::mutex m_quiesce; // essential for this scheme
        VAR_T(unsigned long) m_ver_local; // for future use
        std::atomic<unsigned long> m_ver_reap; // for future use

        per_thread() : m_ver_local(0), m_ver_reap(0)
        {
            //std::cout << "ptex::per_thread::per_thread()\n";
        }

        ~per_thread() throw()
        {
            //std::cout << "ptex::per_thread::~per_thread()\n";
        }

        // A thread enters a region by taking its own lock
        void acquire()
        {
            unsigned long nver = VAR(m_ver_local)++;
            m_quiesce.lock($);
            m_ver_reap.store(nver, CT_MB_RLX);
            CT_MB(CT_MB_ACQ);
        }

        // A thread exits a region by releasing its own lock
        void release()
        {
            CT_MB(CT_MB_REL);
            unsigned long ver = VAR(m_ver_local);
            m_ver_reap.store(ver + 1, CT_MB_RLX);
            m_quiesce.unlock($);
            VAR(m_ver_local) = ver + 1;
        }

        // Lock/unlock pair: once we gain the lock, the owning
        // thread cannot still be inside a region it entered
        // before our acquire. This ensures quiescence.
        void quiesce_brute()
        {
            m_quiesce.lock($);
            m_quiesce.unlock($);
        }
    };


    per_thread m_threads[T_threads];
    std::atomic<unode*> m_collected;


    ptex() : m_collected(nullptr)
    {
        //std::cout << "ptex::ptex()\n";
    }

    ~ptex() throw()
    {
        RL_ASSERT(!m_collected.load(CT_MB_RLX));
        //std::cout << "ptex::~ptex()\n";
    }

    // Gain a ref to our thread's per-thread struct
    per_thread& get_ref() throw()
    {
        return m_threads[rl::thread_index()];
    }

    // Delete everything...
    void dump(unode* n)
    {
        while (n)
        {
            unode* next = n->m_next.load(CT_MB_RLX);
            delete n;
            n = next;
        }
    }


    // We are finished with the node; push it onto the collected stack
    void collect(unode* n) throw()
    {
        unode* head = m_collected.load(CT_MB_RLX);

        do
        {
            n->m_next.store(head, CT_MB_RLX);
            CT_MB(CT_MB_REL); // release

        } while (!m_collected.compare_exchange_weak(head, n, CT_MB_RLX));
    }


    // Flush the collected stack
    unode* reap() throw()
    {
        unode* head = m_collected.exchange(nullptr, CT_MB_RLX);
        if (head) CT_MB(CT_MB_ACQ); // acquire
        return head;
    }


    // Wait for every thread to pass a quiescence point
    void quiesce_brute() throw()
    {
        for (std::size_t i = 0; i < T_threads; ++i)
        {
            per_thread& pt = m_threads[i];
            //pt.quiesce();
            pt.quiesce_brute();
        }
    }
};


// Relacy Multex Test...
struct ct_multex_test
    : rl::test_suite<ct_multex_test, THREADS>
{
    typedef ptex<THREADS> ptex_t;

    ptex_t g_ptex; // global ptex
    std::atomic<unode*> g_worker_stack; // global user stack
    unsigned long g_contrived_shutdown; // just for shutdown


    void before()
    {
        g_worker_stack.store(nullptr, CT_MB_RLX);
        g_contrived_shutdown = WORKERS;
    }

    void after()
    {
    }


    // Add a user item to the stack
    void worker_push(unode* n)
    {
        unode* head = g_worker_stack.load(CT_MB_RLX);

        do
        {
            n->m_next.store(head, CT_MB_RLX);
            CT_MB(CT_MB_REL); // release
        } while (!g_worker_stack.compare_exchange_weak(head, n, CT_MB_RLX));
    }

    // Remove a user item from the stack
    unode* worker_pop_try()
    {
        unode* head = g_worker_stack.load(CT_MB_RLX);
        unode* next = nullptr;

        do
        {
            if (!head) break;
            CT_MB(CT_MB_ACQ); // acquire
            next = head->m_next.load(CT_MB_RLX); // critical load! ;^o

        } while (!g_worker_stack.compare_exchange_weak(head, next, CT_MB_RLX));

        return head;
    }


    // A user worker example. The main point!
    void worker()
    {
        ptex_t::per_thread& self = g_ptex.get_ref();
        //std::cout << "worker[" << rl::thread_index() << "]\n";

        // Perform some work
        for (unsigned long i = 0; i < ITERS; ++i)
        {
            // Push a node
            unode* n = new unode(rl::thread_index());
            worker_push(n);

            // Pop a node
            unode* nx = nullptr;

            for (;;)
            {
                self.acquire(); // per-thread acquire

                nx = worker_pop_try(); // in the critical region

                if (nx)
                {
                    unsigned long data = VAR(nx->m_data);
                    //std::cout << data << "\n";
                    RL_ASSERT(data < THREADS);
                }

                self.release(); // per-thread release

                if (nx) break;

                rl::yield(1, $);
            }

            // Collect the node; we are fin with nx
            g_ptex.collect(nx);

            // Quiesce a couple of times per iteration.
            if ((i % 2) == 0)
            {
                //self.quiesce(); // without brute force
            }
        }

        // Remove our worker
        --g_contrived_shutdown;
    }



    // Holds user items in a stack
    struct pending_reap
    {
        unode* m_head;
        unode* m_tail;

        pending_reap() : m_head(nullptr), m_tail(nullptr) {}

        // Push a detached list [head, tail] onto the front
        void push(unode* head, unode* tail)
        {
            if (!m_head)
            {
                m_head = head;
                m_tail = tail;
            }
            else
            {
                tail->m_next.store(m_head, CT_MB_RLX);
                m_head = head;
            }
        }

        // Dump all
        void dump()
        {
            unode* n = m_head;
            m_head = m_tail = nullptr;

            while (n)
            {
                unode* next = n->m_next.load(CT_MB_RLX);
                delete n;
                n = next;
            }
        }
    };


    // Tracks a per_thread quiescence
    struct thread_quiesce_node
    {
        thread_quiesce_node* m_next;
        ptex_t::per_thread* m_thread;

        thread_quiesce_node(ptex_t::per_thread* thread)
            : m_next(nullptr), m_thread(thread) {}
    };


    // Tracks a reaper's view of quiescence
    struct thread_quiesce
    {
        thread_quiesce_node* m_head;
        thread_quiesce_node* m_ready;
        ptex_t& m_ptex;

        thread_quiesce(ptex_t& ptex)
            : m_head(nullptr), m_ready(nullptr), m_ptex(ptex)
        {
            for (std::size_t i = 0; i < THREADS; ++i)
            {
                thread_quiesce_node* n =
                    new thread_quiesce_node(&ptex.m_threads[i]);

                n->m_next = m_head;
                m_head = n;
            }
        }

        ~thread_quiesce()
        {
            thread_quiesce_node* n = m_head;

            // Dump the head list
            while (n)
            {
                thread_quiesce_node* next = n->m_next;
                delete n;
                n = next;
            }

            // Dump the ready list... We are at the dtor anyway.
            n = m_ready;

            while (n)
            {
                thread_quiesce_node* next = n->m_next;
                delete n;
                n = next;
            }
        }

        // Run a quiescence detection cycle
        bool quiesce()
        {
            // Split sort...
            thread_quiesce_node* not_ready = nullptr;
            thread_quiesce_node* n = m_head;

            // Iterate...
            while (n)
            {
                thread_quiesce_node* next = n->m_next;

                if (!n->m_thread->m_quiesce.try_lock($))
                {
                    // NOT ready for the cycle...
                    n->m_next = not_ready;
                    not_ready = n;
                }
                else
                {
                    // PERFECT! We can run full steam ahead. :^)
                    n->m_thread->m_quiesce.unlock($);
                    n->m_next = m_ready;
                    m_ready = n;
                }

                n = next;
            }

            m_head = not_ready;

            if (m_head) return false;

            // BINGO: every thread has passed a quiescence point.
            m_head = m_ready;
            m_ready = nullptr;

            return true;
        }
    };

    // Drain the global collected stack into a pending reap
    unode* reap_penders(pending_reap& reap)
    {
        unode* fhead = g_ptex.reap();

        if (fhead)
        {
            // Find the tail
            unode* ftail = fhead;
            unode* next = ftail->m_next.load(CT_MB_RLX);

            while (next)
            {
                ftail = next;
                next = next->m_next.load(CT_MB_RLX);
            }

            // Push onto the fresh queue
            reap.push(fhead, ftail);
        }

        return fhead;
    }


    // Actually deletes user items
    void reaper()
    {
        ptex_t::per_thread& self = g_ptex.get_ref();
        //std::cout << "reaper[" << rl::thread_index() << "]\n";

        pending_reap fresh;
        pending_reap old;
        thread_quiesce tquiesce(g_ptex);

        // Reap loop
        while (g_contrived_shutdown != 0)
        {
            reap_penders(fresh);

            if (tquiesce.quiesce())
            {
                // The good ol' two step: nodes are only destroyed
                // after surviving two full quiescence periods.
                pending_reap dumpster = old;
                old = fresh;
                fresh.m_head = fresh.m_tail = nullptr;

                dumpster.dump();
            }

            rl::yield(1, $);
        }

        // Clean up, for we are fin.
        reap_penders(fresh);

        g_ptex.quiesce_brute(); // wait for a quiescence point

        // Flush... Drain sound... ;^)
        fresh.dump();
        old.dump();
    }


    // Direct the work to the correct threads... ;^)
    void thread(unsigned int tidx)
    {
        if (tidx < REAPERS)
        {
            reaper();
        }
        else
        {
            worker();
        }
    }
};



// Test away... Or fly? Humm...
int main()
{
    {
        rl::test_params p;

        p.iteration_count = 5000000;
        //p.execution_depth_limit = 33333;
        //p.search_type = rl::sched_bound;
        //p.search_type = rl::fair_full_search_scheduler_type;
        //p.search_type = rl::fair_context_bound_scheduler_type;

        rl::simulate<ct_multex_test>(p);
    }

    return 0;
}

_________________________


Any interest?

Chris M. Thomasson

Dec 23, 2018, 3:49:10 PM
On 12/17/2018 11:41 PM, Chris M. Thomasson wrote:
> This allows a thread to get around the ABA problem, in a poor man's RCU
> style. The C++ code uses the Relacy Race Detector; it eases my mind to
> run it as a unit test before I code it in pure C++11.
>
> It uses simple per-thread locks and a little quiescence detection
> scheme to keep things kosher.
>
> https://groups.google.com/forum/#!topic/lock-free/ryIAaCod7OQ
>
> https://pastebin.com/raw/FpnvTqvM
[...]

A simple proxy with per-thread mutexes. Threads enter a protected region
by taking their own lock, and leave it by releasing said lock. Very
basic. When a thread wants to defer a node for deletion, it pushes the
node onto a global lock-free stack via CAS. A reaper thread flushes all
of the nodes with XCHG and keeps them on an internal list. The reaper
loop then tries to acquire and release all of the per-thread locks. Once
it has done this, it declares a quiescent state. It holds nodes back so
that at least two such quiescence periods have passed before it actually
calls delete and destroys memory. Since the reapers use try_lock to
detect a quiescent state, a reaper can livelock in the sense that it
never gains a lock. However, Relacy should never detect that condition
because of the limited iteration loop for workers in the test code
itself. There is a workaround: let a reaper fail for a little while
before it ditches try_lock and just blocks on the per-thread quiescence
lock. It would act just like an adaptive mutex, in a sense...

_______________
fresh = new generation
old = old generation

// reaper loop

fresh = gain new nodes

if (quiescent)
{
    dump = old;
    old = fresh;
    fresh = empty;

    dump.destroy(); // reclaim memory...
}
_______________
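
As for letting a reaper ditch try_lock after a while, a rough sketch of
the idea (hypothetical; not in the posted code): spin on try_lock for a
bounded number of attempts, then just block on the per-thread quiescence
lock, like an adaptive mutex:

_______________
#include <mutex>
#include <thread>

// Hypothetical adaptive quiescence wait. Try for a while, then
// fall back to a blocking lock so the reaper cannot livelock.
void quiesce_adaptive(std::mutex& m, unsigned spin_limit = 128)
{
    for (unsigned i = 0; i < spin_limit; ++i)
    {
        if (m.try_lock())
        {
            m.unlock(); // quiescence point seen without blocking
            return;
        }

        std::this_thread::yield();
    }

    // Ditch try_lock: wait for the owner to leave its region.
    m.lock();
    m.unlock();
}
_______________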


Fwiw, if interested, read some of the new responses in:

https://groups.google.com/forum/#!topic/lock-free/ryIAaCod7OQ