Scalable distributed reference counting

200 views
Skip to first unread message

Dmitriy V'jukov

unread,
Feb 14, 2008, 11:42:05 AM2/14/08
to lock-free
Scalable distributed reference counting with lightweight acquire/
release operations.

It supports:
- Reference counting with basic-thread safety
- Reference counting with strong-thread safety
- Reference counting with cross-thread references
- Ultra low overhead PDR
- PDR with long-term cross-thread references

Assumptions about environment I made.
- Fixed number of threads. I made this assumption partly for
simplicity and partly because I'm interested in such environment. This
can be overcome with addition of thread registration mechanism.
- Fixed maximum number of reference counted objects. Solely for
simplicity. Dynamic allocation of helper structures must be added.
- User threads must periodically execute special function. I made this
assumption partly for simplicity and partly because I'm interested in
such environment. This can be overcome if call to this special
function will be inserted into acquire/release operations. Or with
some other mechanisms like signals/APC etc.

Here is public API:

typedef void(*rcx_dtor_fp_t)(void*);
typedef unsigned rcx_t [4];

int rcx_sys_init(unsigned thread_count);
void rcx_sys_deinit();

int rcx_sys_thread_init(unsigned thread_idx);
void rcx_sys_thread_deinit();

// must be periodically executed by all user threads
void rcx_process();
// voluntarily initiate garbage collection (optional)
void rcx_collect();

int rcx_create(rcx_t* rcx, rcx_dtor_fp_t dtor_fp, void* state);
void rcx_destroy(rcx_t* rcx);
void* rcx_get_state(rcx_t* rcx);

void rcx_acquire(rcx_t* rcx);
void rcx_release(rcx_t* rcx);

Here is implementation:

#include "rcx.h"
#include <stdlib.h>

typedef char rcx_cacheline_pad_t [128];
static unsigned const rcx_max_object_count = 1024;
static unsigned const rcx_activity_acq = 1;
static unsigned const rcx_activity_rel = 1024;
static unsigned const rcx_activity_rel_new = 16 * 1024;
static unsigned const rcx_activity_threshold = 128 * 16 * 1024;
static unsigned const rcx_in_list_flag = 1u << 31;

#define FULL_FENCE() __asm mfence

static void rcx_process_epoch();

// decoding of rcx_t
typedef struct rcx_int_s
{
// index of per thread and global object descriptor
unsigned idx_;
// index of owner thread
unsigned thread_idx_;
// user supplied dtor and state
rcx_dtor_fp_t dtor_fp_;
void* state_;
}
rcx_int_t;

// per thread object descriptor
typedef struct rcx_obj_data_s
{
// cache for thread's acquire/release operations
// high bit used as flag
unsigned rc_;
}
rcx_obj_data_t;

// global object descriptor
typedef struct rcx_gl_obj_data_s
{
// pointer to owning rcx_t object
rcx_int_t* rcx_;
// global reference count
unsigned rc_;
// double-linked list of object descriptors
// for which rc drops to zero
struct rcx_gl_obj_data_s* volatile next_;
struct rcx_gl_obj_data_s* volatile prev_;
}
rcx_gl_obj_data_t;

// per thread data
typedef struct rcx_thread_s
{
// index [0 .. thread_count-1]
unsigned index_;
// list of free object descriptors
rcx_gl_obj_data_t* volatile freelist_head_;
rcx_gl_obj_data_t* freelist_tail_;
// cache of foreign free object descriptors
rcx_gl_obj_data_t* freelist_cache_;
// array of per thread object descriptors
rcx_obj_data_t* obj_data_;
// array of object descriptors
// for which acquire/release operations
// was executed in current epoch
rcx_int_t** rc_list_;
// list of object descriptors
// for which rc drops to zero
rcx_gl_obj_data_t* dtor_list_;
// head and tail nodes for dtor_list_
rcx_gl_obj_data_t dtor_fake_ [2];
rcx_cacheline_pad_t pad_;
}
rcx_thread_t;

// per thread data stored in tls
typedef struct rcx_thread_local_s
{
// copy of rcx_thread_t::obj_data_
rcx_obj_data_t* obj_data_;
// copy of rcx_thread_t::rc_list_
rcx_int_t** rc_list_;
// estimation of 'activity' produced by thread
// used for initiation of epoch shift
unsigned activity_;
// current position in rc_list_
unsigned rc_list_pos_;
// copy of rcx_thread_t::index_
unsigned index_;
}
rcx_thread_local_t;

// global data
typedef struct rcx_global_s
{
// array of global object descriptors
rcx_gl_obj_data_t* gl_obj_data_;
// array of per thread data
rcx_thread_t* threads_;
// thread count
unsigned thread_count_;
// flag that some thread
// wants to make epoch shift
unsigned volatile epoch_pending_;
// index of thread which allowed
// to copy local counters to global counters
unsigned volatile epoch_order_;
}
rcx_global_t;

