Proxy-collector with atomic-free fast-path

Dmitriy V'jukov
Feb 2, 2008, 9:16:22 PM
to lock-free
[Cross-post from USENET comp.programming.threads]

Proxy-collector with an atomic-free/membar-free fast-path for
acquire/release.

Chris Thomasson's original proxy-collector is the basis for this
algorithm:
http://home.comcast.net/~vzoom/demos/pc_sample.c

Instead of a collector per user object I use a fixed number of
collectors, so every collector holds multiple user objects. In the
code below the collector count is 4.

Also I mix in my acquire/release amortization trick:
http://groups.google.ru/group/comp.programming.threads/msg/91d01886ae...
It allows me to achieve an atomic-free/membar-free fast-path for
acquire/release.
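
In a nutshell: a thread doesn't really drop its reference at the end
of a critical region; it keeps the acquisition cached and re-uses it
on the next acquire, and pays for an atomic RMW only when the current
collector has shifted in the meantime, which it detects with a plain
load. A minimal two-collector sketch of the idea (all names here are
made up for illustration, and it uses the same <intrin.h> intrinsics
as the code below; note that the real code reads the index and
increments the counter with a single fetch-and-add on a packed word,
which closes the load/increment race this toy slow-path has):

long g_current_index;                      // toggled 0 <-> 1 by a flusher (not shown)
volatile long g_ref_count[2];              // per-collector reference counts
__declspec(thread) long t_held_index = -1; // collector this thread still holds

void acquire_sketch()
{
    if (t_held_index >= 0)
        return; // fast-path: re-use the cached reference - no atomics
    // slow-path (racy in this toy version, see the note above)
    t_held_index = g_current_index;
    _InterlockedIncrement(&g_ref_count[t_held_index]);
}

void release_sketch()
{
    // fast-path: the collector hasn't shifted - keep the reference
    // cached for the next acquire; this is only a plain load
    if (g_current_index == t_held_index)
        return;
    // slow-path: the collector has shifted - really drop the reference
    _InterlockedDecrement(&g_ref_count[t_held_index]);
    t_held_index = -1;
}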

And finally I add some amortizations, like a local per-thread cache
for deferred objects.

So it should kick the ass of any other proxy-collector out there :)
Strictly speaking, it's no longer a pure proxy-collector; it's
something like PC+RCU+SMR :)

Public API:

void pc_init();
void pc_deinit();

void pc_thread_init();
void pc_thread_deinit();

void pc_acquire();
void pc_release();

struct pc_node_t;
void pc_defer(pc_node_t* node);

// this is the only new call;
// it must be executed before blocking for a long time,
// so as not to prevent memory reclamation
void pc_flush();
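
Typical usage might look like this (a hypothetical example: node_t,
g_head and g_sink are illustrative and not part of the collector,
only the pc_* calls are the API above; each thread is assumed to
have already called pc_thread_init()):

struct node_t : pc_node_t
{
    node_t* next_;
    int     data_;
};

node_t* g_head;      // some shared lock-free structure
volatile int g_sink; // stand-in for real per-node work

void reader()
{
    pc_acquire();
    // between pc_acquire() and pc_release() nodes can't be deleted
    // under our feet: anything a writer unlinks goes through
    // pc_defer() and is reclaimed only after every acquire region
    // that could have seen it is released
    for (node_t* n = g_head; n; n = n->next_)
        g_sink = n->data_;
    pc_release();
}

void writer(node_t* removed)
{
    pc_acquire();
    // 'removed' has just been unlinked from the structure
    // (the unlinking itself is elided); don't delete it directly,
    // hand it to the collector instead
    pc_defer(removed);
    pc_release();
}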

Implementation:

#include <windows.h>
#include <intrin.h>
#pragma intrinsic (_InterlockedAnd)

typedef char cacheline_pad_t [128];

unsigned const pc_word_bits = 32;
unsigned const pc_collector_count = 4;
unsigned const pc_collector_bits = 2;

// master word layout (lsb to msb):
//   [current collector index : 2][busy mask : 4][outer counter : 26]
// collector word layout (lsb to msb):
//   [is current : 1][back link : 1][pad : 4][inner counter : 26]
unsigned const pc_counter_inc =
    1 << (pc_collector_count + pc_collector_bits);
unsigned const pc_is_current_inc = 1;
unsigned const pc_back_link_inc = 1 << 1;

unsigned const pc_local_garbage_cache = 8;
unsigned const pc_global_garbage_threshold = 32;

struct pc_node_t
{
    pc_node_t*  pc_next_;
    virtual     ~pc_node_t() {}
};

struct pc_master_t
{
    cacheline_pad_t pad1_;

    union union_t
    {
        long whole_;
        struct
        {
            unsigned current_collector_   : pc_collector_bits; // lsb
            unsigned collector_busy_mask_ : pc_collector_count;
            unsigned outer_counter_       : pc_word_bits
                - pc_collector_count - pc_collector_bits;      // msb
        };
    };

    union_t union_;

    cacheline_pad_t pad2_;

    // plain copy of the current collector index,
    // read with an ordinary load on the pc_release() fast-path
    union
    {
        long copy_whole_;
        struct
        {
            unsigned current_collector_copy_ : pc_collector_bits; // lsb
            unsigned pad_ : pc_word_bits - pc_collector_bits;     // msb
        };
    };
};

struct pc_collector_t
{
    cacheline_pad_t pad1_;

    pc_node_t*  defer_list_;
    unsigned    defer_list_size_;

    cacheline_pad_t pad2_;

    union
    {
        long whole_counter_;
        struct
        {
            // is_current_ and back_link_ act as two extra
            // references that keep the collector alive
            unsigned is_current_    : 1; // lsb
            unsigned back_link_     : 1;
            unsigned pad_           : pc_collector_count
                + pc_collector_bits - 2;
            unsigned inner_counter_ : pc_word_bits
                - pc_collector_count - pc_collector_bits; // msb
        };
    };
};

struct pc_thread_data_t
{
    unsigned    recursion_count_;
    unsigned    is_acquired_;
    unsigned    collector_index_;
    pc_node_t*  local_defer_list_;
    pc_node_t*  local_defer_list_tail_;
    unsigned    local_defer_list_size_;
};

pc_master_t pc_master;
pc_collector_t pc_collectors [pc_collector_count];
__declspec(thread) pc_thread_data_t pc_thread_data;

unsigned pc_acquire_impl()
{
    // load the current collector index and increment
    // the outer counter with a single atomic operation
    long const prev = _InterlockedExchangeAdd(
        &pc_master.union_.whole_, pc_counter_inc);
    pc_master_t::union_t u = {prev};
    return u.current_collector_;
}

void pc_release_impl(unsigned const index, unsigned count)
{
    // decrement the inner counter
    pc_collector_t& collector = pc_collectors[index];
    unsigned const prev = _InterlockedExchangeAdd(
        &collector.whole_counter_, -(long)count);
    if (0 == prev - count)
    {
        // last reference dropped:
        // delete all nodes deferred to this collector
        while (collector.defer_list_)
        {
            pc_node_t* next = collector.defer_list_->pc_next_;
            delete collector.defer_list_;
            collector.defer_list_ = next;
        }
        collector.defer_list_size_ = 0;

        // prepare the collector for the next run
        collector.back_link_ = 1;
        collector.is_current_ = 1;

        // update the collector busy mask
        // (whole_ must be zeroed first so that ~u.whole_
        // clears only this collector's busy bit)
        pc_master_t::union_t u;
        u.whole_ = 0;
        u.collector_busy_mask_ = 1 << index;
        _InterlockedAnd(&pc_master.union_.whole_, ~u.whole_);

        // reset the back-link
        pc_release_impl(
            (index + 1) % pc_collector_count,
            pc_back_link_inc);
    }
}

void pc_flush_impl()
{
    // transfer the local defer list to the collector
    // (note: the pointer/long casts assume 32-bit pointers)
    pc_thread_data_t& local = pc_thread_data;
    pc_collector_t& collector =
        pc_collectors[local.collector_index_];

    pc_node_t* prev = (pc_node_t*)_InterlockedExchange(
        (long*)&collector.defer_list_,
        (long)local.local_defer_list_);
    local.local_defer_list_tail_->pc_next_ = prev;

    unsigned new_count = local.local_defer_list_size_ +
        _InterlockedExchangeAdd(
            (long*)&collector.defer_list_size_,
            local.local_defer_list_size_);

    local.local_defer_list_ = 0;
    local.local_defer_list_tail_ = 0;
    local.local_defer_list_size_ = 0;

    if (pc_global_garbage_threshold < new_count)
    {
        // try to shift the current collector
        pc_master_t::union_t cmp;
        pc_master_t::union_t val;
        do
        {
            cmp = pc_master.union_;
            if (cmp.current_collector_ != local.collector_index_)
                return; // someone else has already shifted it
            unsigned next_mask =
                1 << ((cmp.current_collector_ + 1)
                    % pc_collector_count);
            if (cmp.collector_busy_mask_ & next_mask)
                return; // the next collector is still busy
            val = cmp;
            val.collector_busy_mask_ |= next_mask;
            val.current_collector_ += 1; // wraps in the 2-bit field
            val.outer_counter_ = 0;
        }
        while (cmp.whole_ != _InterlockedCompareExchange(
            (long*)&pc_master.union_.whole_, val.whole_,
            cmp.whole_));
        // the collector is shifted:
        // increment the collector index copy
        _InterlockedIncrement((long*)&pc_master.copy_whole_);
        // reset the current flag and transfer
        // the outer count to the inner counter
        _InterlockedExchangeAdd(
            (long*)&collector.whole_counter_,
            (cmp.outer_counter_
                << (pc_collector_count + pc_collector_bits))
            - pc_is_current_inc);
    }
}

void pc_init()
{
    pc_master.union_.current_collector_ = 0;
    pc_master.union_.collector_busy_mask_ = 1;
    pc_master.union_.outer_counter_ = 0;
    pc_master.current_collector_copy_ = 0;
    for (unsigned i = 0; i != pc_collector_count; ++i)
    {
        pc_collectors[i].defer_list_ = 0;
        pc_collectors[i].defer_list_size_ = 0;
        pc_collectors[i].is_current_ = 1;
        pc_collectors[i].back_link_ = 1;
        pc_collectors[i].inner_counter_ = 0;
    }
    pc_collectors[0].back_link_ = 0;
}

void pc_deinit()
{
    // drop the 'is current' reference of the current collector
    pc_release_impl(
        pc_master.union_.current_collector_,
        pc_is_current_inc);
}

void pc_acquire()
{
    pc_thread_data_t& local = pc_thread_data;
    local.recursion_count_ += 1;
    if (1 != local.recursion_count_)
        return; // nested acquire
    if (local.is_acquired_)
        return; // fast-path: the cached acquisition is still valid
    // slow-path: atomically pin the current collector
    local.collector_index_ = pc_acquire_impl();
    local.is_acquired_ = 1;
}

void pc_release()
{
    pc_thread_data_t& local = pc_thread_data;
    local.recursion_count_ -= 1;
    if (0 == local.recursion_count_)
    {
        // fast-path: if the current collector hasn't shifted,
        // keep the acquisition cached for the next pc_acquire() -
        // only a plain load, no atomic RMW, no memory barrier
        if (pc_master.current_collector_copy_
            != local.collector_index_)
        {
            if (local.local_defer_list_size_)
                pc_flush_impl();
            pc_release_impl(local.collector_index_,
                pc_counter_inc);
            local.is_acquired_ = 0;
        }
    }
}

void pc_flush()
{
    pc_thread_data_t& local = pc_thread_data;
    if (local.recursion_count_)
        return; // still inside an acquire region
    if (local.is_acquired_)
    {
        if (local.local_defer_list_size_)
            pc_flush_impl();
        pc_release_impl(local.collector_index_,
            pc_counter_inc);
        local.is_acquired_ = 0;
    }
}

void pc_defer(pc_node_t* node)
{
    pc_thread_data_t& local = pc_thread_data;
    node->pc_next_ = local.local_defer_list_;
    local.local_defer_list_ = node;
    if (0 == local.local_defer_list_tail_)
        local.local_defer_list_tail_ = node;
    local.local_defer_list_size_ += 1;
    // amortization: batch nodes in the per-thread cache
    // before touching the shared collector
    if (pc_local_garbage_cache < local.local_defer_list_size_)
        pc_flush_impl();
}

void pc_thread_init()
{
    pc_thread_data.recursion_count_ = 0;
    pc_thread_data.is_acquired_ = 0;
    pc_thread_data.collector_index_ = 0;
    pc_thread_data.local_defer_list_ = 0;
    pc_thread_data.local_defer_list_tail_ = 0;
    pc_thread_data.local_defer_list_size_ = 0;
}

void pc_thread_deinit()
{
    pc_flush();
}
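
One more note on pc_flush(): because of the release amortization a
thread can stay logically acquired between critical regions, which
pins its collector and all garbage deferred to it. A thread that is
about to block for a long time should drop that cached acquisition
first. A hypothetical worker loop (g_work_event and do_work() are
made up for illustration):

HANDLE g_work_event; // signaled when there is work to do
void do_work();      // uses pc_acquire/pc_release/pc_defer inside

void worker_loop()
{
    pc_thread_init();
    for (;;)
    {
        do_work();
        // we may still hold a cached acquisition here; release it
        // and flush the local defer cache before blocking, so we
        // don't stall reclamation for everyone else
        pc_flush();
        WaitForSingleObject(g_work_event, INFINITE);
    }
}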

Dmitriy V'jukov

Dmitriy V'jukov

Feb 2, 2008, 9:38:25 PM
to lock-free
On Feb 3, 05:16, "Dmitriy V'jukov" <dvyu...@gmail.com> wrote:

> Also I mix in my acquire/release amortization trick:
> http://groups.google.ru/group/comp.programming.threads/msg/91d01886ae...

Working link:
http://groups.google.ru/group/comp.programming.threads/msg/91d01886ae690185

Dmitriy V'jukov