Fwiw, since I have been working on fractals so much, I was wondering if
creating a C++11 implementation of a proxy collector of mine would be as
easy as my earlier implementations were. For some more information please read here:
https://groups.google.com/d/topic/lock-free/X3fuuXknQF0/discussion
Think of user-space RCU
https://en.wikipedia.org/wiki/Read-copy-update
Here is my C++11 code, it should compile and run on 32-bit x86 with
support for DWCAS. Well, it should run on any other arch with support
for lock-free DWCAS. In other words, the following struct needs to be at
least lock-free atomic wrt std::atomic<T>:
_______________________
struct anchor
{
std::intptr_t count;
node* head;
};
_______________________
http://en.cppreference.com/w/cpp/atomic/atomic/is_lock_free
has to return true for std::atomic of this struct. The lock-free part is
the DWCAS in the acquire function.
Here is my C++11 code [1]:
https://pastebin.com/raw/KAt4nhCj
The core algorithm has been verified with Relacy Race Detector:
https://groups.google.com/d/topic/lock-free/QuLBH87z6B4/discussion
I am currently writing up how this can be useful, sorry about that.
Basically, it's like RCU where reader threads can read a data-structure
concurrently along with writer threads mutating it. Writer threads can
collect objects while reader threads are accessing them. Everything is
accounted for using differential counting.
I can get the code to compile and run on 32-bit x86.
Can anybody else get it to compile and/or run?
Thanks.
[1] code:
___________________________
/* Simple Proxy Collector (DWCAS) - C++11 Version
http://www.1024cores.net/home/relacy-race-detector
Copyright 3/27/2018
___________________________________________________*/
#if defined (_MSC_VER)
# define _ENABLE_ATOMIC_ALIGNMENT_FIX // for dwcas
#endif
#include <atomic>
#include <mutex>
#include <cstdio>
#include <cstdlib>
#include <cstdint>
#include <cassert>
// Global allocation balance: node/foo constructors increment it and
// their destructors decrement it; main() checks it returns to zero
// as a leak detector for the whole test run.
static std::atomic<unsigned int> g_allocs(0);
/* Utilities
___________________________________________________*/
// Thread-safe console output.
//
// msg: NUL-terminated text, written to stdout verbatim.
//
// Fixes vs. the original:
//  - The original forwarded msg to std::printf() as the FORMAT string,
//    which is undefined behavior whenever the text contains a '%'
//    conversion specifier. std::fputs() writes the bytes literally,
//    which is what every call site in this file expects.
//  - The mutex is now held via std::lock_guard (RAII), so it is
//    released on every exit path instead of by a manual unlock.
void ct_cout(
const char* msg
) {
    // Function-local static: lazily constructed, shared by all callers.
    static std::mutex g_cout_lock;
    std::lock_guard<std::mutex> lock(g_cout_lock);
    std::fputs(msg, stdout);
    std::fflush(stdout);
}
/* Member Fence Abstraction
___________________________________________________*/
// Short aliases for the std::memory_order constants and for
// std::atomic_thread_fence, used throughout the collector so the
// fence discipline reads compactly at each call site.
#define ct_mb_relaxed std::memory_order_relaxed
#define ct_mb_acquire std::memory_order_acquire
#define ct_mb_release std::memory_order_release
#define ct_mb_acq_rel std::memory_order_acq_rel
#define ct_mb_seq_cst std::memory_order_seq_cst
#define ct_mb_fence(mb_membar) std::atomic_thread_fence(mb_membar)
namespace ct {
namespace proxy {
// Base class for user objects handed to the collector.
// The virtual destructor guarantees that deleting a collected object
// through an object* (as gc::prv_dtor does) runs the derived class's
// destructor. `= default` (C++11) is the idiomatic spelling of the
// original empty body and is otherwise identical in behavior.
struct object
{
    virtual ~object() = default;
};
// Proxy node: one collector epoch. Carries a differential reference
// count, a link used for collect-order backlinks, and the deferred
// user object (may be null for the initial node).
struct node
{
    std::atomic<std::intptr_t> count;
    node* next;
    object* obj;

    // Copy operations route through relaxed atomic accesses, because
    // std::atomic is itself neither copyable nor assignable. Note
    // that they do not touch g_allocs: only the allocating ctor /
    // dtor pair below participates in the leak accounting.
    node(node const& other)
        : count(other.count.load(ct_mb_relaxed))
        , next(other.next)
        , obj(other.obj)
    {
    }

    node& operator =(node const& other)
    {
        count.store(other.count.load(ct_mb_relaxed));
        next = other.next;
        obj = other.obj;
        return *this;
    }

    // Allocating constructor: registers the node with the global
    // allocation balance.
    node(std::intptr_t count_, node* next_, object* obj_)
        : count(count_)
        , next(next_)
        , obj(obj_)
    {
        g_allocs.fetch_add(1, ct_mb_relaxed);
    }

