About lock-free unbounded fifo/lifo queues


Raine

May 14, 2009, 5:41:15 PM
to Scalable Synchronization Algorithms, dvy...@gmail.com
Hi all!

I'm really new to this *-free stuff { OK... I'm a noob :_( }, but I've
been following many posts on lock-free data structures on
comp.programming.threads from Mr. Dmitriy V'jukov, Chris Thomasson and
others, and I'd like to try lock-free algorithms in my university
project (basically an asynchronous message queue service bus). I've
been running some crude tests (in C) on a multi-producer/single-consumer
(mpsc) FIFO queue taken from this post:
http://groups.google.com/group/comp.programming.threads/browse_thread/thread/33f79c75146582f3/6227ec9a1d741526?tvc=2#6227ec9a1d741526
The test program is posted at the end of this message (please forgive
any bad coding - it's just a crude test...).

Since then I've accumulated a lot of doubts and, unfortunately,
couldn't find helpful advice.
So why not ask where those ideas came from ;)
I'd really appreciate any help with the questions below:

1) In the test program below, the only way I found to get a working
XCHG primitive (besides implementing it myself) was to use the GNU gcc
built-in atomic ops. Since projects like InnoDB (the MySQL db engine)
have written their own atomic primitives, I'm wondering whether the
built-ins are 'just fine' for performance. I know that should not be a
concern right now (Knuth's phrase: ... 'premature optimization is the
root of all evil'), but since I'm curious, why not just ask people who
have dealt with this. Do you know any other good implementation of
XCHG, instead of the gcc compiler built-in or Linux's own
implementation ('asm/atomic.h')?

2) Looking at my code: if I want to build a message queue, do I have
to worry about the performance cost of memory alignment issues, or is
the way I'm implementing it just fine?
Any tip or advice on this? I personally really mess up alignment
stuff :-D

3) And what about optimizing cache affinity in this case? Any advice
besides forcing threads X and Y to run pinned to different cores?

4) Is it possible to implement a stack/LIFO using that algorithm from
Mr. Dmitriy V'jukov
(http://groups.google.com/group/comp.programming.threads/browse_thread/thread/33f79c75146582f3/6227ec9a1d741526?tvc=2#6227ec9a1d741526) ?
And a circular buffer?

5) Since that algorithm is really awesome (at least compared with a
traditional multi-producer/multi-consumer lock-based queue, see
http://www.aagh.net/projects/thrqueue), it would be a bad idea to use
locks/condvars/semaphores to make a bounded FIFO.
You can see in the program below that the queue is unbounded, and
memory use really blows up when producers enqueue more than the
consumer can dequeue. Since no producer sleeps when a queue limit is
reached, nothing stops the program from hogging system resources...
So, here is my idea:

"Use some sort of notification mechanism so producers can 'poll' that
event base and continue enqueueing more data. The consumer would be
responsible for signalling that the queue is ready for more."

I don't know whether this is a bloated idea, or whether simple
semaphores or condvars would be just fine... What do you think about
this? Any other idea?


Thanks again for the help. I'm really stuck on these topics and don't
know how to proceed with my project...

Here is the working program:

/**********************************************************************
 *
 * Basically it creates N mpsc queues fed by 'PRODUCERS' producer
 * threads in a round-robin fashion, each queue consumed by an
 * independent consumer.
 * The 'CONSUMERS' param defines how many queues are created and thus
 * how many consumer threads there are, one per queue.
 * Throughput is measured as how many items a producer thread can
 * enqueue per second and how many a consumer can dequeue per second.
 * Producers also generate random numbers and accumulate them, to
 * compare with the same accumulated value on the consumer side (a
 * crude consistency check). Insertion ordering is also checked on the
 * consumer side.
 *
 */
#define _GNU_SOURCE
#define _XOPEN_SOURCE 600

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <sys/time.h>
#include <sys/times.h>
#include <math.h>
#include <pthread.h>


/*
* From GNU compiler 4.1.2+ has builtin atomic ops - See:
* http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html
*/
#if ( (__GNUC__ == 4) && (__GNUC_MINOR__ >= 1) || __GNUC__ > 4) && \
(defined(__x86_64__) || defined(__i386__))
#define XCHG __sync_lock_test_and_set
#endif

#define CONSUMERS 1
#define PRODUCERS 4
#define PRODUCERS_ITERATIONS 1000000 /*0 = forever*/

#define SLEEP_BTW_ENQUEUE 0 /* 0 = don't sleep */

#define DEBUG_PRODUCER 0
#define DEBUG_CONSUMER 0

//#define CALCLATENCY

struct queuenode_data
{
    char thr_id[100];
    int ithid;
    double dvalue;
    unsigned long qdid;
#ifdef CALCLATENCY
    struct timeval tv;
#endif
};

struct thread_args
{
    char thr_id[100];
    int ithid;
    double dvalue;
    unsigned long counts;
    double throughput;
};

/* ------------------------------------------------ */
/*
* Extracted from
* http://groups.google.com/group/comp.programming.threads/browse_thread/thread/33f79c75146582f3/6227ec9a1d741526?tvc=2#6227ec9a1d741526
* Author: Dmitriy V'jukov
*/
struct mpscq_node_t
{
    struct queuenode_data *qndata;
    struct mpscq_node_t* volatile next;
};

struct mpscq_t
{
    struct mpscq_node_t* volatile head;
    struct mpscq_node_t* tail;
    struct mpscq_node_t stub;
};

#define MPSCQ_STATIC_INIT(self) {&self.stub, &self.stub, {0}}

void mpscq_create(struct mpscq_t* self)
{
    self->head = &self->stub;
    self->tail = &self->stub;
    self->stub.next = 0;
}

void mpscq_push(struct mpscq_t* self, struct mpscq_node_t* n)
{
    n->next = 0;
    struct mpscq_node_t* prev = XCHG(&self->head, n);
    //(*)
    prev->next = n;
}

struct mpscq_node_t* mpscq_pop(struct mpscq_t* self)
{
    struct mpscq_node_t* tail = self->tail;
    struct mpscq_node_t* next = tail->next;
    if (tail == &self->stub)
    {
        if (0 == next)
            return 0;
        self->tail = next;
        tail = next;
        next = next->next;
    }
    if (next)
    {
        self->tail = next;
        return tail;
    }
    struct mpscq_node_t* head = self->head;
    if (tail != head)
        return 0;
    mpscq_push(self, &self->stub);
    next = tail->next;
    if (next)
    {
        self->tail = next;
        return tail;
    }
    return 0;
}
/* ------------------------------------------------ */

/* Set by main once all producers are joined; polled by the consumers */
volatile unsigned int allproducersgone = 0;

/* Queue */
struct mpscq_t **queues;

static struct thread_args **thargs_prodpool;
static struct thread_args **thargs_conspool;

struct mpscq_node_t* mpscq_create_node(struct queuenode_data *qndata)
{
    struct mpscq_node_t *qn = (struct mpscq_node_t*) malloc(sizeof(struct mpscq_node_t));
    qn->qndata = qndata;

    return qn;
}

static double now()
{
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return tv.tv_sec + (double) tv.tv_usec / 1000000;
}

static double now_d(struct timeval *tv)
{
    return tv->tv_sec + (double) tv->tv_usec / 1000000;
}

static struct timeval* now_tv()
{
    struct timeval *tv = (struct timeval*) malloc(sizeof(struct timeval));
    gettimeofday(tv, NULL);
    return tv;
}

static unsigned long diff_usecs(struct timeval *tvs, struct timeval *tvf)
{
    return (tvf->tv_sec - tvs->tv_sec) * 1000000 + (tvf->tv_usec - tvs->tv_usec);
}

static unsigned long diff_secs(struct timeval *tvs, struct timeval *tvf)
{
    return (tvf->tv_sec - tvs->tv_sec);
}

/* pthread start routines must return void* */
void *producer_runtime(void *_tharg)
{
    double dvalue = 0.0;
    double throughput = 0.0;
    unsigned long counts = 0, elapsed;
    long int rdv;
    struct thread_args *thargs = (struct thread_args *)_tharg;
    struct queuenode_data *qndata;
    struct mpscq_node_t *node;
    struct timeval start_tv, end_tv;
    struct mpscq_t *queue;

    printf("%s: thread alive [producer_runtime]\n", thargs->thr_id);

    /* Start time counter */
    gettimeofday(&start_tv, NULL);

    while(counts < PRODUCERS_ITERATIONS || PRODUCERS_ITERATIONS == 0)
    {
        /* Round-robin queue */
        queue = queues[counts % CONSUMERS];

        /* Create and initialize cell */
        qndata = (struct queuenode_data *) malloc(sizeof(struct queuenode_data));
        qndata->qdid = counts+1;
        rdv = random()%1000;
        dvalue += rdv;
        qndata->dvalue = rdv;

#ifdef CALCLATENCY
        gettimeofday(&(qndata->tv), NULL);
#endif

        strcpy(qndata->thr_id, thargs->thr_id);
        qndata->ithid = thargs->ithid;

        /* Create and set queue node */
        node = mpscq_create_node(qndata);

        /* Enqueue new node */
        mpscq_push(queue, node);

        /* Log enqueuing */
        if(DEBUG_PRODUCER > 0) printf("%s: enqueue at %lu\n", thargs->thr_id, counts);
        counts++;

        if(SLEEP_BTW_ENQUEUE != 0) usleep(SLEEP_BTW_ENQUEUE);
    }
    /* End time counter */
    gettimeofday(&end_tv, NULL);

    thargs->dvalue = dvalue;
    elapsed = diff_secs(&start_tv, &end_tv);

    /* throughput stays 0 when the run took less than one second */
    if(elapsed > 0) throughput = (double) counts / elapsed;
    thargs->counts = counts;
    thargs->throughput = throughput;

    printf("%s: thread exit. Total %lu in %lu secs. Throughput: %.2f dvalue: %.2f\n",
           thargs->thr_id, counts, elapsed, throughput, dvalue);
    return NULL;
}

/* pthread start routines must return void* */
void *consumer_runtime(void *_tharg)
{
    struct thread_args *thargs = (struct thread_args *)_tharg;
    struct mpscq_node_t *node;
    struct timeval tvnow;
    struct timeval start_tv, end_tv;
    unsigned long counts = 0, elapsed;
    double throughput = 0.0;
    double minlatency = (double) INFINITY, maxlatency = 0.0, avglatency = 0.0, accumlatency = 0.0;
    double dvalues[PRODUCERS] = {0};      /* per-producer accumulated values */
    unsigned long qdids[PRODUCERS] = {0}; /* last id seen per producer */
    unsigned long outoforder = 0;
    struct mpscq_t *queue = queues[thargs->ithid];

    printf("%s: thread alive [consumer_runtime]\n", thargs->thr_id);

    /* Start time counter */
    gettimeofday(&start_tv, NULL);

    while(1)
    {
        /* Read the flag before popping: if it was already set, every push
           has completed, so an empty pop means the queue is drained */
        unsigned int producers_done = allproducersgone;

        /* Dequeue (consume) node */
        node = mpscq_pop(queue);

        if(node != NULL)
        {
#ifdef CALCLATENCY
            gettimeofday(&tvnow, NULL);
            elapsed = diff_usecs(&(node->qndata->tv), &tvnow);

            /* Independent checks, so the first sample updates both */
            if(elapsed > maxlatency) maxlatency = elapsed;
            if(elapsed < minlatency) minlatency = elapsed;
            accumlatency += (double) elapsed;

            if(DEBUG_CONSUMER > 0) printf("%s: dequeued %s at %.2f -> %lu usecs\n",
                thargs->thr_id, node->qndata->thr_id, now_d(&tvnow), elapsed);
#endif

            counts++;
            dvalues[node->qndata->ithid] += node->qndata->dvalue;

            if(node->qndata->qdid <= qdids[node->qndata->ithid])
            {
                outoforder++;
                printf("%s: Out of order: %lu from producer %s\n",
                       thargs->thr_id, node->qndata->qdid, node->qndata->thr_id);
            }
            else
            {
                qdids[node->qndata->ithid] = node->qndata->qdid;
            }

            /* Free resources */
            free(node->qndata); node->qndata = NULL;
            free(node); node = NULL;
        }
        else
        {
            if(producers_done == 0)
            {
                if(DEBUG_CONSUMER > 0) printf("%s: nothing to dequeue\n", thargs->thr_id);
                //usleep(1000);
            }else break;
        }
    }

    /* End time counter */
    gettimeofday(&end_tv, NULL);
    elapsed = diff_secs(&start_tv, &end_tv);
    if(elapsed > 0) throughput = (double) counts / elapsed;
    thargs->counts = counts;
    if(counts > 0) avglatency = accumlatency / counts;
    thargs->throughput = throughput;

    printf("%s: thread exit. Consumed: %lu in %lu secs Throughput: %.2f [min: %.2f , max: %.2f , avg: %.2f] usecs\n",
           thargs->thr_id, counts, elapsed, throughput, minlatency, maxlatency, avglatency);

    int i = 0;
    for(i = 0; i < PRODUCERS; i++)
    {
        printf("%s->P%d dvalue: %.2f\n", thargs->thr_id, i, dvalues[i]);
    }
    printf("%s: Total out of order: %lu\n", thargs->thr_id, outoforder);
    return NULL;
}

int main()
{
    int i = 0;
    unsigned long total_prod = 0, total_cons = 0;
    double total_throughput_prod = 0.0, total_throughput_cons = 0.0;
    struct thread_args *thargs;
    pthread_t th_producers[PRODUCERS], th_consumers[CONSUMERS];

    /* These are arrays of pointers, so size them by the pointer type */
    thargs_prodpool = (struct thread_args **) malloc(sizeof(struct thread_args *) * PRODUCERS);
    thargs_conspool = (struct thread_args **) malloc(sizeof(struct thread_args *) * CONSUMERS);

    /* Create and initialize global queues array */
    queues = (struct mpscq_t **) malloc(sizeof(struct mpscq_t *) * CONSUMERS);
    for(i = 0; i < CONSUMERS; i++)
    {
        queues[i] = (struct mpscq_t *) malloc(sizeof(struct mpscq_t));
        mpscq_create(queues[i]);
    }

    /* Spawn producer threads */
    for(i = 0; i < PRODUCERS; i++)
    {
        /* Initialize thread arg */
        thargs = (struct thread_args *) malloc(sizeof(struct thread_args));
        thargs_prodpool[i] = thargs;
        sprintf(thargs->thr_id, "P%d", i);
        thargs->ithid = i;

        pthread_create(&th_producers[i], NULL, producer_runtime, thargs);
    }
    /* Spawn consumer threads */
    for(i = 0; i < CONSUMERS; i++)
    {
        thargs = (struct thread_args *) malloc(sizeof(struct thread_args));
        thargs_conspool[i] = thargs;
        sprintf(thargs->thr_id, "C%d", i);
        thargs->ithid = i;

        pthread_create(&th_consumers[i], NULL, consumer_runtime, thargs);
    }

    /* Join all threads */
    for(i = 0; i < PRODUCERS; i++)
    {
        pthread_join(th_producers[i], NULL);
    }
    allproducersgone = 1;
    for(i = 0; i < CONSUMERS; i++)
    {
        pthread_join(th_consumers[i], NULL);
    }

    /* Get stats */
    for(i = 0; i < PRODUCERS; i++)
    {
        total_prod += thargs_prodpool[i]->counts;
        total_throughput_prod += thargs_prodpool[i]->throughput;
    }
    for(i = 0; i < CONSUMERS; i++)
    {
        total_cons += thargs_conspool[i]->counts;
        total_throughput_cons += thargs_conspool[i]->throughput;
    }

    printf("\nTotal items produced: %lu , Total items consumed: %lu\n", total_prod, total_cons);
    printf("Total throughput produced: %.2f , Total throughput consumed: %.2f\n\n",
           total_throughput_prod, total_throughput_cons);

    printf("Bye!\n");
    fflush(stdout); /* fsync() takes a file descriptor, not a FILE* */

    return 0;
}
/**********************************************************************/

Thanks again for all your help!

Best regards,
Raine

Raine

May 15, 2009, 11:16:54 AM
to Scalable Synchronization Algorithms
> 2) Looking at my code do you think if I want to make a message queue
> do I have to worry about memory alignment issues over
> performance costs or the way I'm implementing could be just fine?
> Any tip or advice on this? I personally really messes with alignment
> stuff :-D
>
> 3) And the for optimizing cache affinity in this case? Any advice
> besides forcing threads X and Y to work
> fixed on different cores?

BTW, sorry, I made a big mistake here. I meant to ask about possible
cache line padding issues and performance issues related to cache
coherency.

Sorry for that :(

To complement question (5): is it possible to turn this unbounded
queue into a bounded queue? How should consumer and producer threads
react (I mean, they should block) when the queue is empty or full?

Thanks again for all support and help!

Best regards,
Raine

Chris M. Thomasson

May 29, 2009, 4:29:39 PM
to Scalable Synchronization Algorithms

On May 15, 8:16 am, Raine <raine...@ymail.com> wrote:
> [...]
> Just complementing question (5), is it possible to make these
> unbounded queue as a bounded queue? How could consumer and producer
> threads react (I mean they shoud block) when queue is empty or full?

Here is a working algorithm for a bounded queue with waitable boundary
conditions (empty/full):

http://relacy.pastebin.com/f4ec057ef

It uses my eventcount algorithm for the conditional waiting.

Dmitriy Vyukov

May 30, 2009, 9:36:38 AM
to Scalable Synchronization Algorithms
Hi Raine,


--------------------------------
1) Based on the test program above, the only way (beside implementing
my self those calls)
I've found on making XCHG primitive workable was to use GNG gcc built-
in atomic ops, and since in projects like Innodb
(MySQL db engine) they've made theire own atomic primitives, I've
wondering if this is
'a just fine' way for performance. I know that that should not be a
concern right now
(that Knuth's phrase: ... 'premature optimization is the root of all
evil'), but since I'm curious,
why not just ask to people who dealed about this. Do you know any
other good implementation
of XCHG, instead gcc compiler built-in or even instead Linux own
implementation ('asm/atomic.h')?
--------------------------------


There are several aspects here.
In general, I like compiler intrinsics (both gcc and msvc); they are
handy and usually work just fine. However, they implement only
*sequentially consistent* atomics (both gcc and msvc). If you need (or
are willing to work with) only those, then I guess compiler intrinsics
are the best choice for now. But there are also so-called atomics with
fine-grained memory ordering, which are indeed faster on *some*
architectures. They are faster on Sparc, IA-64, etc. However, they are
no faster on x86 (IA-32, Intel 64, AMD 64), because the platform
itself provides only sequentially consistent atomics (so fine-grained
atomics are just mapped to sequentially consistent atomics). So one
more consideration for you: if your main platform is x86, then compiler
intrinsics are the best choice and will not cause any performance
penalty. However, if you are working on Sparc, then you can implement
faster atomics with asm.
Why InnoDB uses asm... well, I don't know. Probably they just started
before gcc had built-in atomics. There are many old projects out there
which use asm for atomics, most notably ACE.
Other good implementations of atomic ops are ACE and ATOMIC_OPS; both
provide only sequentially consistent atomics, so they will not buy you
anything. Compiler intrinsics will actually be faster because of the
absence of function calls.
There is total chaos with atomics currently; hopefully we will get
consistent, portable, fine-grained atomics for C/C++ from C++0x in
several years.
p.s. you can usually recognize fine-grained atomics by suffixes
like acquire/acq, release/rel, relaxed, naked, acq_rel, or something
like this.
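To make the "fine-grained" suffixes concrete, here is a minimal sketch of the push operation written against C11's <stdatomic.h> instead of the gcc __sync built-ins. This assumes a C11 toolchain, which is not what the test program uses. The names mpsc, node, mpsc_init and mpsc_push are hypothetical, and the memory orders shown are one plausible choice, not something stated in this thread.

```c
#include <stdatomic.h>
#include <stddef.h>

struct node {
    _Atomic(struct node *) next;
    int value;                      /* hypothetical payload */
};

struct mpsc {
    _Atomic(struct node *) head;    /* producers swap this */
    struct node *tail;              /* touched by the consumer only */
    struct node stub;
};

static void mpsc_init(struct mpsc *q)
{
    atomic_init(&q->stub.next, NULL);
    atomic_init(&q->head, &q->stub);
    q->tail = &q->stub;
}

static void mpsc_push(struct mpsc *q, struct node *n)
{
    atomic_store_explicit(&n->next, NULL, memory_order_relaxed);
    /* The XCHG of the original code, with an explicit acq_rel ordering */
    struct node *prev = atomic_exchange_explicit(&q->head, n,
                                                 memory_order_acq_rel);
    /* The release store makes the node's contents visible to a consumer
       that later reads this link with acquire ordering */
    atomic_store_explicit(&prev->next, n, memory_order_release);
}
```

On x86 the exchange still compiles to a locked instruction, so the explicit orderings only pay off on weaker architectures.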


--------------------------------
2) Looking at my code do you think if I want to make a message queue
do I have to worry about memory alignment issues over
performance costs or the way I'm implementing could be just fine?
Any tip or advice on this? I personally really messes with alignment
stuff :-D
--------------------------------

Alignment is important to some degree (though not as important as
padding).
Consider the following example. You transfer a message between threads.
The message size is 64 bytes (the cache-line size on modern x86
processors). If the message is aligned on a 64-byte (cache-line)
boundary, then only a single cache line has to be transferred between
processors. If the message is not aligned on a cache-line boundary,
then 2 cache lines have to be transferred. Transfer of cache lines is
the main performance hit, so this aspect can basically double/halve the
throughput.
I think it's possible to construct a synthetic benchmark which will
show a huge difference between aligned and unaligned objects. What
effect it will have in a real application... difficult to say, it
depends. Though it's always nice to have aligned objects.
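A small sketch of the 64-byte message example, assuming a 64-byte cache line and C11's aligned_alloc. The names CACHELINE, message, message_alloc and cachelines_touched are made up for illustration. The helper only counts how many lines an address range spans; it does not measure anything.

```c
#include <stdint.h>
#include <stdlib.h>

#define CACHELINE 64   /* assumed line size; not queried from the hardware */

struct message {
    char payload[CACHELINE];   /* exactly one cache line of data */
};

/* Number of cache lines that the byte range [p, p + size) touches */
static size_t cachelines_touched(const void *p, size_t size)
{
    uintptr_t first = (uintptr_t)p / CACHELINE;
    uintptr_t last = ((uintptr_t)p + size - 1) / CACHELINE;
    return (size_t)(last - first + 1);
}

/* Allocate one message on a cache-line boundary; release it with free() */
static struct message *message_alloc(void)
{
    return aligned_alloc(CACHELINE, sizeof(struct message));
}
```

A message returned by message_alloc() spans one line, while the same 64 bytes starting 8 bytes later would span two. That is the doubling of transfers described above.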


--------------------------------
3) And the for optimizing cache affinity in this case? Any advice
besides forcing threads X and Y to work
fixed on different cores?
--------------------------------

Hmmm... I guess it will be better for them to work on the *same* core,
all else being equal.
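For experimenting with placement, pinning can be sketched as below. This is Linux-specific and uses glibc's sched_setaffinity/sched_getaffinity, where a pid of 0 means the calling thread. The helper names pin_self_to_cpu and first_allowed_cpu are hypothetical.

```c
#define _GNU_SOURCE
#include <sched.h>

/* Pin the calling thread to one CPU; returns 0 on success, -1 on error */
static int pin_self_to_cpu(int cpu)
{
    cpu_set_t set;
    CPU_ZERO(&set);
    CPU_SET(cpu, &set);
    return sched_setaffinity(0, sizeof(set), &set);
}

/* First CPU the calling thread is currently allowed to run on, or -1 */
static int first_allowed_cpu(void)
{
    cpu_set_t set;
    if (sched_getaffinity(0, sizeof(set), &set) != 0)
        return -1;
    for (int cpu = 0; cpu < CPU_SETSIZE; cpu++)
        if (CPU_ISSET(cpu, &set))
            return cpu;
    return -1;
}
```

Calling pin_self_to_cpu() with the same CPU number at the start of a producer and of its consumer would place both on one core. Whether that beats separate cores is something to measure, not assume.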


--------------------------------
BTW, sorry, I've done some big mistake here. I was talking about cache
line padding possible issues and performance issues related to cache
coherency.
--------------------------------

Cache-line padding can make a HUUUUUUUUUUGE difference. It's the most
overlooked aspect. Many people think that only the number of atomic
operations matters, and that padding makes little difference, if any.
This is total bullshit. Atomic operations are cheap; it's sharing that
matters, and padding eliminates unnecessary sharing.
I've run a number of tests regarding padding on synthetic benchmarks,
and the results are usually something like this: a properly padded
data structure shows a speedup of 2/3/4 (depending on the nature of the
data structure) on a quad-core processor. And when I comment out the
single line "char pad [64];", the speedup drops to 0.15 or so (i.e. a
slowdown).
All synchronization objects MUST BE properly padded; it's one of the
most important points.
Sometimes I even think that a low-level threading library has to expose
queues/stacks via the following interface:

class spsc_queue_producer_part
{
...
};

class spsc_queue_consumer_part
{
...
};

Then the user is free to place these as:
struct device_driver
{
spsc_queue_producer_part queuep;
char pad [cacheline_size];
spsc_queue_consumer_part queuec;
};

or:

struct worker_thread
{
spsc_queue_producer_part queuep [thread_count];
spsc_queue_consumer_part queuec [thread_count];
};

and then connect both parts at run-time:
queuep.connect_to(queuec);

Note that worker_thread::queuec objects will be connected to producer
parts located in different worker_thread objects (to set up a fully
connected graph, for example).

In a NUMA environment it's the only option, because objects must be
padded not to the cache-line size, but to the memory page size (so that
one page will be allocated on one NUMA node, and another page on
another node).
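The effect of a single "char pad [64];" line can be seen from the struct layout alone. This is a sketch assuming 64-byte cache lines. The two counter structs and the line_of helper are invented for illustration and are not part of the queue.

```c
#include <stddef.h>

#define CACHELINE 64   /* assumed line size */

/* Both fields share one cache line: a writer of 'produced' and a
   writer of 'consumed' keep stealing the line from each other */
struct counters_shared {
    long produced;
    long consumed;
};

/* The pad pushes 'consumed' onto the next line */
struct counters_padded {
    _Alignas(CACHELINE) long produced;
    char pad[CACHELINE - sizeof(long)];
    long consumed;
};

/* Index of the cache line an offset falls into, relative to the
   start of a line-aligned object */
static size_t line_of(size_t offset)
{
    return offset / CACHELINE;
}
```

Here line_of(offsetof(struct counters_padded, consumed)) is 1 while the same expression for counters_shared is 0. The padded struct trades 128 bytes of space for independent lines.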


--------------------------------------------------
4) Is it possible to implement a stack/LIFO using that algorithm from
Mr. Dimitry V'jukov
(http://groups.google.com/group/comp.programming.threads/browse_thread/thread/33f79c75146582f3/6227ec9a1d741526?tvc=2#6227ec9a1d741526) ?
And a circular buffer?
-------------------------------------------------------

I don't get the question. It's possible to implement both...



-------------------------------------------------------
5) Since that algorithm is really awesome (at least comparing with a
traditional
multi-producer/multi-consumer lock-based queue see http://www.aagh.net/projects/thrqueue)
it would be a bad idea using locks/condvars/semaphores to make a
bounded fifo.
You can see in the program above that the queue is unbounded, and
memory get really busted
due to producers enqueueing more that consumer can dequeue, and since
no producer sleeps if any
queue limit is reached, this could be a bad idea on avoiding hogging
system resources...
So, here is my idea:

"Use some sort of notification mechanism so producers can 'poll' that
event-base and continuing
enqueueing more data. The responsible to acknowledge a ready-to-
produce queue would be the consumer."

I don't know that this should be a bloated idea or just using simple
semaphores or condvars could be
just fine... What do you think about this? Any other idea?

Just complementing question (5), is it possible to make these
unbounded queue as a bounded queue? How could consumer and producer
threads react (I mean they shoud block) when queue is empty or full?
-------------------------------------------------------

Using semaphores or condvars will totally destroy the whole idea of
the queue. Condvars require a mutex, so the whole queue state has to
be protected by a mutex. Semaphores do not require a mutex; however,
using a semaphore will decrease throughput several times or so.

If you need threads to block, then the only choice for lock-free
algorithms is a lightweight eventcount.

Unfortunately, this queue does not maintain its own size. One option
is to put separate counters alongside the node pointers, and use a
double-word CAS to manipulate the head pointer and counter together.
Here is a crude sketch which uses eventcounts for producer and consumer
blocking (you can think of eventcounts as condition variables for
lock-free algorithms: they also allow threads to wait for some
condition and to notify about a state change, but they do not require
a mutex and introduce minimal overhead on the fast path, when no
blocking occurs):

struct mpscq_t
{
    // head and pcount are adjacent so one double-word CAS covers both
    struct mpscq_node_t* volatile head;
    int pcount;
    eventcount pec;
    char pad [64];
    struct mpscq_node_t* tail;
    struct mpscq_node_t stub;
    int ccount;
    eventcount cec;
};

void mpscq_push(struct mpscq_t* self, struct mpscq_node_t* n)
{
    n->next = 0;
    pair<mpscq_node_t*, int> cmp = self->(head, pcount);
    for (;;)
    {
        while (cmp.second - self->ccount >= queue_max_size)
        {
            // wait for size to decrease
            eventcount_state st = self->pec.get();
            if (cmp.second - self->ccount < queue_max_size)
                break;
            // blocking
            self->pec.wait(st);
        }
        pair<mpscq_node_t*, int> xchg = (n, cmp.second + 1);
        // increment pcount and swap head atomically
        // (on failure, cmp is assumed to be refreshed with the current pair)
        if (dword_compare_exchange(&self->head, cmp, xchg))
            break;
    }
    cmp.first->next = n;
    // notify consumer about new item if he waits
    self->cec.notify_one();
}


// this is the old non-blocking pop function
// note that I added some spin waits to it
struct mpscq_node_t* mpscq_pop_impl(struct mpscq_t* self)
{
    struct mpscq_node_t* tail = self->tail;
    struct mpscq_node_t* next = tail->next;
    if (tail == &self->stub)
    {
        if (0 == next)
            return 0;
        self->tail = next;
        tail = next;
        next = next->next;
    }
    if (next)
    {
        self->tail = next;
        return tail;
    }
    struct mpscq_node_t* head = self->head;
    if (tail == head)
    {
        // tail is the last node: re-insert the stub behind it
        mpscq_push(self, &self->stub);
    }
    // otherwise a producer has swapped head but not linked its node yet;
    // either way, re-read tail->next until the link becomes visible
    while ((next = tail->next) == 0)
        yield();
    self->tail = next;
    return tail;
}

// and this is the blocking wrapper
struct mpscq_node_t* mpscq_pop(struct mpscq_t* self)
{
    struct mpscq_node_t* n;
    for (;;)
    {
        n = mpscq_pop_impl(self);
        if (n)
            break;
        eventcount_state st = self->cec.get();
        n = mpscq_pop_impl(self);
        if (n)
            break;
        // blocking
        self->cec.wait(st);
    }
    self->ccount += 1;
    // notify producers about size decrease, if they are waiting
    self->pec.notify_one();
    return n;
}
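The sketch above takes the eventcount type for granted. One simple way to build something with the same get / re-check / wait shape is an atomic epoch counter with a mutex and condvar used only on the slow path. The version below is an illustrative assumption written with C11 atomics and pthreads, not the eventcount algorithm linked in this thread. It adds an ec_cancel call that the sketch above does not have, and it wakes all waiters instead of one.

```c
#include <pthread.h>
#include <stdatomic.h>

typedef struct {
    atomic_uint epoch;     /* bumped by each notify that finds waiters */
    atomic_uint waiters;   /* threads between ec_get() and wake-up */
    pthread_mutex_t mtx;
    pthread_cond_t cv;
} eventcount;

static void ec_init(eventcount *ec)
{
    atomic_init(&ec->epoch, 0);
    atomic_init(&ec->waiters, 0);
    pthread_mutex_init(&ec->mtx, NULL);
    pthread_cond_init(&ec->cv, NULL);
}

/* Step 1 of waiting: announce intent and take a ticket. The caller
   re-checks its condition, then calls ec_wait() or ec_cancel(). */
static unsigned ec_get(eventcount *ec)
{
    atomic_fetch_add(&ec->waiters, 1);
    atomic_thread_fence(memory_order_seq_cst);
    return atomic_load(&ec->epoch);
}

/* The re-check succeeded, so there is no need to block */
static void ec_cancel(eventcount *ec)
{
    atomic_fetch_sub(&ec->waiters, 1);
}

/* Step 2: block until a notification has happened since ec_get() */
static void ec_wait(eventcount *ec, unsigned ticket)
{
    pthread_mutex_lock(&ec->mtx);
    while (atomic_load(&ec->epoch) == ticket)
        pthread_cond_wait(&ec->cv, &ec->mtx);
    pthread_mutex_unlock(&ec->mtx);
    atomic_fetch_sub(&ec->waiters, 1);
}

/* Call after publishing a state change. The mutex is touched only
   when somebody may be waiting. */
static void ec_notify_all(eventcount *ec)
{
    atomic_thread_fence(memory_order_seq_cst);
    if (atomic_load(&ec->waiters) == 0)
        return;   /* fast path: nobody is waiting */
    pthread_mutex_lock(&ec->mtx);
    atomic_fetch_add(&ec->epoch, 1);
    pthread_mutex_unlock(&ec->mtx);
    pthread_cond_broadcast(&ec->cv);
}
```

The two fences form a Dekker-style handshake: either the notifier sees the waiter's registration, or the waiter's re-check sees the published change. The state being waited on should itself be read and written through atomics.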


Another option is to switch to a different algorithm which natively
supports size tracking. For bounded queues the natural choice is to
use an underlying fixed-size array as storage, so there is no need to
manage nodes.
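As an illustration of array-backed storage, here is a minimal bounded ring buffer. Note the assumption: it supports a single producer and a single consumer only, which is simpler than the mpsc case discussed here, and it is a textbook design rather than anything posted in this thread. RING_CAPACITY and the ring_* names are hypothetical.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define RING_CAPACITY 8   /* hypothetical size; must be a power of two */

struct ring {
    void *slots[RING_CAPACITY];
    _Alignas(64) atomic_size_t head;   /* next slot to write; producer side */
    _Alignas(64) atomic_size_t tail;   /* next slot to read; consumer side */
};

static void ring_init(struct ring *r)
{
    atomic_init(&r->head, 0);
    atomic_init(&r->tail, 0);
}

/* Producer only. Returns false when the queue is full. */
static bool ring_push(struct ring *r, void *item)
{
    size_t head = atomic_load_explicit(&r->head, memory_order_relaxed);
    size_t tail = atomic_load_explicit(&r->tail, memory_order_acquire);
    if (head - tail == RING_CAPACITY)
        return false;
    r->slots[head % RING_CAPACITY] = item;
    /* Publish the slot contents before the new head becomes visible */
    atomic_store_explicit(&r->head, head + 1, memory_order_release);
    return true;
}

/* Consumer only. Returns false when the queue is empty. */
static bool ring_pop(struct ring *r, void **item)
{
    size_t tail = atomic_load_explicit(&r->tail, memory_order_relaxed);
    size_t head = atomic_load_explicit(&r->head, memory_order_acquire);
    if (head == tail)
        return false;
    *item = r->slots[tail % RING_CAPACITY];
    /* Hand the slot back to the producer */
    atomic_store_explicit(&r->tail, tail + 1, memory_order_release);
    return true;
}
```

The size is simply head minus tail, so "full" and "empty" fall out of the indices with no extra counter. Blocking on either condition could be layered on top with an eventcount.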


--------------------------------------------------------

Some notes on your implementation.
First of all, you have to add padding basically everywhere. A proper
definition of my queue would be:
struct mpscq_t
{
// vars accessed by producers
char pad [cacheline_size];
// vars accessed by consumer
};

Then, when you allocate queues and thread args, you have to allocate
them as:

struct queue_wrapper
{
mpscq_t queue;
char pad [cacheline_size];
};

so that the consumer data of one queue does not sit on the same
cache line as the producer data of another queue.
Or just add padding to the end of the mpscq_t and thread_args
structures themselves.

Then, since the queue is specifically designed as intrusive, it's
beneficial to merge queuenode_data and mpscq_node_t into one structure:
fewer memory allocations/deallocations, fewer indirections, fewer cache
lines touched.
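Merging the two structures could look roughly like this. It assumes the node type is reduced to just the link (the qndata pointer goes away), and the message struct and message_from_node helper are hypothetical names.

```c
#include <stddef.h>

/* The queue node reduced to its link */
struct mpscq_node_t {
    struct mpscq_node_t *volatile next;
};

/* Hypothetical merged message: the link is embedded, so a single
   allocation holds both the queue node and the payload */
struct message {
    struct mpscq_node_t link;
    int ithid;
    unsigned long qdid;
    double dvalue;
};

/* Recover the enclosing message from the node pointer that the
   queue hands back on pop */
static struct message *message_from_node(struct mpscq_node_t *n)
{
    return (struct message *)((char *)n - offsetof(struct message, link));
}
```

A producer would push &msg->link and the consumer would call message_from_node() on whatever pop returns, so there is one malloc/free pair per message instead of two.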

Then, you'd better implement per-thread message caches. Enqueue and
dequeue operations are designed to be extremely lightweight, but if you
call malloc and free (which very likely involve mutexes) for every
message, this totally defeats the point.
You may consider 2 main variants. First: when a thread receives a
message, it puts it into its own private message cache; when this
thread later sends a message, it takes one from the cache. Second: when
a thread has received and processed a message, it sends it back to the
producer.
The first variant is preferable performance-wise, but may exhibit odd
behavior in some situations. Assume a thread only receives messages and
does not send anything: its private cache will blow up.
In practice some combination of both is usually used, i.e. a thread
caches messages, and if the cache reaches some limit, all messages are
sent back to producers in batches (one batch per producer). There are
also many optimizations, like double buffering (i.e. a thread always
leaves some messages in its private cache). Or NUMA-aware
optimizations: for example, a thread caches messages which came from
its own node, but sends back messages which came from remote nodes.

Try to implement those optimizations one by one and check what
performance improvement each gives.
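The first variant (a private cache with a limit) can be sketched as a plain free list owned by one thread. As a simplification, messages beyond the limit are handed to free() here instead of being sent back to their producer in batches. CACHE_LIMIT, msg_cache, msg_alloc and msg_release are made-up names.

```c
#include <stdlib.h>

#define CACHE_LIMIT 4   /* hypothetical cap on cached messages */

struct message {
    struct message *next_free;   /* link used only while cached */
    char payload[56];
};

/* One instance per thread; never shared, so no synchronization needed */
struct msg_cache {
    struct message *free_list;
    size_t count;
};

/* Take a message from the private cache, falling back to malloc */
static struct message *msg_alloc(struct msg_cache *c)
{
    struct message *m = c->free_list;
    if (m != NULL) {
        c->free_list = m->next_free;
        c->count--;
        return m;
    }
    return malloc(sizeof *m);
}

/* Return a processed message to the cache, or free it past the limit */
static void msg_release(struct msg_cache *c, struct message *m)
{
    if (c->count >= CACHE_LIMIT) {
        free(m);
        return;
    }
    m->next_free = c->free_list;
    c->free_list = m;
    c->count++;
}
```

In steady state a thread that both receives and sends never reaches malloc at all. The limit is what keeps a receive-only thread from growing its cache without bound.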

As further reading I suggest:
Node allocator for FIFO queues:
http://groups.google.com/group/lock-free/browse_frm/thread/2c6f6250762385e1#

Eventcount description and implementation:
http://software.intel.com/en-us/forums/intel-threading-building-blocks/topic/62364/

Scalable memory allocation:
http://software.intel.com/en-us/forums/threading-on-intel-parallel-architectures/topic/61145/

Memory ordering in modern microprocessors:
http://www.linuxjournal.com/article/8211
http://www.linuxjournal.com/article/8212

--
Best regards,
Dmitriy V'jukov

Raine

Jun 1, 2009, 6:18:35 PM
to Scalable Synchronization Algorithms
Hi Dmitriy and Chris. I just want to thank you so much for this great
help! It's too much information to discuss right now, so if you don't
mind I will take a couple of weeks to take a deep look and then come
back with some results.

I really appreciate such help. It's rare to receive such a good and
comprehensive explanation. Thank you so much for your time and
precious help!

In a couple of weeks I'll be coming back with more questions to
discuss, but in a more practical way ;)

Best regards,
Raine.

PS: Have you ever considered writing a book about this topic (Scalable
lock-free algorithms)? I think that a lot of people who are starting
out with this kind of paradigm would be very interested in reading
such good explanations!