// Simple Per-Thread Mutex Based Proxy Collector
// For Academic and Experimental things...
// Beginner, Moderate?
// In Good Ol' Relacy! Nice as always.
//_______________________________________________
//#define RL_DEBUGBREAK_ON_ASSERT
//#define RL_MSVC_OUTPUT
//#define RL_FORCE_SEQ_CST
//#define RL_GC
#include <relacy/relacy_std.hpp>
#include <vector>
#include <algorithm>
#include <iostream>
// Simple macro-based redirection of the verbose std membars.
#define CT_MB_ACQ std::memory_order_acquire
#define CT_MB_REL std::memory_order_release
#define CT_MB_RLX std::memory_order_relaxed
#define CT_MB_ACQ_REL std::memory_order_acq_rel
#define CT_MB(mp_0) std::atomic_thread_fence(mp_0)
// Some global vars directing the show...
#define WORKERS 4
#define REAPERS 2 // can be set to one as well...
#define ITERS 5
#define THREADS (WORKERS + REAPERS)
// A User Node
struct unode
{
std::atomic<unode*> m_next;
VAR_T(unsigned long) m_data;
unode(unsigned long data, unode* next = nullptr) : m_next(next), m_data(data) {}
};
// The global ptex state
template<std::size_t T_threads>
struct ptex
{
// Per-thread state
struct per_thread
{
std::mutex m_quiesce; // essential for this scheme
VAR_T(unsigned long) m_ver_local; // for future use
std::atomic<unsigned long> m_ver_reap; // for future use
per_thread() : m_ver_local(0), m_ver_reap(0)
{
//std::cout << "ptex::per_thread::per_thread()\n";
}
~per_thread() throw()
{
//std::cout << "ptex::per_thread::~per_thread()\n";
}
// A thread enters a region
void acquire()
{
unsigned long nver = VAR(m_ver_local)++;
m_quiesce.lock($);
m_ver_reap.store(nver, CT_MB_RLX);
CT_MB(CT_MB_ACQ);
}
// A thread exits a region
void release()
{
CT_MB(CT_MB_REL);
unsigned long ver = VAR(m_ver_local);
m_ver_reap.store(ver + 1, CT_MB_RLX);
m_quiesce.unlock($);
VAR(m_ver_local) = ver + 1;
}
void quiesce_brute()
{
// Ensure quiescence
m_quiesce.lock($);
m_quiesce.unlock($);
}
};
per_thread m_threads[T_threads];
std::atomic<unode*> m_collected;
ptex() : m_collected(nullptr)
{
//std::cout << "ptex::ptex()\n";
}
~ptex() throw()
{
RL_ASSERT(!m_collected.load(CT_MB_RLX));
//std::cout << "ptex::~ptex()\n";
}
// Gain a ref to our thread's per-thread struct
per_thread& get_ref() throw()
{
return m_threads[rl::thread_index()];
}
// delete everything...
void dump(unode* n)
{
while (n)
{
unode* next = n->m_next.load(CT_MB_RLX);
delete n;
n = next;
}
}
// We are finished, collect the node...
void collect(unode* n) throw()
{
unode* head = m_collected.load(CT_MB_RLX);
do
{
n->m_next.store(head, CT_MB_RLX);
CT_MB(CT_MB_REL); // release
} while (!m_collected.compare_exchange_weak(
head, n, CT_MB_RLX));
}
// Flush the stack
unode* reap() throw()
{
unode* head = m_collected.exchange(nullptr, CT_MB_RLX);
if (head) CT_MB(CT_MB_ACQ); // acquire
return head;
}
// Wait for a quiesce
void quiesce_brute() throw()
{
for (std::size_t i = 0; i < T_threads; ++i)
{
per_thread& pt = m_threads[i];
//pt.quiesce();
pt.quiesce_brute();
}
}
};
// Relacy Multex Test...
struct ct_multex_test
: rl::test_suite<ct_multex_test, THREADS>
{
typedef ptex<THREADS> ptex_t;
ptex_t g_ptex; // Global ptex
std::atomic<unode*> g_worker_stack; // Global User Stack
std::atomic<unsigned long> g_contrived_shutdown; // Just for shutdown
void before()
{
g_worker_stack.store(nullptr, CT_MB_RLX);
g_contrived_shutdown.store(WORKERS, CT_MB_RLX);
}
void after()
{
}
// Add a user item to the stack
void worker_push(unode* n)
{
unode* head = g_worker_stack.load(CT_MB_RLX);
do
{
n->m_next.store(head, CT_MB_RLX);
CT_MB(CT_MB_REL); // release
} while (!g_worker_stack.compare_exchange_weak(head, n, CT_MB_RLX));
}
// Remove a user item from the stack
unode* worker_pop_try()
{
unode* head = g_worker_stack.load(CT_MB_RLX);
unode* next = nullptr;
do
{
if (!head) break;
CT_MB(CT_MB_ACQ); // acquire
next = head->m_next.load(CT_MB_RLX); // critical load! ;^o
} while (!g_worker_stack.compare_exchange_weak(head, next, CT_MB_RLX));
return head;
}
// A User Worker Example. The main point!
void worker()
{
ptex_t::per_thread& self = g_ptex.get_ref();
//std::cout << "worker[" << rl::thread_index() << "]\n";
// perform some work
for (unsigned long i = 0; i < ITERS; ++i)
{
// Push a node
unode* n = new unode(rl::thread_index());
worker_push(n);
// Pop a node
unode* nx = nullptr;
for (;;)
{
self.acquire(); // Per thread acquire
nx = worker_pop_try(); // In critical region
if (nx)
{
unsigned long data = VAR(nx->m_data);
//std::cout << data << "\n";
RL_ASSERT(data < THREADS);
}
self.release(); // Per thread release
if (nx) break;
rl::yield(1, $);
}
// Collect a node
g_ptex.collect(nx); // We are fin with nx
// Quiesce a couple of times per iteration.
if ((i % 2) == 0)
{
//self.quiesce(); // without brute force
}
}
// remove our worker
g_contrived_shutdown.fetch_sub(1, CT_MB_RLX);
}
// Holds user items in a stack
struct pending_reap
{
unode* m_head;
unode* m_tail;
pending_reap() : m_head(nullptr), m_tail(nullptr) {}
// push items
void push(unode* head, unode* tail)
{
if (!m_head)
{
m_head = head;
m_tail = tail;
}
else
{
tail->m_next.store(m_head, CT_MB_RLX);
m_head = head;
}
}
// dump all
void dump()
{
unode* n = m_head;
m_head = m_tail = nullptr;
while (n)
{
unode* next = n->m_next.load(CT_MB_RLX);
delete n;
n = next;
}
}
};
// Tracks a per_thread quiescence
struct thread_quiesce_node
{
thread_quiesce_node* m_next;
ptex_t::per_thread* m_thread;
thread_quiesce_node(ptex_t::per_thread* thread) :
m_next(nullptr), m_thread(thread) {}
};
// tracks a reapers control of its view of quiescence
struct thread_quiesce
{
thread_quiesce_node* m_head;
thread_quiesce_node* m_ready;
ptex_t& m_ptex;
thread_quiesce(ptex_t& ptex) : m_head(nullptr), m_ready(nullptr), m_ptex(ptex)
{
for (std::size_t i = 0; i < THREADS; ++i)
{
thread_quiesce_node* n =
new thread_quiesce_node(&ptex.m_threads[i]);
n->m_next = m_head;
m_head = n;
}
}
~thread_quiesce()
{
thread_quiesce_node* n = m_head;
// Dump the head
while (n)
{
thread_quiesce_node* next = n->m_next;
delete n;
n = next;
}
// Dump ready... We are at dtor anyway.
n = m_ready;
while (n)
{
thread_quiesce_node* next = n->m_next;
delete n;
n = next;
}
}
// Run a quiescence detection cycle
bool quiesce()
{
// Split sort...
thread_quiesce_node* not_ready = nullptr;
thread_quiesce_node* n = m_head;
// Iterate...
while (n)
{
thread_quiesce_node* next = n->m_next;
if (!n->m_thread->m_quiesce.try_lock($))
{
// NOT ready for the cycle...
n->m_next = not_ready;
not_ready = n;
}
else
{
// PERFECT! We can run full steam ahead. :^)
n->m_thread->m_quiesce.unlock($);
n->m_next = m_ready;
m_ready = n;
}
n = next;
}
m_head = not_ready;
if (m_head) return false;
// BINGO: TARGET ACQUIRED!
m_head = m_ready;
m_ready = nullptr;
return true;
}
};
// Add a reap to a reap...
unode* reap_penders(pending_reap& reap)
{
unode* fhead = g_ptex.reap();
if (fhead)
{
// Find tail
unode* ftail = fhead;
unode* next = ftail->m_next.load(CT_MB_RLX);
while (next)
{
ftail = next;
next = next->m_next.load(CT_MB_RLX);
}
// Push onto fresh queue
reap.push(fhead, ftail);
}
return fhead;
}
// Actually deletes user items
void reaper()
{
ptex_t::per_thread& self = g_ptex.get_ref();
//std::cout << "reaper[" << rl::thread_index() << "]\n";
pending_reap fresh;
pending_reap old;
thread_quiesce tquiesce(g_ptex);
unsigned long qcount = 0;
// Reap Loop
{
while (g_contrived_shutdown.load(CT_MB_RLX) != 0)
{
reap_penders(fresh);
if (tquiesce.quiesce())
{
// The good ol' two step: defer deletion until nodes have
// survived two full quiescence periods.
pending_reap dumpster = old;
old = fresh;
fresh.m_head = fresh.m_tail = nullptr;
dumpster.dump();
}
rl::yield(1, $);
}
}
// Clean up, for we are fin.
reap_penders(fresh);
g_ptex.quiesce_brute(); // wait for a quiescence point
// Flush... Drain sound... ;^)
fresh.dump();
old.dump();
}
// Direct the work to the correct threads... ;^)
void thread(unsigned int tidx)
{
if (tidx < REAPERS)
{
reaper();
}
else
{
worker();
}
}
};
// Test away... Or fly? Humm...
int main()
{
{
rl::test_params p;
p.iteration_count = 5000000;
//p.execution_depth_limit = 33333;
// p.search_type = rl::sched_bound;
//p.search_type = rl::fair_full_search_scheduler_type;
//p.search_type = rl::fair_context_bound_scheduler_type;
rl::simulate<ct_multex_test>(p);
}
return 0;
}
___________________________
Can anybody run it with Relacy? If not, I need to learn why: what problems were encountered?
Thanks.
___________________________
On Wed, Dec 19, 2018 at 7:05 AM Chris M. Thomasson <cri...@charter.net> wrote:
>
> On Monday, December 17, 2018 at 11:23:20 PM UTC-8, Chris M. Thomasson wrote:
>>
>> If interested, I can give more details. For now, here is the code in the form of a Relacy Test Unit:
Can you give a short executive summary? What are the
advantages/disadvantages/novelty?
>> https://pastebin.com/raw/FpnvTqvM
>> (raw link, no ads! :^)
>>
>> ___________________________
>>
>> [...]
>>
>> ___________________________
>>
>>
>>
>> Can anybody run it with Relacy? If not, I need to learn why: what problems were encountered?
Did you succeed with running it with Relacy? :)
So far, so good. It passes 10000000 iterations. I am letting rl::sched_bound run for around 65 minutes now, no problems at iteration:
99% (129564672/129564673)
99% (129630208/129630209)
99% (129695744/129695745)
99% (129761280/129761281)
99% (129826816/129826817)
99% (129892352/129892353)
99% (129957888/129957889)
99% (130023424/130023425)
[...]
Also, need to get the latest version of Relacy. I am using version: 2.3.
On Wednesday, December 19, 2018 at 2:02:31 AM UTC-8, Dmitry Vyukov wrote:
> On Wed, Dec 19, 2018 at 7:05 AM Chris M. Thomasson <cri...@charter.net> wrote:
> >
> > On Monday, December 17, 2018 at 11:23:20 PM UTC-8, Chris M. Thomasson wrote:
> >>
> >> If interested, I can give more details. For now, here is the code in the form of a Relacy Test Unit:
>
> Can you give a short executive summary? What are the
> advantages/disadvantages/novelty?

Very quick, sorry: should have some more time tonight.

A simple proxy with per-thread mutexes. Threads enter a protected region by taking their own lock, and leave it by releasing said lock. Very basic. When a thread wants to defer a node for deletion, it pushes the node onto a global lock-free stack via CAS. A reaper thread flushes all of the nodes with XCHG and keeps them on an internal list. The reaper loop then tries to acquire and release all of the per-thread locks; once it has done this, it declares a quiescent state. It holds nodes in a way that ensures at least two such periods have occurred before it actually calls delete and destroys memory. Since the reapers use try_lock to detect a quiescent state, a reaper can livelock in the sense that it never gains a lock. However, Relacy should never detect that condition because of the limited iteration loop for workers in the test code itself. There is a workaround: we can let a reaper fail for a little while before it ditches try_lock and simply blocks on the per-thread quiescence lock. It would act just like an adaptive mutex, in a sense...
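For reference, here is a minimal sketch of that adaptive workaround, in the same Relacy style as the test unit above. The quiesce_adaptive() name and the spin threshold are hypothetical, not part of the posted code; it would sit next to quiesce_brute() in ptex::per_thread:

// Hypothetical sketch: spin on try_lock for a bounded number of
// attempts, then ditch try_lock and block, just like quiesce_brute().
void quiesce_adaptive()
{
    for (unsigned long i = 0; i < 64; ++i) // arbitrary threshold
    {
        if (m_quiesce.try_lock($))
        {
            // Gained it: the owner is outside of its region.
            m_quiesce.unlock($);
            return;
        }
        rl::yield(1, $); // let the owner make some progress
    }
    // Failed for a while; fall back to a blocking wait, in the
    // spirit of an adaptive mutex.
    m_quiesce.lock($);
    m_quiesce.unlock($);
}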
On Wednesday, December 19, 2018 at 3:17:18 PM UTC-8, Chris M. Thomasson wrote:
> So far, so good. It passes 10000000 iterations. I am letting rl::sched_bound run for around 65 minutes now, no problems at iteration:
> [...]

Fwiw, it has been running overnight using rl::sched_bound. I am at iteration:

99% (3506831360/3506831361)
99% (3506896896/3506896897)
99% (3506962432/3506962433)
99% (3507027968/3507027969)
99% (3507093504/3507093505)
99% (3507159040/3507159041)
99% (3507224576/3507224577)
99% (3507290112/3507290113)

No problems found so far at 3.5 billion iterations.
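For reference, the rl::sched_bound run described above corresponds to enabling the commented-out scheduler line in main(); a minimal sketch using only the test_params fields already shown in the code:

rl::test_params p;
p.search_type = rl::sched_bound; // bounded systematic search, as in the run above
rl::simulate<ct_multex_test>(p);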
On Thu, Dec 20, 2018 at 12:17 AM Chris M. Thomasson <cri...@charter.net> wrote:
>
> On Wednesday, December 19, 2018 at 2:02:31 AM UTC-8, Dmitry Vyukov wrote:
>>
>> Can you give a short executive summary? What are the
>> advantages/disadvantages/novelty?
>
> Very quick, sorry: should have some more time tonight.
>
> A simple proxy with per-thread mutexes. Threads enter a protected region by taking their own lock, and leave it by releasing said lock.
> [...]
So the main advantage is simplicity and ability for mere mortals to
understand how it works :)
Should be especially advantageous in teaching since it does not
involve atomic operations.
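To make that point concrete, here is the reader-side pattern condensed from worker() in the test unit above; the proxy interface a user touches directly is just acquire()/release() around the region, plus collect() for deferral:

ptex_t::per_thread& self = g_ptex.get_ref(); // our own per-thread slot
self.acquire();              // enter the protected region (plain mutex lock)
unode* n = worker_pop_try(); // safe to dereference shared nodes in here
self.release();              // leave the region (plain mutex unlock)
if (n) g_ptex.collect(n);    // defer the node; a reaper deletes it later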
[...]