    ~node()
    {
        g_allocs.fetch_sub(1, ct_mb_relaxed);
    }
};
// DWCAS target
// The double-word the collector CASes on: an outer reference count
// plus the current proxy node. Member order and total size matter:
// std::atomic<anchor> must be lock-free (double-width CAS) on the
// target architecture, as the surrounding text explains — do not
// reorder or add members.
struct anchor
{
std::intptr_t count;
node* head;
};
// Proxy Collector
// Epoch-style deferred reclamation (think user-space RCU) built on a
// double-word anchor {outer count, current node}. acquire() adds 2 to
// the outer count via DWCAS; prv_collect() swaps in a fresh node and
// folds the displaced outer count (plus 1, apparently a "collected"
// mark — TODO confirm) into the old node's inner count. Inner and
// outer contributions cancel (differential counting), so the node
// whose count drains first triggers reclamation of the backlinked
// chain. Core algorithm reportedly verified with Relacy (see post);
// statement/fence order below is load-bearing — do not reorder.
struct gc
{
// {outer reference count, current proxy node}. Must be lock-free.
std::atomic<anchor> head;
// Start with an empty epoch node carrying no user object.
gc() : head(anchor { 0, new node(0, nullptr, nullptr) }) {}
// Teardown is single-threaded: no outstanding readers (count >= 0)
// and no pending collected chain (next == nullptr) may remain.
~gc()
{
anchor cmp = head.load(ct_mb_relaxed);
assert(cmp.count > -1);
assert(cmp.head->next == nullptr);
prv_dtor(cmp.head);
}
// Release a reference
// References are counted in steps of 2. Returns true when the
// previous value was exactly 3, i.e. this drop took the last
// reference of a node that has already been collected (the +1 added
// in prv_collect() makes the count odd). Caller then reclaims.
bool prv_release(node* n)
{
std::intptr_t count = n->count.fetch_sub(2, ct_mb_relaxed);
if (count == 3) return true;
return false;
}
// Destroy a node
// Deletes the deferred user object (if any) and the node itself.
// object has a virtual dtor, so the derived dtor runs.
void prv_dtor(node* n)
{
object* obj = n->obj;
if (obj != nullptr) delete obj;
delete n;
}
// Dump backlinks
// n has just lost its final reference. Walk forward through the
// collect-order chain, dropping the reference each dead epoch holds
// on its successor; stop at the first node that still has readers.
// Links are reversed in place to build the destroy list.
void prv_dump(node* n)
{
node* cur = n->next;
n->next = nullptr;
// Release
while (prv_release(cur))
{
ct_mb_fence(ct_mb_acquire);
node* next = cur->next;
cur->next = n;
n = cur;
cur = next;
}
// Destroy
while (n)
{
node* next = n->next;
prv_dtor(n);
n = next;
}
}
// Collect a node
// Swap in n as the new epoch (outer count 0), backlink the old head
// to it, then fold the displaced outer count + 1 into the old
// node's inner count. If the fetched inner count and the outer
// count cancel, no reader still references the old epoch: dump it.
void prv_collect(node* n)
{
anchor xchg = { 0, n };
ct_mb_fence(ct_mb_release);
anchor cmp = head.exchange(xchg, ct_mb_relaxed);
ct_mb_fence(ct_mb_acquire);
cmp.head->next = xchg.head;
ct_mb_fence(ct_mb_release);
std::intptr_t count = cmp.head->count.fetch_add(cmp.count +
1, ct_mb_relaxed);
if (count + cmp.count == 0)
{
prv_dump(cmp.head);
}
}
// Acquire a node
// Reader entry: DWCAS-increment the outer count by 2 and return the
// current epoch node. The acquire fence orders subsequent reads of
// the protected structure after the successful CAS.
node* acquire()
{
anchor cmp = head.load(ct_mb_relaxed);
anchor xchg = { cmp.count + 2, cmp.head };
while (! head.compare_exchange_weak(cmp, xchg, ct_mb_relaxed))
{
xchg = { cmp.count + 2, cmp.head };
}
ct_mb_fence(ct_mb_acquire);
return cmp.head;
}
// Release a node
// Reader exit: drop our reference; if it was the last one on a
// collected epoch, reclaim the chain.
void release(node* n)
{
ct_mb_fence(ct_mb_release);
if (prv_release(n))
{
ct_mb_fence(ct_mb_acquire);
prv_dump(n);
}
}
// Collect an object
// Defer destruction of obj: wrap it in a fresh epoch node (initial
// inner count 2) and install it as the new head.
void collect(object* obj)
{
prv_collect(new node(2, nullptr, obj));
}
};
}}
// Test payload object: participates in the global allocation balance
// so main() can detect leaks, and carries an intrusive atomic link
// for use by foo_lifo.
struct foo : public ct::proxy::object
{
    std::atomic<foo*> next;

