// Simple Per-Thread Mutex Based Proxy Collector
// For Academic and Experimental things...
// Beginner, Moderate?
// In Good Ol' Relacy! Nice as always.
//
// Scheme in one paragraph: readers enter a protected region by locking
// their OWN per-thread mutex and leave it by unlocking it. Nodes retired
// by workers are pushed onto a global lock-free stack; a reaper thread
// flushes that stack with an exchange, then detects a quiescence period
// by try_lock/unlock-ing every per-thread mutex. Nodes are only deleted
// after TWO full quiescence periods (the "two step" below).
//_______________________________________________

//#define RL_DEBUGBREAK_ON_ASSERT
//#define RL_MSVC_OUTPUT
//#define RL_FORCE_SEQ_CST
//#define RL_GC

#include <relacy/relacy_std.hpp>
#include <vector>
#include <algorithm>
#include <iostream>

// Simple macro based redirection of the verbose std membars.
#define CT_MB_ACQ std::memory_order_acquire
#define CT_MB_REL std::memory_order_release
#define CT_MB_RLX std::memory_order_relaxed
#define CT_MB_ACQ_REL std::memory_order_acq_rel
#define CT_MB(mp_0) std::atomic_thread_fence(mp_0)

// Some global vars directing the show...
#define WORKERS 4
#define REAPERS 2 // set it to one for now...
#define ITERS 5
#define THREADS (WORKERS + REAPERS)

// A User Node: element of both the global user stack and the intrusive
// reap lists. m_next is atomic because it is read while other threads
// may concurrently pop/push the node.
struct unode
{
    std::atomic<unode*> m_next;
    VAR_T(unsigned long) m_data; // Relacy-checked plain data (origin thread index)

    // NOTE(review): init list order (m_data, m_next) differs from the
    // declaration order; members are still initialized in declaration
    // order, which is harmless here since neither depends on the other.
    unode(unsigned long data, unode* next = nullptr)
        : m_data(data), m_next(next) {}
};

// The global ptex ("per-thread mutex") state.
template<std::size_t T_threads>
struct ptex
{
    // Per-thread state: one mutex per thread is the heart of the scheme.
    struct per_thread
    {
        std::mutex m_quiesce; // essential for this scheme
        VAR_T(unsigned long) m_ver_local; // for future use
        std::atomic<unsigned long> m_ver_reap; // for future use

        per_thread() : m_ver_local(0), m_ver_reap(0)
        {
            //std::cout << "ptex::per_thread::per_thread()\n";
        }

        ~per_thread() throw()
        {
            //std::cout << "ptex::per_thread::~per_thread()\n";
        }

        // A thread enters a region: take our own lock. While it is held,
        // a reaper's try_lock on this mutex fails, so the reaper cannot
        // declare a quiescence period that overlaps our critical region.
        // The version counters are bookkeeping only ("for future use").
        void acquire()
        {
            unsigned long nver = VAR(m_ver_local)++;
            m_quiesce.lock($);
            m_ver_reap.store(nver, CT_MB_RLX);
            CT_MB(CT_MB_ACQ);
        }

        // A thread exits a region: release fence, publish the bumped
        // version, then drop our lock so reapers can observe quiescence.
        void release()
        {
            CT_MB(CT_MB_REL);
            unsigned long ver = VAR(m_ver_local);
            m_ver_reap.store(ver + 1, CT_MB_RLX);
            m_quiesce.unlock($); // Per-thread release
            VAR(m_ver_local) = ver + 1;
        }

        // Blocking quiescence wait for this one thread: once we can
        // lock/unlock its mutex, any region it was in has ended.
        void quiesce_brute()
        {
            // Ensure quiescence
            m_quiesce.lock($);
            m_quiesce.unlock($);
        }
    };

    per_thread m_threads[T_threads];     // one slot per simulated thread
    std::atomic<unode*> m_collected;     // lock-free stack of retired nodes

    ptex() : m_collected(nullptr)
    {
        //std::cout << "ptex::ptex()\n";
    }

    // By destruction time every retired node must have been reaped.
    ~ptex() throw()
    {
        RL_ASSERT(!m_collected.load(CT_MB_RLX));
        //std::cout << "ptex::~ptex()\n";
    }

    // Gain a ref to our thread's per-thread struct (indexed by the
    // Relacy-assigned thread index).
    per_thread& get_ref() throw()
    {
        return m_threads[rl::thread_index()];
    }

