
lock-free with collector objects


Joseph Seigh

Jun 22, 2003, 6:09:46 PM
I did some timings on a uniprocessor. The testcase is somewhat contrived
so as to get thread interleaving on a uniprocessor. Some sample results
were as follows:


nthreads = 20
loop count = 10000
# items = 25
modulo = 303
b1 = false
1 ...
time = 1262
2 ...
time = 9274
3 ...
time = 7480
4 ...
time = 8462

nthreads = 20
loop count = 10000
# items = 25
modulo = 303
b1 = true
1 ...
time = 8362
2 ...
time = 18817
3 ...
time = 141574
4 ...
time = 174421

nthreads = 20
loop count = 10000
# items = 25
modulo = 10303
b1 = false
1 ...
time = 1212
2 ...
time = 2664
3 ...
time = 1282
4 ...
time = 2043


modulo is the n'th iteration of the loop count at which a modification
of the linked list is done. The b1 flag controls whether some n'th read
iteration does a delay. Times are in msecs (win32 ticks).

1 is the wait-free algorithm
2 is a quick and dirty reader/writer lock
3 is another quick and dirty reader/writer lock
4 is a mutex (actually a CriticalSection)

The third run has updates turned off (modulo > loop count). The second
rw lock lets readers acquire the lock without heavyweight synchronization,
just an interlocked instruction, if the lock is currently in read mode.
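[Editor's note: a minimal, portable sketch of that reader fast path, using std::atomic in place of Win32's InterlockedCompareExchange. This is an illustration of the idea, not Joe's rwlock2 implementation; the class and method names are invented for the example.]

```cpp
#include <atomic>
#include <cassert>

// Reader fast path: if the lock is already in read mode (count > 0), a new
// reader joins the read phase with a single compare-and-swap instead of a
// kernel wait.  Only when count == 0 (free) or the lock is write-held does
// the reader fall back to the (omitted) slow path.
class rw_fastpath {
public:
    // Returns true if the reader got in on the fast path.
    bool try_rdlock_fast() {
        long cur = count.load(std::memory_order_acquire);
        while (cur > 0) {   // read mode: at least one reader already holds it
            if (count.compare_exchange_weak(cur, cur + 1,
                                            std::memory_order_acquire))
                return true;        // joined the existing read phase
        }
        return false;               // free or write-held: take the slow path
    }

    void rdunlock() { count.fetch_sub(1, std::memory_order_release); }

    // Stand-in for the slow path: the first reader transitions 0 -> 1.
    bool rdlock_slow_uncontended() {
        long expected = 0;
        return count.compare_exchange_strong(expected, 1);
    }

private:
    std::atomic<long> count{0};     // > 0 : number of active readers
};
```

The point of the fast path is that once one reader has paid for the mode transition, every additional reader costs one interlocked instruction.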

Joe Seigh

SenderX

Jun 22, 2003, 6:26:27 PM
> I did some timings on a uniprocessor

Nice numbers on the wait-free algo!


What is your algo for back linking collectors? And, is there a way to get
the back-linking to cope with a big load:

nthreads = 64
loop count = 100000
# items = 75000
modulo = 2
b1 = false

P.S.: Are you going to reveal some source in the future?

;)


I'm curious to see how my lock-free reader/writer would do in this test...

--
The designer of the SMP and HyperThread friendly, AppCore library.

http://AppCore.home.attbi.com


Joseph Seigh

Jun 22, 2003, 6:37:49 PM

SenderX wrote:
>
> > I did some timings on a uniprocessor
>
> Nice numbers on the wait-free algo!
>
> What is your algo for back linking collectors? And, is there a way to get
> the back-linking to cope with a big load:
>
> nthreads = 64
> loop count = 100000
> # items = 75000
> modulo = 2
> b1 = false

That would cause a stack overflow. At least until I put something in to limit
the maximum number of outstanding collector objects. But it's not really
an issue, since this is meant for relatively low modification levels. A
plain mutex or a writer lock-free algorithm would work better in that case.
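[Editor's note: the overflow comes from the back-linked collectors tearing down recursively, one destructor frame per outstanding collector. A hedged sketch of the standard fix, walking the chain iteratively so stack use stays O(1); `node` and `free_chain` are illustrative names, not part of the collector class.]

```cpp
#include <cstddef>

// If each collector deleted its `next` in its destructor, unwinding a long
// chain of outstanding collectors would recurse once per link and can blow
// the stack under heavy load.  Keeping the destructor non-recursive and
// freeing the chain with a loop bounds stack usage regardless of length.
struct node {
    node *next;
    explicit node(node *n = nullptr) : next(n) {}
    // Deliberately no `delete next` here.
};

// Frees an arbitrarily long chain without recursion; returns the count freed.
inline std::size_t free_chain(node *head) {
    std::size_t freed = 0;
    while (head != nullptr) {
        node *n = head->next;
        delete head;
        head = n;
        ++freed;
    }
    return freed;
}
```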


>
> P.S.: Are you going to reveal some source in the future?
>

Yes.

> I'm curious to see how my lock-free reader/writer would do in this test...
>

Joe Seigh

SenderX

Jun 22, 2003, 6:56:19 PM
> That would cause a stack overflow.

I've been tinkering with your collector object idea in C++, and have my
own back-linking algo. It does indeed overflow the stack under very heavy
load. Damn...

;)


> But it's not really
> an issue since this is meant for relatively low modification levels.

Yeah, great for lists / hash-buckets that need a bunch of lookups with few
writes...


> > P.S.: Are you going to reveal some source in the future?
> >
> Yes.

Very cool... Thanks Joe!


I am almost finished with some #ifdef'ing in my library that forces the main
thread-to-thread comm. APIs ( Queue.c & Stack.c ) to use the C version of
atomic_ptr with ABA CAS. I will post it soon.
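[Editor's note: for readers unfamiliar with "ABA CAS": the idea is to pair the value with a version tag and swap both words at once, so a value that went A -> B -> A still fails the compare. atomic_ptr does this with a reference count; this toy sketch packs a 32-bit value and a 32-bit tag into one 64-bit word. Names and layout are illustrative, not atomic_ptr's.]

```cpp
#include <atomic>
#include <cstdint>

// A value guarded by a version tag.  Every successful CAS bumps the tag,
// so comparing the combined 64-bit word defeats ABA on the value alone.
class tagged_value {
public:
    explicit tagged_value(std::uint32_t v) : word(pack(v, 0)) {}

    std::uint32_t load() const { return value_of(word.load()); }

    // CAS on the value that also bumps the tag on success.
    bool cas(std::uint32_t expected, std::uint32_t desired) {
        std::uint64_t cur = word.load();
        while (value_of(cur) == expected) {
            std::uint64_t next = pack(desired, tag_of(cur) + 1);
            if (word.compare_exchange_weak(cur, next))
                return true;
        }
        return false;
    }

private:
    static std::uint64_t pack(std::uint32_t v, std::uint32_t t) {
        return (std::uint64_t(t) << 32) | v;
    }
    static std::uint32_t value_of(std::uint64_t w) { return std::uint32_t(w); }
    static std::uint32_t tag_of(std::uint64_t w)   { return std::uint32_t(w >> 32); }

    std::atomic<std::uint64_t> word;
};
```

For full-width pointers the same trick needs a double-width CAS (cmpxchg8b on 32-bit x86 of the era), which is exactly what atomic_ptr relies on.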

Joseph Seigh

Jun 22, 2003, 8:26:15 PM
-- lfq.h --
#define _WIN32_WINNT 0x0500
#include <windows.h>
#include <winbase.h>
#include <stdlib.h>
#include <cassert>
#include <atomic_ptr.h>


//-----------------------------------------------------------------------
//
//
//-----------------------------------------------------------------------
template<typename T> class collector {

public:
collector() : next(0), freeq(0) {}

~collector() {
if (freeq != 0)
delete freeq;
}

void qfree(T * z) {
assert(freeq == 0);
freeq = z;
}

typedef collector<T> _t;
typedef atomic_ptr<_t> ap_t;
typedef local_ptr<_t> lp_t;
ap_t next;
private:
T *freeq; // free queue

};

template<typename T> class lfq;
template<typename T> class lfq_iterator;
template<typename T> class lfq_iterator_raw;

//-----------------------------------------------------------------------
//
//
//-----------------------------------------------------------------------
class lfq_mutex {
public:
lfq_mutex() { InitializeCriticalSection(&mutex); }
~lfq_mutex() { DeleteCriticalSection(&mutex); }
void lock() { EnterCriticalSection(&mutex); }
void unlock() { LeaveCriticalSection(&mutex); }
private:
CRITICAL_SECTION mutex;
};


//-----------------------------------------------------------------------
//
//
//-----------------------------------------------------------------------
template<typename T> class lfq_elem {
friend class lfq<T>;
friend class lfq_iterator<T>;
friend class lfq_iterator_raw<T>;

private:
lfq_elem() : next(0), prev(0), item(), isFree(false) {}
lfq_elem(T z) : next(0), prev(0), item(z), isFree(false) {}
//~lfq_elem();

// enqueue z after this
void enqueue(lfq_elem<T> * z) {
z->next = next;
z->prev = this;

next->prev = z;
next = z;
}

// dequeue this;
bool dequeue() {
if (isFree == false) {
prev->next = next;
next->prev = prev;
isFree = true;
return true;
}
else
return false;
}

lfq_elem *next;
lfq_elem *prev;
bool isFree;
T item;
public:
typedef lfq_elem<T> _t;

};


//-----------------------------------------------------------------------
//
//
//-----------------------------------------------------------------------
template<typename T> class lfq {
friend class lfq_iterator<T>;
friend class lfq_iterator_raw<T>;

public:
lfq() : q(new collector<lfq_elem<T>::_t>()), qmutex() {
qhead = new lfq_elem<T>();
qtail = new lfq_elem<T>();

qhead->next = qtail;
qtail->prev = qhead;
}

~lfq() {
lfq_elem<T> *n1, *n2;
n1 = qhead;
while (n1 != 0) {
n2 = n1->next;
delete n1;
n1 = n2;
}
}

//
void push(T item) {
lfq_elem<T> *elem;

qmutex.lock();
elem = new lfq_elem<T>(item); // throw?
qhead->enqueue(elem);
qmutex.unlock();
}

//
bool dequeue(lfq_iterator<T> & z) {
//local_ptr<collector<lfq_elem<T>::_t>::_t> lq;
collector<lfq_elem<T>::_t>::lp_t lq;
bool rc;

qmutex.lock();
lq = q; // get current

if ((rc = z.exists()) == false)
;

else if (rc = z.current->dequeue()) {
// push onto free queue
lq->qfree(z.current);

// next GC iteration
lq->next = new collector<lfq_elem<T>::_t>();
q = lq->next;

}

qmutex.unlock();

return rc;
}

//
bool dequeue(lfq_iterator_raw<T> & z) {
bool rc;

assert(z.exists());
rc = z.current->dequeue();
assert(rc);
delete(z.current);

return true;
}

void lock() { qmutex.lock(); }
void unlock() { qmutex.unlock(); }

private:
//atomic_ptr<collector<lfq_elem<T>::_t>::_t> q;
collector<lfq_elem<T>::_t>::ap_t q;
lfq_mutex qmutex; // mutex
lfq_elem<T> *qhead; // queue head
lfq_elem<T> *qtail; // queue tail
};


//-----------------------------------------------------------------------
//
//
//-----------------------------------------------------------------------
template<typename T> class lfq_iterator {
friend class lfq<T>;

public:
lfq_iterator() : current(0), q(0) {}

lfq_iterator(lfq<T> &src) : q(src.q) {
current = src.qhead->next;
}

//~lfq_iterator();

lfq_iterator<T> & operator = (lfq<T> &src) {
q = src.q;
current = src.qhead->next;
return *this;
}

lfq_iterator<T> & operator = (lfq_iterator<T> & src) {
q = src.q;
current = src.current;
return *this;
}

bool exists() { return (current->next != 0); }

void next() {
if (current->next != 0)
current = current->next;
}

T & get() { return current->item; }
T & operator * () { return current->item; }

private:

lfq_elem<T> * current;
//local_ptr<collector<lfq_elem<T>::_t>::_t> q;
collector<lfq_elem<T>::_t>::lp_t q;

};

//-----------------------------------------------------------------------
// "raw" iterator
//
//-----------------------------------------------------------------------
template<typename T> class lfq_iterator_raw {
friend class lfq<T>;

public:
lfq_iterator_raw() : current(0), _next(0) {}

lfq_iterator_raw(lfq<T> &src) {
current = src.qhead->next;
_next = current->next;
}

//~lfq_iterator();

lfq_iterator_raw<T> & operator = (lfq<T> &src) {
current = src.qhead->next;
_next = current->next;
return *this;
}

lfq_iterator_raw<T> & operator = (lfq_iterator_raw<T> &src) {
current = src.current;
_next = src._next;
return *this;
}

bool exists() { return _next != 0; }

void next() {
if (_next != 0) {
current = _next;
_next = _next->next;
}
}

T & get() { return current->item; }
T & operator * () { return current->item; }

private:

lfq_elem<T> *current;
lfq_elem<T> *_next;

};

/*-*/

Joseph Seigh

Jun 22, 2003, 8:28:15 PM
-- lfqtest.cpp --

#define _WIN32_WINNT 0x0500
#include <windows.h>
#include <winbase.h>
#include <stdio.h>
#include <stdlib.h>
#include <cassert>

#include "lfq.h"

LONG t1count = 0;
LONG count = 0;
LONG match1 = 0;
LONG match2 = 0;

int n = 10000; // loop count
int nitems = 25;
int nthreads = 20;
int m = 10303; // modulo
//int p = 13; // p' hash
bool b1 = false;


//
//
//
class rwlock2 {
public:
rwlock2() {
handle[0] = CreateEvent(NULL, FALSE, TRUE, NULL); // auto reset, initally set
handle[1] = CreateSemaphore(NULL, 0, 999, NULL);
count = 0;
rcount = 0;
wcount = 0;
}

~rwlock2() {
CloseHandle(handle[0]);
CloseHandle(handle[1]);
}

DWORD rdlock() {
BOOL brc;
DWORD rc;
LONG temp, temp2;

if (wcount == 0) {
temp = count;
while (temp > 0) {
temp2 = temp;
temp = InterlockedCompareExchange(&count, temp2 + 1, temp2);
if (temp == temp2)
return true;
}
}

InterlockedIncrement(&rcount);
rc = WaitForMultipleObjects(2, handle, FALSE, INFINITE);

if ((rc - WAIT_OBJECT_0) == 0) {
InterlockedDecrement(&rcount);
count = 1;
}

if (wcount == 0 && rcount > 0) {
if ((temp = InterlockedExchange(&rcount, 0)) > 0) {
InterlockedExchangeAdd(&count, temp);
brc = ReleaseSemaphore(handle[1], temp, NULL);
assert(brc);
}

}

return rc;
}

DWORD wrlock() {
DWORD rc;
InterlockedIncrement(&wcount);
rc = WaitForSingleObject(handle[0], INFINITE);
InterlockedDecrement(&wcount);
return rc;
}

BOOL unlock() {
BOOL rc = TRUE;

if (count == 0 || InterlockedDecrement(&count) == 0) {
rc = SetEvent(handle[0]);
assert(rc);
}
return rc;
}

private:
HANDLE handle[2]; // 0 : event, 1 : semaphore
LONG count;
LONG rcount; // pending reads
LONG wcount; // pending writes
};

//
//
//
class rwlock {
public:
rwlock() {
handle[0] = CreateMutex(NULL, FALSE, NULL);
handle[1] = CreateEvent(NULL, TRUE, TRUE, NULL); // manual reset initally set
count = -1;
}

~rwlock() {
CloseHandle(handle[0]);
CloseHandle(handle[1]);
}

DWORD rdlock() {
BOOL brc;
DWORD rc;
rc = WaitForSingleObject(handle[0], INFINITE);
if (count == -1) {
brc = ResetEvent(handle[1]);
assert(brc);
count = 1;
}
else
InterlockedIncrement(&count);
brc = ReleaseMutex(handle[0]);
assert(brc);

return rc;
}

DWORD wrlock() {
return WaitForMultipleObjects(2, handle, TRUE, INFINITE);
}

BOOL unlock() {
BOOL rc = TRUE;

if (count > 0) {
if (InterlockedDecrement(&count) == 0) {
WaitForSingleObject(handle[0], INFINITE);
if (count == 0) {
rc = SetEvent(handle[1]);
assert(rc);
count = -1;
}
rc = ReleaseMutex(handle[0]);
assert(rc);
}
}

else {
rc = ReleaseMutex(handle[0]);
assert(rc);
}

return rc;
}

private:
HANDLE handle[2]; // 0 : mutex, 1 : event
LONG count;
};

//
// test class
//
class t1 {
public:
t1(int z = 0) {
value = z;
InterlockedIncrement(&t1count);
InterlockedIncrement(&count);
}

t1(t1 & z) {
value = z.value;
InterlockedIncrement(&t1count);
InterlockedIncrement(&count);
}

~t1() {
InterlockedDecrement(&count);
}

int get() { return value; }

private:
int value;
};


lfq<t1> *q; // lock free queue
rwlock rw; // reader/writer lock
rwlock2 rw2;

//
//
//
DWORD WINAPI ftest4(void * xxx) {
int j, k;
lfq_iterator_raw<t1> ndx1;

for (j = 0; j < n; j++) {
q->lock();
k = 0;
for (ndx1 = *q; ndx1.exists(); ndx1.next()) {
if (j%m == 0 && k++ == 5) {
InterlockedIncrement(&match1);
if (q->dequeue(ndx1)) {
InterlockedIncrement(&match2);
q->push(t1(-1));
}
Sleep(10);
}
else if (b1 && j%m == 3)
Sleep(10);
}
q->unlock();
}

return 0;
}

//
//
//
DWORD WINAPI ftest3(void * xxx) {
int j, k;
lfq_iterator_raw<t1> ndx1;

for (j = 0; j < n; j++) {
(j%m == 0) ? rw2.wrlock() : rw2.rdlock();
k = 0;
for (ndx1 = *q; ndx1.exists(); ndx1.next()) {
if (j%m == 0 && k++ == 5) {
InterlockedIncrement(&match1);
if (q->dequeue(ndx1)) {
InterlockedIncrement(&match2);
q->push(t1(-1));
}
Sleep(10);
}
else if (b1 && j%m == 3)
Sleep(10);
}
rw2.unlock();
}

return 0;
}


//
//
//
DWORD WINAPI ftest2(void * xxx) {
int j, k;
lfq_iterator_raw<t1> ndx1;

for (j = 0; j < n; j++) {
(j%m == 0) ? rw.wrlock() : rw.rdlock();
k = 0;
for (ndx1 = *q; ndx1.exists(); ndx1.next()) {
if (j%m == 0 && k++ == 5) {
InterlockedIncrement(&match1);
if (q->dequeue(ndx1)) {
InterlockedIncrement(&match2);
q->push(t1(-1));
}
Sleep(10);
}
else if (b1 && j%m == 3)
Sleep(10);
}
rw.unlock();
}

return 0;
}

//
//
//
DWORD WINAPI ftest1(void * xxx) {
int j, k;
lfq_iterator<t1> ndx1;

for (j = 0; j < n; j++) {
k = 0;
for (ndx1 = *q; ndx1.exists(); ndx1.next()) {
if (j%m == 0 && k++ == 5) {
InterlockedIncrement(&match1);
if (q->dequeue(ndx1)) {
InterlockedIncrement(&match2);
q->push(t1(-1));
}
Sleep(10);
}
else if (b1 && j%m == 3)
Sleep(10);
}
}

return 0;
}


void main1(int z) {
HANDLE hThread[40];
int j;
lfq<t1> q1; // lock free queue

// initialize queue
//
for (j = 0; j < nitems; j++)
q1.push(t1(j%m));
q = &q1;

// start threads
//
for (j = 0; j < nthreads; j++) {
hThread[j] = CreateThread(NULL, 0,
(z == 1) ? ftest1 :
(z == 2) ? ftest2 :
(z == 3) ? ftest3 :
(z == 4) ? ftest4 :
ftest4,
NULL, 0, NULL);

}

WaitForMultipleObjects(j, hThread, TRUE, INFINITE);

return;
}


int main(int argc, char *argv[]) {
DWORD t1, t2;
int j;


printf("nthreads = %d\n", nthreads);
printf("loop count = %d\n", n);
printf("# items = %d\n", nitems);
printf("modulo = %d\n", m);
printf("b1 = %s\n", b1 ? "true" : "false");

for (j = 1; j <= 4; j++) {
printf("%d ...\n", j);

t1 = GetTickCount();
main1(j);
t2 = GetTickCount();

//--

printf("time = %9d\n", (t2 - t1));
printf("match1 = %d\n", match1);
printf("match2 = %d\n", match2);
printf("t1count = %ld\n", t1count);
printf("count = %ld\n", count);

//--
match1 = 0;
match2 = 0;
t1count = 0;
count = 0;
}


return 0;
}

/*-*/

Joseph Seigh

Jun 22, 2003, 8:40:35 PM

That was the source code. Keep in mind it's a proof of concept and probably won't
resemble the final code much, assuming I do final code. The collector class
is likely to change. The current version doesn't really implement pushing more than
one element to be freed, since the testcase doesn't use that capability.

Also the element links should be one of the atomic<T> types, not plain C pointers.

I make no warranty about the reader/writer lock implementations in the testcase or
that they even work correctly.

I might do iterators differently but they definitely will not look like STL/Boost
style iterators. Iterators have nothing to do with the collector class anyway.

Read lock-free semantics are different from what most people are used to. Copying
is recommended for reading data from a read lock-free collection. If you use
pointers (they would have to be smart pointers), be aware that the object can exist
both in and out of the collection at the same time (in a sense). If your object
state depends on its membership in a collection, then read lock-free is not going to
work.
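[Editor's note: a minimal sketch of the "copy when reading" rule, using std::shared_ptr with the std::atomic_load/atomic_store free functions standing in for atomic_ptr. The reader takes a snapshot and copies the payload out, so it never matters whether the node was unlinked after the snapshot. `holder`, `item_node`, and `read_copy` are illustrative names.]

```cpp
#include <atomic>
#include <memory>
#include <string>

struct item_node {
    std::string payload;
};

class holder {
public:
    // Writer publishes a fresh node; old readers keep their snapshot alive.
    void publish(std::string s) {
        std::atomic_store(&cur,
            std::make_shared<item_node>(item_node{std::move(s)}));
    }

    // Reader returns a COPY of the payload, never a reference into the node,
    // so it is indifferent to whether the node is still "in" the collection.
    std::string read_copy() const {
        std::shared_ptr<item_node> snap = std::atomic_load(&cur);
        return snap ? snap->payload : std::string();
    }

private:
    std::shared_ptr<item_node> cur;
};
```

A reader that kept a raw reference into the node instead of copying would be exactly the case Joe warns about: the object exists for that reader even after a writer has removed it from the collection.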

Joe Seigh

SenderX

Jun 22, 2003, 8:42:07 PM
Sweet! I am going to study this deeply, and convert it to C.

SenderX

Jun 22, 2003, 10:52:19 PM
> Why do you think someone will need this ugly C-code ?

Maybe somebody who doesn't like C++ that much?

Joseph Seigh

Jun 23, 2003, 6:36:09 AM

I wrote:


>
> SenderX wrote:
> > What is your algo for back linking collectors? And, is there a way to get
> > the back-linking to cope with a big load:
> >
> > nthreads = 64
> > loop count = 100000
> > # items = 75000
> > modulo = 2
> > b1 = false
>
> That would cause a stack overflow. At least until I put something in limit
> the maximum number of outstanding collector objects. But it's not really
> an issue since this is meant for relatively low modification levels. A
> plain mutex or writer lock-free algorithm would work better in that case.

A little further detail. The limitation on the number of outstanding collector objects
isn't to improve the update/modification throughput of this technique, which isn't
meant for high modification levels. It's to improve the technique's robustness,
particularly if the updates were bursty.

An obvious way would be to use a counting semaphore. That would put heavyweight
synchronization in the collector dtor (for win32 anyway). This shouldn't be
much of a problem for infrequently updated collections, however, because you wouldn't
be executing many dtors.

If you are limiting the number of collectors and reusing them via an object pool,
subpooling them per collection gives you something that can be considered a
producer/consumer problem with a circular list. The collector dtor would be the
producer of free collector objects and collection modifications would be the
consumer. There may be some interesting techniques you could do here.
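[Editor's note: a hedged sketch of the counting-semaphore idea, built from mutex + condition variable for portability. Modifications "consume" a permit before allocating a collector; the collector dtor "produces" one back, so bursty update streams block instead of letting the outstanding chain grow without bound. The class and method names are invented for the example.]

```cpp
#include <condition_variable>
#include <mutex>

// Bounds the number of outstanding collector objects.  This is the
// producer/consumer shape Joe describes: the collector dtor is the producer
// of permits, collection modifications are the consumer.
class collector_limit {
public:
    explicit collector_limit(int max_outstanding) : permits(max_outstanding) {}

    void acquire() {                   // call before allocating a collector
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [this] { return permits > 0; });
        --permits;
    }

    bool try_acquire() {               // non-blocking variant
        std::lock_guard<std::mutex> lk(m);
        if (permits == 0) return false;
        --permits;
        return true;
    }

    void release() {                   // call from the collector dtor
        { std::lock_guard<std::mutex> lk(m); ++permits; }
        cv.notify_one();
    }

    int available() const {
        std::lock_guard<std::mutex> lk(m);
        return permits;
    }

private:
    mutable std::mutex m;
    std::condition_variable cv;
    int permits;
};
```

The per-collection subpool variant would replace acquire/release with a circular free list of preallocated collectors, which avoids the heavyweight wait in the common case.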

Joe Seigh