static rcx_global_t rcx_global;
static __declspec(thread) rcx_thread_local_t rcx_thread;

int rcx_sys_init(unsigned thread_count)
{
unsigned i, j, first, last;
rcx_global.gl_obj_data_ =
calloc(rcx_max_object_count, sizeof(rcx_gl_obj_data_t));
rcx_global.threads_ =
calloc(thread_count, sizeof(rcx_thread_t));
rcx_global.thread_count_ = thread_count;
rcx_global.epoch_pending_ = 0;
rcx_global.epoch_order_ = thread_count;
for (i = 0; i != rcx_global.thread_count_; ++i)
{
rcx_thread_t* th = &rcx_global.threads_[i];
th->index_ = i;
th->rc_list_ =
calloc(rcx_max_object_count, sizeof(rcx_int_t*));
th->obj_data_ =
calloc(rcx_max_object_count, sizeof(rcx_obj_data_t));
first = i * (rcx_max_object_count / thread_count);
last = (i + 1) * (rcx_max_object_count / thread_count) - 1;
for (j = first; j != last; ++j)
{
rcx_global.gl_obj_data_[j].rcx_ = (rcx_int_t*)j;
rcx_global.gl_obj_data_[j].next_ =
&rcx_global.gl_obj_data_[j + 1];
}
rcx_global.gl_obj_data_[last].rcx_ = (rcx_int_t*)last;
th->freelist_head_ = &rcx_global.gl_obj_data_[last];
th->freelist_tail_ = &rcx_global.gl_obj_data_[first];
th->freelist_cache_ = 0;
th->dtor_list_ = &th->dtor_fake_[0];
th->dtor_fake_[0].next_ = &th->dtor_fake_[1];
th->dtor_fake_[0].prev_ = 0;
th->dtor_fake_[1].next_ = 0;
th->dtor_fake_[1].prev_ = &th->dtor_fake_[0];
}
return 0;
}

void rcx_sys_deinit()
{
// here we must free all survived rcx_t objects
// it's not implemented yet
free(rcx_global.threads_);
free(rcx_global.gl_obj_data_);
}

int rcx_sys_thread_init(unsigned thread_idx)
{
rcx_thread.obj_data_ =
rcx_global.threads_[thread_idx].obj_data_;
rcx_thread.rc_list_ =
rcx_global.threads_[thread_idx].rc_list_;
rcx_thread.rc_list_pos_ = 0;
rcx_thread.activity_ = 0;
rcx_thread.index_ = thread_idx;
return 0;
}

void rcx_sys_thread_deinit()
{
}

int rcx_create(rcx_t* rcx, rcx_dtor_fp_t dtor_fp, void* state)
{
rcx_int_t* x = (rcx_int_t*)rcx;
rcx_thread_t* th = &rcx_global.threads_[rcx_thread.index_];
// get free object descriptor from per thread queue
rcx_gl_obj_data_t* gl = th->freelist_tail_;
th->freelist_tail_ = th->freelist_tail_->next_;
x->idx_ = (unsigned)gl->rcx_;
x->thread_idx_ = rcx_thread.index_;
x->dtor_fp_ = dtor_fp;
x->state_ = state;
gl->rcx_ = x;
gl->rc_ = 1;
gl->next_ = 0;
gl->prev_ = 0;
return 0;
}

void rcx_destroy(rcx_t* rcx)
{
rcx_int_t* x = (rcx_int_t*)rcx;
rcx_thread_t* th = &rcx_global.threads_[rcx_thread.index_];
rcx_gl_obj_data_t* gl = &rcx_global.gl_obj_data_[x->idx_];
gl->rcx_ = (rcx_int_t*)x->idx_;
gl->rc_ = x->thread_idx_;
// return object descriptor to local cache
gl->next_ = th->freelist_cache_;
th->freelist_cache_ = gl;
}

void rcx_acquire(rcx_t* rcx)
{
rcx_int_t* x = (rcx_int_t*)rcx;
rcx_thread_local_t* th = &rcx_thread;
// find per thread object descriptor
rcx_obj_data_t* obj = &th->obj_data_[x->idx_];
// check whether object descriptor already in local list
// of descriptors for which thread have executed
// acquire/release operation
if (obj->rc_)
{
// descriptor already in list
// just increment counter
obj->rc_ += 1;
}
else
{
// descriptor not in list
// increment and mark counter
obj->rc_ += 1 + rcx_in_list_flag;
// add descriptor to list
th->rc_list_[th->rc_list_pos_++] = x;
}
th->activity_ += rcx_activity_acq;
}