    foo(foo* next_ = nullptr) : next(next_)
    {
        g_allocs.fetch_add(1, ct_mb_relaxed);
    }

    ~foo() override
    {
        g_allocs.fetch_sub(1, ct_mb_relaxed);
    }
};
// An atomic LIFO of test objects
struct foo_lifo
{
std::atomic<foo*> head;
foo_lifo(foo* next_ = nullptr) : head(next_) { }
void push(foo* n)
{
foo* cmp = head.load(ct_mb_relaxed);
do {
n->next.store(cmp, ct_mb_relaxed);
ct_mb_fence(ct_mb_release);
} while (! head.compare_exchange_weak(cmp, n, ct_mb_relaxed));
}
// Flush all items, and return LIFO list or nullptr.
foo* flush()
{
foo* cmp = head.exchange(nullptr, ct_mb_relaxed);
if (cmp) ct_mb_fence(ct_mb_acquire);
return cmp;
}
};
/* Test program
____________________________________________ */
#include <thread>
#include <functional>
// Test configuration: thread counts and the per-thread iteration
// budget (readers run ITERS * 2 passes, writers ITERS).
#define READERS 7
#define WRITERS 5
#define ITERS 1000000
#define THREADS (READERS + WRITERS)
// State shared by every worker thread: the proxy collector and the
// LIFO it protects.
struct ct_test_state
{
    ct::proxy::gc gc;
    foo_lifo lifo;

    ct_test_state() {}
};
// Reader side: repeatedly walks tstate.lifo under the protection of
// an acquired proxy node, so objects collected by writers cannot be
// reclaimed while a traversal is in flight.
void ct_reader_thread(
ct_test_state& tstate
) {
    ct_cout("ct_reader_thread: Enter\n");

    for (unsigned int iter = 0; iter < ITERS * 2; ++iter)
    {
        // Enter the protected region.
        ct::proxy::node* proxy = tstate.gc.acquire();

        // Walk whatever chain is currently published.
        foo* item = tstate.lifo.head.load(ct_mb_relaxed);
        ct_mb_fence(ct_mb_acquire);
        while (item != nullptr)
        {
            foo* following = item->next.load(ct_mb_relaxed);
            std::this_thread::yield();
            item = following;
        }

        // Leave the protected region.
        tstate.gc.release(proxy);

        if (iter % (ITERS / 64) == 0)
        {
            ct_cout("ct_reader_thread: Processing...\n");
        }
    }

    ct_cout("ct_reader_thread: Exit\n");
}
// Writer side: publishes fresh foo objects onto the shared LIFO,
// then flushes the whole stack and hands each flushed object to the
// collector for deferred destruction.
void ct_writer_thread(
ct_test_state& tstate
) {
    ct_cout("ct_writer_thread: Enter\n");

    for (unsigned int iter = 0; iter < ITERS; ++iter)
    {
        // Publish three new objects, yielding between pushes to
        // shake out thread interleavings.
        for (int k = 0; k < 3; ++k)
        {
            tstate.lifo.push(new foo(nullptr));
            std::this_thread::yield();
        }

        // Detach everything and defer each object's destruction.
        for (foo* item = tstate.lifo.flush(); item != nullptr;)
        {
            std::this_thread::yield();
            foo* following = item->next.load(ct_mb_relaxed);
            tstate.gc.collect(item);
            std::this_thread::yield();
            item = following;
        }

        if (iter % (ITERS / 64) == 0)
        {
            ct_cout("ct_writer_thread: Processing...\n");
        }
    }

    ct_cout("ct_writer_thread: Exit\n");
}
// Test driver: spins up READERS reader threads and WRITERS writer
// threads over one shared state, joins them, then verifies that the
// global allocation balance returned to zero (no leaks).
int main()
{
    {
        ct_test_state tstate;
        std::thread threads[THREADS];

        // Launch readers (indices 0..READERS-1) then writers, in the
        // same creation order as before, from a single loop.
        for (unsigned int t = 0; t < THREADS; ++t)
        {
            threads[t] = (t < READERS)
                ? std::thread(ct_reader_thread, std::ref(tstate))
                : std::thread(ct_writer_thread, std::ref(tstate));
        }

        for (std::thread& t : threads)
        {
            t.join();
        }
    }

    // Everything allocated must have been reclaimed by now.
    if (g_allocs.load(ct_mb_relaxed) != 0)
    {
        std::printf("MEMORY LEAK!\n");
        assert(false);
    }

    std::printf("\n\nFin!\n");
    std::fflush(stdout);
    std::getchar();

    return EXIT_SUCCESS;
}
___________________________