    // delete everything... (walk an intrusive list, freeing each node)
    void dump(unode* n)
    {
        while (n)
        {
            unode* next = n->m_next.load(CT_MB_RLX);
            delete n;
            n = next;
        }
    }

    // We are finished with the node: defer it by pushing onto the global
    // collected stack (classic CAS push-loop with a release fence so the
    // reaper's acquire pairs with our link store).
    void collect(unode* n) throw()
    {
        unode* head = m_collected.load(CT_MB_RLX);
        do
        {
            n->m_next.store(head, CT_MB_RLX);
            CT_MB(CT_MB_REL); // release
        } while (!m_collected.compare_exchange_weak(
            head, n, CT_MB_RLX));
    }

    // Flush the stack: grab the whole retired list in one exchange (XCHG).
    unode* reap() throw()
    {
        unode* head = m_collected.exchange(nullptr, CT_MB_RLX);
        if (head) CT_MB(CT_MB_ACQ); // acquire — pairs with collect()'s release
        return head;
    }

    // Wait for a quiescence period: blockingly lock/unlock EVERY thread's
    // mutex. Used only on the shutdown path; the steady-state detector
    // (thread_quiesce below) uses try_lock instead.
    void quiesce_brute() throw()
    {
        for (std::size_t i = 0; i < T_threads; ++i)
        {
            per_thread& pt = m_threads[i];
            //pt.quiesce();
            pt.quiesce_brute();
        }
    }
};

// Relacy Multex Test...
struct ct_multex_test : rl::test_suite<ct_multex_test, THREADS>
{
    typedef ptex<THREADS> ptex_t;

    ptex_t g_ptex; // Global ptex
    std::atomic<unode*> g_worker_stack; // Global User Stack
    // Just for shutdown.
    // NOTE(review): this is a plain (non-atomic, non-VAR_T) counter that
    // several worker threads decrement concurrently and reapers poll —
    // a data race in real code, and invisible to Relacy because it is
    // not instrumented. Consider std::atomic<unsigned long>.
    unsigned long g_contrived_shutdown;

    // Per-iteration setup (runs before the threads are started).
    void before()
    {
        g_worker_stack.store(nullptr, CT_MB_RLX);
        g_contrived_shutdown = WORKERS;
    }

    void after() { }

    // Add a user item to the stack (CAS push, release fence publishes
    // the node's link before it becomes reachable).
    void worker_push(unode* n)
    {
        unode* head = g_worker_stack.load(CT_MB_RLX);
        do
        {
            n->m_next.store(head, CT_MB_RLX);
            CT_MB(CT_MB_REL); // release
        } while (!g_worker_stack.compare_exchange_weak(head, n, CT_MB_RLX));
    }

    // Remove a user item from the stack, or return nullptr if empty.
    // Safe ONLY inside a per_thread acquire()/release() region: the proxy
    // guarantees `head` is not deleted under us while we dereference it.
    unode* worker_pop_try()
    {
        unode* head = g_worker_stack.load(CT_MB_RLX);
        unode* next = nullptr;
        do
        {
            if (!head) break;
            CT_MB(CT_MB_ACQ); // acquire
            next = head->m_next.load(CT_MB_RLX); // critical load! ;^o
        } while (!g_worker_stack.compare_exchange_weak(head, next, CT_MB_RLX));
        return head;
    }

    // A User Worker Example. The main point!
    // Each worker pushes a node, pops one inside a protected region,
    // validates its payload, then retires it via the proxy.
    void worker()
    {
        ptex_t::per_thread& self = g_ptex.get_ref();
        //std::cout << "worker[" << rl::thread_index() << "]\n";
        // perform some work
        for (unsigned long i = 0; i < ITERS; ++i)
        {
            // Push a node
            unode* n = new unode(rl::thread_index());
            worker_push(n);
            // Pop a node
            unode* nx = nullptr;
            for (;;)
            {
                self.acquire(); // Per-thread acquire (enter protected region)
                nx = worker_pop_try(); // In critical region
                if (nx)
                {
                    // If the proxy is broken, this reads freed memory and
                    // Relacy flags it; the assert checks payload sanity.
                    unsigned long data = VAR(nx->m_data);
                    //std::cout << data << "\n";
                    RL_ASSERT(data < THREADS);
                }
                self.release(); // Per-thread release (exit protected region)
                if (nx) break;
                rl::yield(1, $);
            }
            // Collect a node
            g_ptex.collect(nx); // We are fin with nx
            // Quiesce a couple of times per iteration.
            if ((i % 2) == 0)
            {
                //self.quiesce(); // without brute force
            }
        }
        // remove our worker (see the race note on g_contrived_shutdown)
        --g_contrived_shutdown;
    }

