Lock-free proxy algorithm (substitution for rw-mutex)

351 views
Skip to first unread message

Dmitriy V'jukov

unread,
May 31, 2007, 3:14:34 PM5/31/07
to lock-free
Another crosspost from comp.programming.threads just to create some
initial content :)

I have the algorithm (and the implementation) of lock-free object
proxy.
What I mean by "object proxy" - object proxy holds some user object
and provides "read" access to it, and provides "replace object"
operation.
It is something that solves problem that reader_writer_mutex solves.
It is analog of http://home.comcast.net/~vzoom/demos/pc_sample.c
But it has some superior properties (of course if algorithm is
working :) You (I) never can be sure with lock-free algorithm :) ).

Properties of the algorithm:
- 64bit clean - no DCAS
- all access (read and write) is _wait-free_ - i.e. no
InterlockedCompareExchage, and no cycles, just InterlockedExchageAdd
and InterlockedExchage
- have the limit of maximum number of concurrent reading threads -
can be varied by changing "number of stolen bits from pointer" - so
can be set to 32/64/128 - I think this is enough... for now...
- can work in two modes: normal and deffered release
- in normal mode user object released immediatly when last reader/
writer stops using it, read access costs 2 interlocked operations - 1
for acquire and 1 for release
- in deffered release mode user object release can be deffered by
inactive reader, read access costs _zero_ interlocked operations
(amortized)
- no collector back-list of "deleted" objects - straitforward
determinate object deletion


Dmitriy V'jukov

Dmitriy V'jukov

unread,
May 31, 2007, 3:15:20 PM5/31/07
to lock-free
Here is the code for normal work mode (not deferred, deferred mode I
post later).

#pragma once

#include <windows.h>
#include <process.h>
#include <stdio.h>

extern "C" long _InterlockedExchange(long*, long);
extern "C" long _InterlockedExchangeAdd(long*, long);
#pragma intrinsic (_InterlockedExchange)
#pragma intrinsic (_InterlockedExchangeAdd)

#define WORD_WIDTH 32
#define STOLEN_BITS 6
#define REMAIN_BITS (WORD_WIDTH - STOLEN_BITS)
#define ALIGNMENT (1 << STOLEN_BITS)
#define TYPE my_data

// test tracer - used as user data
struct my_data
{
int i;

my_data(int i) : i(i)
{
printf("ctor %d\n", i);
Sleep(0);
}

~my_data()
{
printf("dtor %d\n", i);
Sleep(0);
}

};

union inner_union
{
// target for interlocked instructions
long whole;
struct
{
// indicator that this holder object
// was removed from store
unsigned removed : 1;

// not used
unsigned padding : REMAIN_BITS - 1;

// number of readers, that release the object
unsigned counter : STOLEN_BITS;
};

};

union outer_union
{
// target for interlocked instructions
long whole;
struct
{
// pointer to holder instance
// w/o STOLEN_BITS least significant bits
// and shifted right by STOLEN_BITS
unsigned ptr : REMAIN_BITS;

// number of readers, that acquire the object
unsigned counter : STOLEN_BITS;
};

};

struct holder
{
// stored user object
TYPE* user_obj;

// pointer to unaligned buffer that was returned from malloc()
char* buf_ptr;

// remove flag + release counter
inner_union inner;

};

