Dmitriy V'jukov
Feb 14, 2008
to lock-free
Scalable distributed reference counting with lightweight acquire/
release operations.
It supports:
- Reference counting with basic thread-safety
- Reference counting with strong thread-safety
- Reference counting with cross-thread references
- Ultra low overhead PDR
- PDR with long-term cross-thread references
Assumptions I made about the environment:
- Fixed number of threads. I made this assumption partly for
simplicity and partly because I'm interested in such environments. It
can be overcome by adding a thread registration mechanism.
- Fixed maximum number of reference-counted objects. Solely for
simplicity; dynamic allocation of the helper structures must be added.
- User threads must periodically execute a special function. I made
this assumption partly for simplicity and partly because I'm
interested in such environments. This can be overcome by inserting the
call to this special function into the acquire/release operations, or
with some other mechanism such as signals/APCs.
Here is the public API:
typedef void(*rcx_dtor_fp_t)(void*);
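// opaque storage, sized to hold the internal rcx_int_t
// (four machine words on a 32-bit platform)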
typedef unsigned rcx_t [4];
int rcx_sys_init(unsigned thread_count);
void rcx_sys_deinit();
int rcx_sys_thread_init(unsigned thread_idx);
void rcx_sys_thread_deinit();
// must be periodically executed by all user threads
void rcx_process();
// voluntarily initiate garbage collection (optional)
void rcx_collect();
int rcx_create(rcx_t* rcx, rcx_dtor_fp_t dtor_fp, void* state);
void rcx_destroy(rcx_t* rcx);
void* rcx_get_state(rcx_t* rcx);
void rcx_acquire(rcx_t* rcx);
void rcx_release(rcx_t* rcx);
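And here is a minimal single-threaded usage sketch. It is only an
illustration: node_t, node_dtor, and the cleanup convention (the dtor
returns the descriptor via rcx_destroy and then frees the memory) are
my own assumptions, not something the API mandates.
#include "rcx.h"
#include <stdlib.h>
typedef struct node_s
{
    // embedded reference-count object
    rcx_t rc_;
    int value_;
}
node_t;
// executed inside rcx_process_epoch() on a registered thread
static void node_dtor(void* state)
{
    node_t* n = (node_t*)state;
    // return the object descriptor to the freelist
    rcx_destroy(&n->rc_);
    free(n);
}
int main()
{
    unsigned i;
    node_t* n = malloc(sizeof(node_t));
    // one user thread, registered with index 0
    rcx_sys_init(1);
    rcx_sys_thread_init(0);
    n->value_ = 1;
    // logical rc == 1 after creation
    rcx_create(&n->rc_, node_dtor, n);
    // logical rc == 2; the increment is cached locally
    rcx_acquire(&n->rc_);
    rcx_release(&n->rc_);
    // logical rc == 0 now
    rcx_release(&n->rc_);
    // drive a few epoch shifts so the deferred dtor runs
    for (i = 0; i != 4; ++i)
        rcx_collect();
    rcx_sys_thread_deinit();
    rcx_sys_deinit();
    return 0;
}
Note that reclamation is deferred: the rc drop becomes visible when
the local counters are flushed during one epoch shift, and the dtor
runs only on a following shift (the 'full epoch' delay in
rcx_process_epoch()).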
Here is the implementation:
#include "rcx.h"
#include <stdlib.h>
typedef char rcx_cacheline_pad_t [128];
static unsigned const rcx_max_object_count = 1024;
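// weights used to estimate a thread's 'activity';
// an epoch shift is initiated once the accumulated
// activity reaches rcx_activity_threshold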
static unsigned const rcx_activity_acq = 1;
static unsigned const rcx_activity_rel = 1024;
static unsigned const rcx_activity_rel_new = 16 * 1024;
static unsigned const rcx_activity_threshold = 128 * 16 * 1024;
static unsigned const rcx_in_list_flag = 1u << 31;
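// full memory fence (MSVC inline asm, x86 mfence)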
#define FULL_FENCE() __asm mfence
static void rcx_process_epoch();
// internal representation of rcx_t
typedef struct rcx_int_s
{
// index into the per-thread and global object descriptor arrays
unsigned idx_;
// index of owner thread
unsigned thread_idx_;
// user supplied dtor and state
rcx_dtor_fp_t dtor_fp_;
void* state_;
}
rcx_int_t;
// per thread object descriptor
typedef struct rcx_obj_data_s
{
// per-thread cache of acquire/release operations;
// rcx_in_list_flag is added as an 'in list' mark
unsigned rc_;
}
rcx_obj_data_t;
// global object descriptor
typedef struct rcx_gl_obj_data_s
{
// pointer to owning rcx_t object
rcx_int_t* rcx_;
// global reference count
unsigned rc_;
// doubly-linked list of object descriptors
// whose rc dropped to zero
struct rcx_gl_obj_data_s* volatile next_;
struct rcx_gl_obj_data_s* volatile prev_;
}
rcx_gl_obj_data_t;
// per thread data
typedef struct rcx_thread_s
{
// index [0 .. thread_count-1]
unsigned index_;
// list of free object descriptors
rcx_gl_obj_data_t* volatile freelist_head_;
rcx_gl_obj_data_t* freelist_tail_;
// cache of foreign free object descriptors
rcx_gl_obj_data_t* freelist_cache_;
// array of per thread object descriptors
rcx_obj_data_t* obj_data_;
// array of object descriptors
// for which acquire/release operations
// were executed in the current epoch
rcx_int_t** rc_list_;
// list of object descriptors
// whose rc dropped to zero
rcx_gl_obj_data_t* dtor_list_;
// head and tail nodes for dtor_list_
rcx_gl_obj_data_t dtor_fake_ [2];
rcx_cacheline_pad_t pad_;
}
rcx_thread_t;
// per thread data stored in tls
typedef struct rcx_thread_local_s
{
// copy of rcx_thread_t::obj_data_
rcx_obj_data_t* obj_data_;
// copy of rcx_thread_t::rc_list_
rcx_int_t** rc_list_;
// estimate of the 'activity' produced by the thread;
// used to decide when to initiate an epoch shift
unsigned activity_;
// current position in rc_list_
unsigned rc_list_pos_;
// copy of rcx_thread_t::index_
unsigned index_;
}
rcx_thread_local_t;
// global data
typedef struct rcx_global_s
{
// array of global object descriptors
rcx_gl_obj_data_t* gl_obj_data_;
// array of per thread data
rcx_thread_t* threads_;
// thread count
unsigned thread_count_;
// flag indicating that some thread
// wants to initiate an epoch shift
unsigned volatile epoch_pending_;
// index of the thread that is currently allowed
// to copy its local counters to the global ones
unsigned volatile epoch_order_;
}
rcx_global_t;
static rcx_global_t rcx_global;
static __declspec(thread) rcx_thread_local_t rcx_thread;
int rcx_sys_init(unsigned thread_count)
{
unsigned i, j, first, last;
rcx_global.gl_obj_data_ =
calloc(rcx_max_object_count, sizeof(rcx_gl_obj_data_t));
rcx_global.threads_ =
calloc(thread_count, sizeof(rcx_thread_t));
rcx_global.thread_count_ = thread_count;
rcx_global.epoch_pending_ = 0;
rcx_global.epoch_order_ = thread_count;
for (i = 0; i != rcx_global.thread_count_; ++i)
{
rcx_thread_t* th = &rcx_global.threads_[i];
th->index_ = i;
th->rc_list_ =
calloc(rcx_max_object_count, sizeof(rcx_int_t*));
th->obj_data_ =
calloc(rcx_max_object_count, sizeof(rcx_obj_data_t));
first = i * (rcx_max_object_count / thread_count);
last = (i + 1) * (rcx_max_object_count / thread_count) - 1;
for (j = first; j != last; ++j)
{
rcx_global.gl_obj_data_[j].rcx_ = (rcx_int_t*)j;
rcx_global.gl_obj_data_[j].next_ =
&rcx_global.gl_obj_data_[j + 1];
}
rcx_global.gl_obj_data_[last].rcx_ = (rcx_int_t*)last;
th->freelist_head_ = &rcx_global.gl_obj_data_[last];
th->freelist_tail_ = &rcx_global.gl_obj_data_[first];
th->freelist_cache_ = 0;
th->dtor_list_ = &th->dtor_fake_[0];
th->dtor_fake_[0].next_ = &th->dtor_fake_[1];
th->dtor_fake_[0].prev_ = 0;
th->dtor_fake_[1].next_ = 0;
th->dtor_fake_[1].prev_ = &th->dtor_fake_[0];
}
return 0;
}
void rcx_sys_deinit()
{
// here we must free all surviving rcx_t objects;
// this is not implemented yet
free(rcx_global.threads_);
free(rcx_global.gl_obj_data_);
}
int rcx_sys_thread_init(unsigned thread_idx)
{
rcx_thread.obj_data_ =
rcx_global.threads_[thread_idx].obj_data_;
rcx_thread.rc_list_ =
rcx_global.threads_[thread_idx].rc_list_;
rcx_thread.rc_list_pos_ = 0;
rcx_thread.activity_ = 0;
rcx_thread.index_ = thread_idx;
return 0;
}
void rcx_sys_thread_deinit()
{
}
int rcx_create(rcx_t* rcx, rcx_dtor_fp_t dtor_fp, void* state)
{
rcx_int_t* x = (rcx_int_t*)rcx;
rcx_thread_t* th = &rcx_global.threads_[rcx_thread.index_];
// get free object descriptor from per thread queue
rcx_gl_obj_data_t* gl = th->freelist_tail_;
th->freelist_tail_ = th->freelist_tail_->next_;
x->idx_ = (unsigned)gl->rcx_;
x->thread_idx_ = rcx_thread.index_;
x->dtor_fp_ = dtor_fp;
x->state_ = state;
gl->rcx_ = x;
gl->rc_ = 1;
gl->next_ = 0;
gl->prev_ = 0;
return 0;
}
void rcx_destroy(rcx_t* rcx)
{
rcx_int_t* x = (rcx_int_t*)rcx;
rcx_thread_t* th = &rcx_global.threads_[rcx_thread.index_];
rcx_gl_obj_data_t* gl = &rcx_global.gl_obj_data_[x->idx_];
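// stash the descriptor index and the owner thread index
// in the rcx_/rc_ fields so that rcx_process_epoch can
// route the descriptor back to its owner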
gl->rcx_ = (rcx_int_t*)x->idx_;
gl->rc_ = x->thread_idx_;
// return object descriptor to local cache
gl->next_ = th->freelist_cache_;
th->freelist_cache_ = gl;
}
void rcx_acquire(rcx_t* rcx)
{
rcx_int_t* x = (rcx_int_t*)rcx;
rcx_thread_local_t* th = &rcx_thread;
// find per thread object descriptor
rcx_obj_data_t* obj = &th->obj_data_[x->idx_];
// check whether the object descriptor is already in the local list
// of descriptors for which the thread has executed
// an acquire/release operation
if (obj->rc_)
{
// descriptor already in list
// just increment counter
obj->rc_ += 1;
}
else
{
// descriptor not in list
// increment and mark counter
obj->rc_ += 1 + rcx_in_list_flag;
// add descriptor to list
th->rc_list_[th->rc_list_pos_++] = x;
}
th->activity_ += rcx_activity_acq;
}
void rcx_release(rcx_t* rcx)
{
rcx_int_t* x = (rcx_int_t*)rcx;
rcx_thread_local_t* th = &rcx_thread;
rcx_obj_data_t* obj = &th->obj_data_[x->idx_];
if (obj->rc_)
{
obj->rc_ -= 1;
th->activity_ += rcx_activity_rel;
}
else
{
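// descriptor not in the list:
// store the delta (-1) plus the rcx_in_list_flag mark;
// unsigned wraparound makes the negative delta work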
obj->rc_ += -1 + rcx_in_list_flag;
th->rc_list_[th->rc_list_pos_++] = x;
th->activity_ += rcx_activity_rel_new;
}
}
void* rcx_get_state(rcx_t* rcx)
{
rcx_int_t* x = (rcx_int_t*)rcx;
return x->state_;
}
void rcx_process()
{
rcx_thread_local_t* th = &rcx_thread;
if (th->activity_ >= rcx_activity_threshold
&& 0 == rcx_global.epoch_pending_)
{
// we've created enough activity
// initiate epoch shift
rcx_global.epoch_pending_ = 1;
}
if (0 == th->index_
&& rcx_global.epoch_pending_
&& rcx_global.epoch_order_ ==
rcx_global.thread_count_)
{
// thread with index 0
// starts epoch processing
rcx_global.epoch_order_ = 0;
}
if (th->index_ == rcx_global.epoch_order_)
{
// it's my turn - process epoch
rcx_process_epoch();
// notify next thread
rcx_global.epoch_order_ += 1;
if (rcx_global.epoch_order_ ==
rcx_global.thread_count_)
{
rcx_global.epoch_pending_ = 0;
}
}
}
void rcx_collect()
{
if (0 == rcx_global.epoch_pending_)
rcx_global.epoch_pending_ = 1;
rcx_process();
}
static void rcx_process_epoch()
{
rcx_thread_t* th = &rcx_global.threads_[rcx_thread.index_];
rcx_thread_t* th2;
rcx_gl_obj_data_t* cur;
rcx_gl_obj_data_t* next;
rcx_obj_data_t* loc;
unsigned i;
rcx_int_t* req;
// transfer cached object descriptors to owner thread
while (th->freelist_cache_)
{
cur = th->freelist_cache_;
next = cur->next_;
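// rc_ of a cached descriptor holds the owner
// thread index (stored there by rcx_destroy)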
th2 = &rcx_global.threads_[cur->rc_];
th2->freelist_head_->next_ = cur;
th2->freelist_head_ = cur;
th->freelist_cache_ = next;
}
// execute the dtor function for objects
// which have been in the dtor list
// for a full epoch
cur = th->dtor_list_->next_;
if (cur->next_)
{
while (cur->next_)
{
next = cur->next_;
cur->rc_ = 1;
cur->next_ = 0;
cur->prev_ = 0;
// here object descriptor is ready for reuse
cur->rcx_->dtor_fp_(cur->rcx_->state_);
cur = next;
}
th->dtor_fake_[0].next_ = &th->dtor_fake_[1];
th->dtor_fake_[1].prev_ = &th->dtor_fake_[0];
}
// transfer all accumulated acquire/release operations
// from the local descriptors to the global ones
for (i = 0; i != rcx_thread.rc_list_pos_; ++i)
{
req = th->rc_list_[i];
// global descriptor
cur = &rcx_global.gl_obj_data_[req->idx_];
// local descriptor
loc = &th->obj_data_[req->idx_];
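// strip the in-list mark and apply the accumulated delta;
// unsigned wraparound handles negative deltas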
cur->rc_ += loc->rc_ - rcx_in_list_flag;
loc->rc_ = 0;
if (cur->rc_ && cur->next_)
{
// remove object from dtor list
cur->prev_->next_ = cur->next_;
cur->next_->prev_ = cur->prev_;
cur->next_ = 0;
cur->prev_ = 0;
}
else if (0 == cur->rc_ && 0 == cur->next_)
{
// insert object to dtor list
cur->next_ = th->dtor_list_->next_;
cur->prev_ = th->dtor_list_;
th->dtor_list_->next_->prev_ = cur;
th->dtor_list_->next_ = cur;
}
else if (0 == cur->rc_ && cur->next_)
{
// remove and reinsert object to dtor list
cur->prev_->next_ = cur->next_;
cur->next_->prev_ = cur->prev_;
cur->next_ = th->dtor_list_->next_;
cur->prev_ = th->dtor_list_;
th->dtor_list_->next_->prev_ = cur;
th->dtor_list_->next_ = cur;
}
}
// reset local per-epoch variables
rcx_thread.activity_ = 0;
rcx_thread.rc_list_pos_ = 0;
// to support PDR
FULL_FENCE();
}
Dmitriy V'jukov