    // Holds user items in an intrusive stack (head/tail so a whole batch
    // can be prepended in O(1)).
    struct pending_reap
    {
        unode* m_head;
        unode* m_tail;

        pending_reap() : m_head(nullptr), m_tail(nullptr) {}

        // push items: splice the [head, tail] batch in front of our list
        void push(unode* head, unode* tail)
        {
            if (!m_head)
            {
                m_head = head;
                m_tail = tail;
            }
            else
            {
                tail->m_next.store(m_head, CT_MB_RLX);
                m_head = head;
            }
        }

        // dump all: delete every held node and reset to empty
        void dump()
        {
            unode* n = m_head;
            m_head = m_tail = nullptr;
            while (n)
            {
                unode* next = n->m_next.load(CT_MB_RLX);
                delete n;
                n = next;
            }
        }
    };

    // Tracks one per_thread slot awaiting quiescence (singly-linked).
    struct thread_quiesce_node
    {
        thread_quiesce_node* m_next;
        ptex_t::per_thread* m_thread;

        thread_quiesce_node(ptex_t::per_thread* thread)
            : m_next(nullptr), m_thread(thread) {}
    };

    // Tracks a reaper's own view of quiescence: m_head holds threads not
    // yet observed quiescent this cycle, m_ready holds those already seen.
    struct thread_quiesce
    {
        thread_quiesce_node* m_head;
        thread_quiesce_node* m_ready;
        ptex_t& m_ptex;

        // Build one tracking node per simulated thread.
        thread_quiesce(ptex_t& ptex)
            : m_head(nullptr), m_ready(nullptr), m_ptex(ptex)
        {
            for (std::size_t i = 0; i < THREADS; ++i)
            {
                thread_quiesce_node* n = new thread_quiesce_node(&ptex.m_threads[i]);
                n->m_next = m_head;
                m_head = n;
            }
        }

        ~thread_quiesce()
        {
            thread_quiesce_node* n = m_head;
            // Dump the head list
            while (n)
            {
                thread_quiesce_node* next = n->m_next;
                delete n;
                n = next;
            }
            // Dump ready... We are at dtor anyway.
            n = m_ready;
            while (n)
            {
                thread_quiesce_node* next = n->m_next;
                delete n;
                n = next;
            }
        }

        // Run a quiescence detection cycle. Non-blocking: try_lock each
        // pending thread's mutex; success means that thread is currently
        // outside its region. Returns true only once EVERY thread has
        // been observed quiescent since the cycle began, then resets.
        bool quiesce()
        {
            // Split sort...
            thread_quiesce_node* not_ready = nullptr;
            thread_quiesce_node* n = m_head;
            // Iterate...
            while (n)
            {
                thread_quiesce_node* next = n->m_next;
                if (!n->m_thread->m_quiesce.try_lock($))
                {
                    // NOT ready for the cycle... keep it pending
                    n->m_next = not_ready;
                    not_ready = n;
                }
                else
                {
                    // PERFECT! We can run full steam ahead. :^)
                    n->m_thread->m_quiesce.unlock($);
                    n->m_next = m_ready;
                    m_ready = n;
                }
                n = next;
            }
            m_head = not_ready;
            if (m_head) return false;
            // BINGO: TARGET ACQUIRED! All threads seen quiescent; rearm.
            m_head = m_ready;
            m_ready = nullptr;
            return true;
        }
    };

    // Flush the global collected stack and splice the batch onto `reap`.
    // Returns the flushed head (nullptr if nothing was pending).
    unode* reap_penders(pending_reap& reap)
    {
        unode* fhead = g_ptex.reap();
        if (fhead)
        {
            // Find tail
            unode* ftail = fhead;
            unode* next = ftail->m_next.load(CT_MB_RLX);
            while (next)
            {
                ftail = next;
                next = next->m_next.load(CT_MB_RLX);
            }
            // Push onto fresh queue
            reap.push(fhead, ftail);
        }
        return fhead;
    }