class store
{
// pointer to holder + acquire counter
outer_union outer;

static outer_union create_holder(TYPE* obj)
{
char* p = new char[sizeof(holder) + ALIGNMENT];
char* pa = ((int)p % ALIGNMENT) ? (p - ((int)p %
ALIGNMENT) +
ALIGNMENT) : p;
holder* h = (holder*)pa;
h->user_obj = obj;
h->buf_ptr = p;
h->inner.whole = 0;
unsigned ptr = ((unsigned)pa >> STOLEN_BITS);
outer_union outer;
outer.ptr = ptr;
outer.counter = 0;
return outer;
}

static void replace_holder(outer_union outer)
{
holder* h = (holder*)(outer.ptr << STOLEN_BITS);
inner_union addend;
addend.removed = 1;
addend.counter = outer.counter;
addend.padding = 0;
release_holder(h, addend);
}

static void release_holder(holder* h, inner_union addend)
{
inner_union new_inner;
new_inner.whole =
(_InterlockedExchangeAdd(&h->inner.whole,
addend.whole) +
addend.whole);
if (new_inner.removed == 1 && new_inner.counter == 0)
{
delete h->user_obj;
delete[] h->buf_ptr;
}
}

public:
store(TYPE* obj)
{
outer = create_holder(obj);
}

void replace_obj(TYPE* obj)
{
outer_union new_outer = create_holder(obj);
outer_union old_outer;
old_outer.whole =
_InterlockedExchange(&outer.whole,
new_outer.whole);
replace_holder(old_outer);
}

holder* acquire()
{
outer_union addend;
addend.counter = 1;
addend.ptr = 0;
outer_union old_whole;
old_whole.whole =
_InterlockedExchangeAdd(&outer.whole,
addend.whole);
holder* h = (holder*)(old_whole.ptr << STOLEN_BITS);
return h;
}

static void release(holder* h)
{
inner_union addend;
addend.counter = -1;
addend.removed = 0;
addend.padding = 0;
release_holder(h, addend);
}

static void add_ref(holder* h)
{
inner_union addend;
addend.counter = 1;
addend.removed = 0;
addend.padding = 0;
_InterlockedExchangeAdd(&h->inner.whole,
addend.whole);
}

~store()
{
outer_union old_outer;
old_outer.whole =
_InterlockedExchange(&outer.whole, 0);
replace_holder(old_outer);
}

};

// And here is the test:

long dead_counter = 0;

void reader_thread(void* p)
{
store& s = *(store*)p;
for (int i = 0; i < 2; ++i)
{
holder* h = s.acquire();
// here is our object
my_data* data = h->user_obj;
// work with data
printf("read %d\n", data->i);
Sleep(0);
// get one more ref
store::add_ref(h);
// ...
store::release(h);
store::release(h);
}
_InterlockedExchangeAdd(&dead_counter, 1);

}

void writer_thread(void* p)
{
store& s = *(store*)p;
for (int i = 1; i < 3; ++i)
{
printf("repl %d\n", i);
s.replace_obj(new my_data(i));
Sleep(0);
}
_InterlockedExchangeAdd(&dead_counter, 1);

}

int main()
{
{
store s (new my_data(0));

_beginthread(reader_thread, 0, &s);
_beginthread(writer_thread, 0, &s);
_beginthread(reader_thread, 0, &s);
_beginthread(writer_thread, 0, &s);

do {Sleep(1000);} while (dead_counter != 4);
}

printf("dead\n");

}

Dmitriy V'jukov

Dmitriy V'jukov

unread,
May 31, 2007, 3:15:55 PM5/31/07
to lock-free
And here is the code for deferred mode. I'll post only additions to
the code.

Here is local_mediator class definition:

// thread-local mediator for deferred release
struct local_mediator
{
holder* cache;
unsigned counter;

local_mediator()
: cache()
, counter()
{}

};

And here is new public methods of store class:

holder* acquire(local_mediator* m)
{
if (m->counter++)
return m->cache;
if (m->cache)


{
holder* h = (holder*)(outer.ptr <<
STOLEN_BITS);

if (m->cache == h)
return m->cache;
release(m->cache);
}
m->cache = acquire();
return m->cache;
}

static void release(local_mediator* m, holder* h)
{
m->counter--;
}

static void add_ref(local_mediator* m, holder* h)
{
m->counter++;
}

static void flush(local_mediator* m)
{
if (m->cache && !m->counter)
{
release(m->cache);
m->cache = 0;
}
}

And here is new test thread:

void deferred_reader_thread(void* p)
{
store& s = *(store*)p;
local_mediator m;


for (int i = 0; i < 2; ++i)
{

holder* h = s.acquire(&m);


// here is our object
my_data* data = h->user_obj;
// work with data
printf("read %d\n", data->i);
Sleep(0);
// get one more ref

store::add_ref(&m, h);
// ...
store::release(&m, h);
store::release(&m, h);
}
store::flush(&m);
_InterlockedExchangeAdd(&dead_counter, 1);

}

Dmitriy V'jukov

Reply all
Reply to author
Forward
0 new messages