On 1/27/2021 9:23 PM, Chris M. Thomasson wrote:
[...]
I ported one of my proxy collectors from Relacy to C++17, however the
impl code does not create any threads yet. This is a basic mock up.
https://pastebin.com/raw/p1E9WN5i
I will be showing how to use this code to create a poor man's RCU that
can beat read-write locks.
Version -1, I am working on version 1 that will simulate a RCU like
scenario. Think of a dynamic lock-free stack, that requires no DWCAS,
and can allow reader threads to iterate through it as writer threads are
mutating it. A writer can defer a node for destruction, and when it
finally gets deleted, well, we are sure that no other thread is
accessing it. Just like RCU.
Code — can you get it to compile on your end?
I am currently making sure everything is aligned and padded, and adding
in threads with a reader writer test.
_____________________________
#include <iostream>
#include <atomic>
#include <thread>
#include <cstdlib>
#include <cstdint>
// Debug/sanity counters tracking node constructions vs. destructions so
// main() can detect leaks. TODO: compile these out via a macro in
// release builds.
static std::atomic<unsigned long> g_debug_node_allocations(0);
static std::atomic<unsigned long> g_debug_node_deallocations(0);

// A node that can live in the lock-free stack (via m_next) and, once
// retired, in a collector's deferred-destruction list (via m_defer_next).
// NOTE(review): still needs alignment/padding (author's stated to-do) to
// avoid false sharing between adjacent nodes.
struct ct_node
{
    std::atomic<ct_node*> m_next{nullptr}; // intrusive stack link
    ct_node* m_defer_next{nullptr};        // deferred-destruction list link

    ct_node()
    {
        g_debug_node_allocations.fetch_add(1, std::memory_order_relaxed);
    }

    ~ct_node()
    {
        g_debug_node_deallocations.fetch_add(1, std::memory_order_relaxed);
    }
};
// A proxy garbage collector ("poor man's RCU"). Readers acquire() a
// collector, traverse shared structures, then release(). Writers hand
// retired nodes to collect(); a node is deleted only after every reader
// that could have seen it has released its collector.
//
// Reference-count encoding (as used below — reconstructed from the
// bit masks; confirm against the original Relacy version):
//   bits 0..3  : index of the current collector (only 0 or 1 is used),
//   bit  4     : "odd" flag marking a collector mid-quiescence (0x10),
//   bits 5..31 : outstanding references, in units of 0x20.
template<std::size_t T_defer_limit>
class ct_proxy
{
    // Free an entire deferred list linked through m_defer_next.
    static void prv_destroy(ct_node* n)
    {
        while (n)
        {
            ct_node* next = n->m_defer_next;
            delete n;
            n = next;
        }
    }

public:
    // One epoch's worth of deferred nodes plus its reference count.
    class collector
    {
        friend class ct_proxy;

    private:
        std::atomic<ct_node*> m_defer;           // head of this epoch's deferred list
        std::atomic<unsigned int> m_defer_count; // nodes deferred this epoch
        std::atomic<unsigned int> m_count;       // encoded reference count (see class comment)

    public:
        collector()
            : m_defer(nullptr),
              m_defer_count(0),
              m_count(0)
        {
        }

        ~collector()
        {
            // Anything still deferred here was never promoted to m_defer;
            // safe to destroy during single-threaded teardown.
            prv_destroy(m_defer.load(std::memory_order_relaxed));
        }
    };

private:
    std::atomic<unsigned int> m_current; // low bits: collector index; high bits: refs
    std::atomic<bool> m_quiesce;         // quiescence-in-progress "lock"
    ct_node* m_defer;                    // back link: nodes awaiting one more quiescent period
    collector m_collectors[2];

public:
    ct_proxy()
        : m_current(0),
          m_quiesce(false),
          m_defer(nullptr)
    {
    }

    ~ct_proxy()
    {
        // Single-threaded teardown: whatever is still back-linked dies here.
        prv_destroy(m_defer);
    }

private:
    void prv_quiesce_begin()
    {
        // Try to begin the quiescence process; only one thread may run it.
        if (!m_quiesce.exchange(true, std::memory_order_acquire))
        {
            // advance the current collector and grab the old one.
            unsigned int old =
                m_current.load(std::memory_order_relaxed) & 0xFU;
            old = m_current.exchange((old + 1) & 1,
                std::memory_order_acq_rel);
            collector& c = m_collectors[old & 0xFU];

            // decode the outstanding reference count (signed on purpose so
            // that -refs below is well defined before the unsigned cast).
            long refs = old & 0xFFFFFFF0U;

            // verify reference count and previous collector index.
            // RL_ASSERT(!(refs & 0x10U) && (old & 0xFU) == (&c - m_collectors));

            // transfer the outstanding refs onto the collector and set the
            // odd (0x10) flag; if the collector's own count already cancels
            // them out, it is quiescent right now.
            if (c.m_count.fetch_add(refs + 0x10U,
                    std::memory_order_release) == (unsigned int)-refs)
            {
                // odd reference count and drop-to-zero condition detected!
                prv_quiesce_complete(c);
            }
        }
    }