void rcx_release(rcx_t* rcx)
{
rcx_int_t* x = (rcx_int_t*)rcx;
rcx_thread_local_t* th = &rcx_thread;
rcx_obj_data_t* obj = &th->obj_data_[x->idx_];
if (obj->rc_)
{
obj->rc_ -= 1;
th->activity_ += rcx_activity_rel;
}
else
{
obj->rc_ += -1 + rcx_in_list_flag;
th->rc_list_[th->rc_list_pos_++] = x;
th->activity_ += rcx_activity_rel_new;
}
}

void* rcx_get_state(rcx_t* rcx)
{
rcx_int_t* x = (rcx_int_t*)rcx;
return x->state_;
}

void rcx_process()
{
rcx_thread_local_t* th = &rcx_thread;
if (th->activity_ >= rcx_activity_threshold
&& 0 == rcx_global.epoch_pending_)
{
// we've created enough activity
// initiate epoch shift
rcx_global.epoch_pending_ = 1;
}
if (0 == th->index_
&& rcx_global.epoch_pending_
&& rcx_global.epoch_order_ ==
rcx_global.thread_count_)
{
// thread with index 0
// starts epoch processing
rcx_global.epoch_order_ = 0;
}
if (th->index_ == rcx_global.epoch_order_)
{
// it's my turn - process epoch
rcx_process_epoch();
// notify next thread
rcx_global.epoch_order_ += 1;
if (rcx_global.epoch_order_ ==
rcx_global.thread_count_)
{
rcx_global.epoch_pending_ = 0;
}
}
}

void rcx_collect()
{
if (0 == rcx_global.epoch_pending_)
rcx_global.epoch_pending_ = 1;
rcx_process();
}

void rcx_process_epoch()
{
rcx_thread_t* th = &rcx_global.threads_[rcx_thread.index_];
rcx_thread_t* th2;
rcx_gl_obj_data_t* cur;
rcx_gl_obj_data_t* next;
rcx_obj_data_t* loc;
unsigned i;
rcx_int_t* req;

// transfer cached object descriptors to owner thread
while (th->freelist_cache_)
{
cur = th->freelist_cache_;
next = cur->next_;
th2 = &rcx_global.threads_[cur->rc_];
th2->freelist_head_->next_ = cur;
th2->freelist_head_ = cur;
th->freelist_cache_ = next;
}

// execute dtor function for objects
// which are located in dtor list
// for full epoch
cur = th->dtor_list_->next_;
if (cur->next_)
{
while (cur->next_)
{
next = cur->next_;
cur->rc_ = 1;
cur->next_ = 0;
cur->prev_ = 0;
// here object descriptor is ready for reuse
cur->rcx_->dtor_fp_(cur->rcx_->state_);
cur = next;
}
th->dtor_fake_[0].next_ = &th->dtor_fake_[1];
th->dtor_fake_[1].prev_ = &th->dtor_fake_[0];
}

// transfer all acquire/release operations
// from local descriptor to global
for (i = 0; i != rcx_thread.rc_list_pos_; ++i)
{
req = th->rc_list_[i];
// global descriptor
cur = &rcx_global.gl_obj_data_[req->idx_];
// local descriptor
loc = &th->obj_data_[req->idx_];
cur->rc_ += loc->rc_ - rcx_in_list_flag;
loc->rc_ = 0;
if (cur->rc_ && cur->next_)
{
// remove object from dtor list
cur->prev_->next_ = cur->next_;
cur->next_->prev_ = cur->prev_;
cur->next_ = 0;
cur->prev_ = 0;
}
else if (0 == cur->rc_ && 0 == cur->next_)
{
// insert object to dtor list
cur->next_ = th->dtor_list_->next_;
cur->prev_ = th->dtor_list_;
th->dtor_list_->next_->prev_ = cur;
th->dtor_list_->next_ = cur;
}
else if (0 == cur->rc_ && cur->next_)
{
// remove and reinsert object to dtor list
cur->prev_->next_ = cur->next_;
cur->next_->prev_ = cur->prev_;
cur->next_ = th->dtor_list_->next_;
cur->prev_ = th->dtor_list_;
th->dtor_list_->next_->prev_ = cur;
th->dtor_list_->next_ = cur;
}
}

// reset local per-epoch variables
rcx_thread.activity_ = 0;
rcx_thread.rc_list_pos_ = 0;

// to support PDR
FULL_FENCE();
}

Dmitriy V'jukov

Dmitriy V'jukov

unread,
Feb 20, 2008, 2:11:53 PM2/20/08
to lock-free
Reply all
Reply to author
Forward
Message has been deleted
0 new messages