    // Actually deletes user items. Nodes survive TWO detected quiescence
    // periods (fresh -> old -> dumpster) before delete, guaranteeing no
    // reader still holds a reference.
    void reaper()
    {
        // NOTE(review): `self`, `qcount` and `fhead` below are unused in
        // this function; kept as written.
        ptex_t::per_thread& self = g_ptex.get_ref();
        //std::cout << "reaper[" << rl::thread_index() << "]\n";
        pending_reap fresh;
        pending_reap old;
        thread_quiesce tquiesce(g_ptex);
        unsigned long qcount = 0;
        // Reap Loop
        {
            unode* fhead = nullptr;
            while (g_contrived_shutdown != 0)
            {
                reap_penders(fresh);
                if (tquiesce.quiesce())
                {
                    // The good ol' two step... age fresh into old, free old
                    pending_reap dumpster = old;
                    old = fresh;
                    fresh.m_head = fresh.m_tail = nullptr;
                    dumpster.dump();
                }
                rl::yield(1, $);
            }
        }
        // Clean up, for we are fin.
        reap_penders(fresh);
        g_ptex.quiesce_brute(); // wait for a quiescence point
        // Flush... Drain sound... ;^)
        fresh.dump();
        old.dump();
    }

    // Direct the work to the correct threads... ;^)
    // Relacy calls this once per simulated thread: the first REAPERS
    // indices become reapers, the rest become workers.
    void thread(unsigned int tidx)
    {
        if (tidx < REAPERS)
        {
            reaper();
        }
        else
        {
            worker();
        }
    }
};
int main() { { rl::test_params p; p.iteration_count = 5000000; //p.execution_depth_limit = 33333; // p.search_type = rl::sched_bound; //p.search_type = rl::fair_full_search_scheduler_type; //p.search_type = rl::fair_context_bound_scheduler_type; rl::simulate<ct_multex_test>(p); } return 0; }
___________________________
Can anybody run it with Relacy? If not, I need to learn why: what problems were encountered?
Thanks.
If interested, I can give more details. For now, here is the code in the form of a Relacy Test Unit:
[...]
___________________________

Can anybody run it with Relacy? If not, I need to learn why: what problems were encountered?
On Wed, Dec 19, 2018 at 7:05 AM Chris M. Thomasson <cri...@charter.net> wrote:
>
> On Monday, December 17, 2018 at 11:23:20 PM UTC-8, Chris M. Thomasson wrote:
>>
>> If interested, I can give more details. For now, here is the code in the form of a Relacy Test Unit:
Can you give a short executive summary? What are the
advantages/disadvantages/novelty?
>> https://pastebin.com/raw/FpnvTqvM
>> (raw link, no ads! :^)
>>
>> ___________________________
>>
>> [...]
>>
>> ___________________________
>>
>>
>>
>> Can anybody run it with Relacy? If not, I need to learn why: what problems were encountered?
Did you succeed with running it with Relacy? :)
So far, so good. It passes 10000000 iterations. I am letting rl::sched_bound run for around 65 minutes now, no problems at iteration:
99% (129564672/129564673)
99% (129630208/129630209)
99% (129695744/129695745)
99% (129761280/129761281)
99% (129826816/129826817)
99% (129892352/129892353)
99% (129957888/129957889)
99% (130023424/130023425)
[...]
Also, need to get the latest version of Relacy. I am using version: 2.3.
On Wednesday, December 19, 2018 at 2:02:31 AM UTC-8, Dmitry Vyukov wrote:

On Wed, Dec 19, 2018 at 7:05 AM Chris M. Thomasson <cri...@charter.net> wrote:
>
> On Monday, December 17, 2018 at 11:23:20 PM UTC-8, Chris M. Thomasson wrote:
>>
>> If interested, I can give more details. For now, here is the code in the form of a Relacy Test Unit:
Can you give a short executive summary? What are the
advantages/disadvantages/novelty?Very quick, sorry: Should have some more time tonight.A simple proxy with per-thread mutexes. Threads enter a protected region by taking their own lock, and leave it by releasing said lock. Very basic. When a thread wants to defer a node for deletion it pushes it onto a global lock-free stack via CAS. There is a reaper thread that flushes all of the nodes with XCHG, and keeps it on an internal list. The reaper thread loop then tries to acquire and release all of the per-thread locks. Once it has done this, it says quiescent state. It keeps nodes in a way that ensures at least two periods of this state have occurred before it actually calls delete and destroys memory. Since the reapers use a try_lock to detect a quiescent state, it can livelock in a reaper in the sense that it never gains a lock. However, Relacy should never detect the condition because of the limited iteration loop for workers wrt the test code itself. There is a work around. We can let a reaper fail for a little while before it actually ditches try_lock and just locks the per-thread quiescence lock. It would act just like an adaptive mutex, in a sense...
So far, so good. It passes 10000000 iterations. I am letting rl::sched_bound run for around 65 minutes now, no problems at iteration:
On Wednesday, December 19, 2018 at 3:17:18 PM UTC-8, Chris M. Thomasson wrote:On Wednesday, December 19, 2018 at 2:02:31 AM UTC-8, Dmitry Vyukov wrote:On Wed, Dec 19, 2018 at 7:05 AM Chris M. Thomasson <cri...@charter.net> wrote:
>
> On Monday, December 17, 2018 at 11:23:20 PM UTC-8, Chris M. Thomasson wrote:
>>
>> If interested, I can give more details. For now, here is the code in the form of a Relacy Test Unit:
Can you give a short executive summary? What are the
advantages/disadvantages/novelty?Very quick, sorry: Should have some more time tonight.A simple proxy with per-thread mutexes. Threads enter a protected region by taking their own lock, and leave it by releasing said lock. Very basic. When a thread wants to defer a node for deletion it pushes it onto a global lock-free stack via CAS. There is a reaper thread that flushes all of the nodes with XCHG, and keeps it on an internal list. The reaper thread loop then tries to acquire and release all of the per-thread locks. Once it has done this, it says quiescent state. It keeps nodes in a way that ensures at least two periods of this state have occurred before it actually calls delete and destroys memory. Since the reapers use a try_lock to detect a quiescent state, it can livelock in a reaper in the sense that it never gains a lock. However, Relacy should never detect the condition because of the limited iteration loop for workers wrt the test code itself. There is a work around. We can let a reaper fail for a little while before it actually ditches try_lock and just locks the per-thread quiescence lock. It would act just like an adaptive mutex, in a sense...[...]So far, so good. It passes 10000000 iterations. I am letting rl::sched_bound run for around 65 minutes now, no problems at iteration:
Fwiw, it's been running overnight using rl::sched_bound, I am at iteration:

99% (3506831360/3506831361)
99% (3506896896/3506896897)
99% (3506962432/3506962433)
99% (3507027968/3507027969)
99% (3507093504/3507093505)
99% (3507159040/3507159041)
99% (3507224576/3507224577)
99% (3507290112/3507290113)

No problems found so far at 3.5 billion iterations.
On Thu, Dec 20, 2018 at 12:17 AM Chris M. Thomasson <cri...@charter.net> wrote:
>
> On Wednesday, December 19, 2018 at 2:02:31 AM UTC-8, Dmitry Vyukov wrote:
>>
>> On Wed, Dec 19, 2018 at 7:05 AM Chris M. Thomasson <cri...@charter.net> wrote:
>> >
>> > On Monday, December 17, 2018 at 11:23:20 PM UTC-8, Chris M. Thomasson wrote:
>> >>
>> >> If interested, I can give more details. For now, here is the code in the form of a Relacy Test Unit:
>>
>> Can you give a short executive summary? What are the
>> advantages/disadvantages/novelty?
>
>
> Very quick, sorry: Should have some more time tonight.
>
> A simple proxy with per-thread mutexes. Threads enter a protected region by taking their own lock, and leave it by releasing said lock. Very basic. When a thread wants to defer a node for deletion it pushes it onto a global lock-free stack via CAS. There is a reaper thread that flushes all of the nodes with XCHG, and keeps it on an internal list. The reaper thread loop then tries to acquire and release all of the per-thread locks. Once it has done this, it says quiescent state. It keeps nodes in a way that ensures at least two periods of this state have occurred before it actually calls delete and destroys memory. Since the reapers use a try_lock to detect a quiescent state, it can livelock in a reaper in the sense that it never gains a lock. However, Relacy should never detect the condition because of the limited iteration loop for workers wrt the test code itself. There is a work around. We can let a reaper fail for a little while before it actually ditches try_lock and just locks the per-thread quiescence lock. It would act just like an adaptive mutex, in a sense...
So the main advantage is simplicity and ability for mere mortals to
understand how it works :)
Should be especially advantageous in teaching since it does not
involve atomic operations.
[...]