    void prv_quiesce_complete(collector& c)
    {
        // the collector `c' is now in a quiescent state! :^)
        std::atomic_thread_fence(std::memory_order_acquire);

        // maintain the back link and obtain "fresh" objects from
        // this collection: nodes deferred a full period ago (n) can die,
        // this period's nodes move to m_defer to wait one more period.
        ct_node* n = m_defer;
        m_defer = c.m_defer.load(std::memory_order_relaxed);
        c.m_defer.store(nullptr, std::memory_order_relaxed);

        // reset the reference count.
        // RL_ASSERT(c.m_count.load(std::memory_order_relaxed) == 0x10U);
        c.m_count.store(0, std::memory_order_relaxed);
        c.m_defer_count.store(0, std::memory_order_relaxed);

        // release the quiesce lock.
        m_quiesce.store(false, std::memory_order_release);

        // destroy nodes.
        prv_destroy(n);
    }

public:
    // Enter a read-side critical section; pair with release().
    collector& acquire()
    {
        // increment the master count _and_ obtain current collector.
        unsigned int current =
            m_current.fetch_add(0x20U, std::memory_order_acquire);

        // decode the collector index.
        return m_collectors[current & 0xFU];
    }

    // Leave a read-side critical section entered via acquire()/sync().
    void release(collector& c)
    {
        // decrement the collector.
        unsigned int count =
            c.m_count.fetch_sub(0x20U, std::memory_order_release);

        // 0x30 == odd flag (0x10) + exactly one reference (0x20): we were
        // the last reader of a collector that is mid-quiescence.
        if ((count & 0xFFFFFFF0U) == 0x30U)
        {
            // odd reference count and drop-to-zero condition detected!
            prv_quiesce_complete(c);
        }
    }

    // Re-validate a held collector; swaps to the current one if `c' is
    // already mid-quiescence. Returns the collector to keep using.
    collector& sync(collector& c)
    {
        // check if `c' is in the middle of a quiescence process.
        if (c.m_count.load(std::memory_order_relaxed) & 0x10U)
        {
            // drop `c' and get the next collector.
            release(c);
            return acquire();
        }
        return c;
    }

    // Manually kick off a quiescence cycle.
    void collect()
    {
        prv_quiesce_begin();
    }

    // Defer `n' for destruction once all readers of `c' are done.
    void collect(collector& c, ct_node* n)
    {
        if (!n) return;

        // link node into the defer list.
        ct_node* prev = c.m_defer.exchange(n, std::memory_order_relaxed);
        n->m_defer_next = prev;

        // bump the defer count and begin quiescence process if over
        // the limit.
        unsigned int count =
            c.m_defer_count.fetch_add(1, std::memory_order_relaxed) + 1;

        if (count >= (T_defer_limit / 2))
        {
            prv_quiesce_begin();
        }
    }
};
// your basic lock-free stack...
// well, minus ABA counter and DWCAS of course! ;^)
// Treiber-style lock-free stack of ct_node (intrusive via m_next).
// No ABA counter/DWCAS: node reclamation is expected to be guarded by
// the proxy collector above.
class ct_stack
{
    std::atomic<ct_node*> m_head;

public:
    ct_stack()
        : m_head(nullptr)
    {
    }

public:
    // Push `n' onto the stack (CAS loop; release publishes n's contents).
    void push(ct_node* n)
    {
        ct_node* head = m_head.load(std::memory_order_relaxed);
        do
        {
            n->m_next.store(head, std::memory_order_relaxed);
        }
        while (!m_head.compare_exchange_weak(
            head,
            n,
            std::memory_order_release));
    }

    // Detach the entire stack in one shot; caller owns the returned chain.
    ct_node* flush()
    {
        return m_head.exchange(nullptr, std::memory_order_acquire);
    }

    // Observe the current top without removing it.
    ct_node* get_head()
    {
        return m_head.load(std::memory_order_acquire);
    }

    // Pop and return the top node, or nullptr when empty.
    // NOTE(review): reading head->m_next is only safe if a concurrently
    // popped node cannot be freed underneath us — i.e. callers must hold
    // a proxy collector reference; confirm in the threaded test.
    ct_node* pop()
    {
        ct_node* head = m_head.load(std::memory_order_acquire);
        ct_node* xchg;
        do
        {
            if (!head) return nullptr;
            xchg = head->m_next.load(std::memory_order_relaxed);
        }
        while (!m_head.compare_exchange_weak(
            head,
            xchg,
            std::memory_order_acquire));
        return head;
    }
};
int main()
{
std::cout << "Hello World!\n\n";
std::cout << "Chris M. Thom:assons Proxy Collector Port ver -1...
lol ;^D\n\n";
{
typedef ct_proxy<5> proxy;
proxy gc;
gc.collect();
{
proxy::collector& c = gc.acquire();
gc.collect(c, new ct_node);
gc.collect(c, new ct_node);
gc.collect();
gc.collect(c, new ct_node);
gc.collect(c, new ct_node);
gc.collect(c, new ct_node);
gc.release(c);
}
{
proxy::collector& c = gc.acquire();
gc.release(c);
}
{
proxy::collector& c = gc.acquire();
gc.collect(c, new ct_node);
gc.collect(c, new ct_node);
gc.collect();
gc.collect(c, new ct_node);
gc.collect();
gc.collect(c, new ct_node);
gc.collect(c, new ct_node);
gc.collect(c, new ct_node);
gc.release(c);
}
gc.collect();
gc.collect();
}
{
unsigned long node_allocations =
g_debug_node_allocations.load(std::memory_order_relaxed);
unsigned long node_deallocations =
g_debug_node_deallocations.load(std::memory_order_relaxed);
std::cout << "node_allocations = " << node_allocations << "\n";
std::cout << "node_deallocations = " << node_deallocations << "\n";
if (node_allocations != node_deallocations)
{
std::cout << "OH SHIT! NODE LEAK!!! SHIT! = " <<
node_allocations - node_deallocations << "\n\n";
}
}
return 0;
}
_____________________________