Hi all!
Since I'm really new to this *-free stuff { OK... I'm a noob :_( } and since I've been following many posts on lock-free data structures on comp.programming.threads by Mr. Dmitriy V'jukov, Chris Thomasson and others, I've become really interested in using lock-free algorithms in my university project (basically an asynchronous message-queue service bus). I've been running some rough tests (in C) on a multi-producer/single-consumer (mpsc) FIFO queue taken from this post:
http://groups.google.com/group/comp.programming.threads/browse_thread/thread/33f79c75146582f3/6227ec9a1d741526?tvc=2#6227ec9a1d741526
The test program is posted at the end of this message (please excuse any bad coding; it's just a rough test...).
Since then, I've been accumulating a lot of doubts and, unfortunately, couldn't find helpful advice. So why not ask where those ideas came from ;)
I'd really appreciate any help with the questions below:
1) Based on the test program below, the only way I've found to get a working XCHG primitive (besides implementing those calls myself) was to use the GNU gcc built-in atomic ops. Since projects like InnoDB (the MySQL storage engine) have written their own atomic primitives, I'm wondering whether the gcc built-ins are a 'just fine' choice performance-wise. I know that shouldn't be a concern right now (Knuth's phrase: '... premature optimization is the root of all evil'), but since I'm curious, why not ask people who have dealt with this? Do you know any other good implementation of XCHG, other than the gcc built-in or Linux's own implementation ('asm/atomic.h')?
2) Looking at my code, if I want to build a message queue, do I have to worry about memory alignment for performance, or is the way I'm doing it just fine? Any tips or advice on this? Personally, alignment stuff really confuses me :-D
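A minimal sketch of the layout side. It assumes 64-byte cache lines (typical on current x86, but check your CPU) and C11 alignas/aligned_alloc; struct padded_mpscq and padded_mpscq_new are hypothetical names. For the atomic operations themselves, nodes from malloc() are already aligned for any fundamental type, so a pointer-sized XCHG on them is naturally aligned. The alignment issue that usually matters for performance is false sharing. Producers hammer head with XCHG while the consumer updates tail, and if both fields sit in the same cache line, that line bounces between cores on every operation. Padding them onto separate lines avoids that:

```c
#include <assert.h>
#include <stdalign.h>
#include <stddef.h>
#include <stdlib.h>

#define CACHE_LINE 64 /* assumption: 64-byte cache lines */

struct node { struct node *volatile next; void *data; };

/* Padded variant of the post's mpscq_t: producers XCHG 'head', the consumer
 * alone reads/writes 'tail', so each field gets its own cache line. */
struct padded_mpscq {
    alignas(CACHE_LINE) struct node *volatile head;
    alignas(CACHE_LINE) struct node *tail;
    struct node stub;
};

static_assert(offsetof(struct padded_mpscq, tail) - offsetof(struct padded_mpscq, head) >= CACHE_LINE,
              "head and tail would share a cache line");

/* malloc() only guarantees alignment for fundamental types (typically 16 bytes
 * on x86-64 glibc), so a cache-aligned struct on the heap needs aligned_alloc(). */
static struct padded_mpscq *padded_mpscq_new(void)
{
    struct padded_mpscq *q = aligned_alloc(CACHE_LINE, sizeof *q);
    if (q != NULL) {
        q->head = &q->stub;
        q->tail = &q->stub;
        q->stub.next = NULL;
    }
    return q;
}
```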
3) And what about optimizing cache affinity in this case? Any advice besides pinning threads X and Y to different cores?
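Beyond pinning, the test program itself probably causes more cross-core cache traffic than thread placement does. Every message costs two malloc() calls (node plus payload) on the producer's core and two free() calls on the consumer's core. Embedding struct mpscq_node_t inside struct queuenode_data, or recycling nodes, would likely help more, though that is a suggestion to measure, not a guarantee. For reference, pinning itself looks roughly like this on Linux/glibc. It is non-portable, _GNU_SOURCE must be defined before the first #include, and pin_self_to_cpu and first_allowed_cpu are hypothetical helpers:

```c
#define _GNU_SOURCE /* must come before any #include for cpu_set_t and *_np calls */
#include <assert.h>
#include <pthread.h>
#include <sched.h>

/* Pin the calling thread to a single CPU (Linux/glibc only, non-portable).
 * Returns 0 on success or an errno value, like pthread_setaffinity_np(). */
static int pin_self_to_cpu(int cpu)
{
    cpu_set_t set;
    assert(cpu >= 0 && cpu < CPU_SETSIZE);
    CPU_ZERO(&set);
    CPU_SET(cpu, &set);
    return pthread_setaffinity_np(pthread_self(), sizeof set, &set);
}

/* Return the lowest-numbered CPU this thread may currently run on, or -1.
 * Useful because containers/cpusets do not always allow CPU 0. */
static int first_allowed_cpu(void)
{
    cpu_set_t set;
    if (pthread_getaffinity_np(pthread_self(), sizeof set, &set) != 0)
        return -1;
    for (int cpu = 0; cpu < CPU_SETSIZE; cpu++)
        if (CPU_ISSET(cpu, &set))
            return cpu;
    return -1;
}
```

In the test program, each producer and consumer would call pin_self_to_cpu() at the top of its start routine, each with a different CPU number.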
4) Is it possible to implement a stack/LIFO using that algorithm from Mr. Dmitriy V'jukov
(http://groups.google.com/group/comp.programming.threads/browse_thread/thread/33f79c75146582f3/6227ec9a1d741526?tvc=2#6227ec9a1d741526)?
And a circular buffer?
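The posted queue appears to be inherently FIFO: the XCHG on head links nodes in arrival order, so it does not turn into a LIFO by itself. A common lock-free MPSC stack is a different, well-known technique. It uses a CAS loop for push and a single XCHG that detaches the whole list for the consumer. Because nodes are never removed one at a time, the ABA problem of a CAS-based pop does not come up. A circular buffer is a separate, array-based design, and the simplest correct case is single-producer/single-consumer. Below is a sketch of both, assuming C11 atomics. All names are hypothetical, and RING_CAP must be a power of two:

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* ---- MPSC LIFO: CAS-loop push, XCHG "pop all" ---- */
struct snode { struct snode *next; int value; };
struct mpsc_stack { _Atomic(struct snode *) top; };

/* Any number of producers (Treiber-style push; ABA-safe for push). */
static void stack_push(struct mpsc_stack *s, struct snode *n)
{
    struct snode *old = atomic_load_explicit(&s->top, memory_order_relaxed);
    do {
        n->next = old; /* 'old' is refreshed by a failed CAS */
    } while (!atomic_compare_exchange_weak_explicit(&s->top, &old, n,
                 memory_order_release, memory_order_relaxed));
}

/* Single consumer: detach the whole stack at once (newest node first). */
static struct snode *stack_pop_all(struct mpsc_stack *s)
{
    return atomic_exchange_explicit(&s->top, NULL, memory_order_acquire);
}

/* ---- Bounded SPSC circular buffer (one producer, one consumer) ---- */
#define RING_CAP 8
static_assert((RING_CAP & (RING_CAP - 1)) == 0, "RING_CAP must be a power of two");

/* head/tail are free-running counters; unsigned wrap-around is harmless
 * because RING_CAP divides SIZE_MAX + 1. */
struct spsc_ring {
    _Atomic size_t head; /* next slot to read; advanced by the consumer only */
    _Atomic size_t tail; /* next slot to write; advanced by the producer only */
    void *slots[RING_CAP];
};

/* Returns 1 on success, 0 if the ring is full. */
static int ring_push(struct spsc_ring *r, void *item)
{
    size_t t = atomic_load_explicit(&r->tail, memory_order_relaxed);
    size_t h = atomic_load_explicit(&r->head, memory_order_acquire);
    if (t - h == RING_CAP)
        return 0;
    r->slots[t & (RING_CAP - 1)] = item;
    atomic_store_explicit(&r->tail, t + 1, memory_order_release); /* publish slot */
    return 1;
}

/* Returns the oldest item, or NULL if the ring is empty. */
static void *ring_pop(struct spsc_ring *r)
{
    size_t h = atomic_load_explicit(&r->head, memory_order_relaxed);
    size_t t = atomic_load_explicit(&r->tail, memory_order_acquire);
    if (h == t)
        return NULL;
    void *item = r->slots[h & (RING_CAP - 1)];
    atomic_store_explicit(&r->head, h + 1, memory_order_release); /* free slot */
    return item;
}
```

If the consumer needs FIFO order within a batch taken by stack_pop_all(), it can reverse the detached list locally before processing it.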
5) Since that algorithm is really awesome (at least compared with a traditional multi-producer/multi-consumer lock-based queue, see http://www.aagh.net/projects/thrqueue), using locks/condvars/semaphores to make it a bounded FIFO seems like a bad idea.
As you can see in the program below, the queue is unbounded, and memory really blows up when producers enqueue more than the consumer can dequeue. Since no producer ever sleeps when a queue limit is reached, nothing stops them from hogging system resources...
So, here is my idea:
"Use some sort of notification mechanism so producers can 'poll' that event source before continuing to enqueue more data. The consumer would be responsible for signalling that the queue is ready to accept more."
I don't know whether this would be a bloated idea or whether simple semaphores or condvars would be just fine... What do you think about this? Any other ideas?
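Simple counting semaphores may well be enough here. The sketch below assumes Linux (unnamed POSIX semaphores via sem_init) and C11 atomics, with the posted queue re-expressed using stdatomic.h instead of volatile plus gcc builtins; struct bounded_q, bq_init, bq_push and bq_pop are hypothetical names. One semaphore counts free slots, so producers sleep in sem_wait() when the queue is full instead of exhausting memory. A second semaphore counts queued items, so the consumer sleeps when the queue is empty instead of spinning. Uncontended semaphore operations typically stay in user space. A thread that actually has to block pays for a system call, so this gives back part of the lock-free advantage only under pressure.

```c
#include <assert.h>
#include <sched.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stddef.h>

/* The posted intrusive MPSC queue, re-expressed with C11 atomics. */
struct qnode { _Atomic(struct qnode *) next; long value; };
struct mpscq { _Atomic(struct qnode *) head; struct qnode *tail; struct qnode stub; };

static void q_init(struct mpscq *q)
{
    atomic_init(&q->stub.next, NULL);
    atomic_init(&q->head, &q->stub);
    q->tail = &q->stub;
}

static void q_push(struct mpscq *q, struct qnode *n)
{
    atomic_store_explicit(&n->next, NULL, memory_order_relaxed);
    struct qnode *prev = atomic_exchange_explicit(&q->head, n, memory_order_acq_rel);
    atomic_store_explicit(&prev->next, n, memory_order_release);
}

static struct qnode *q_pop(struct mpscq *q) /* single consumer only */
{
    struct qnode *tail = q->tail;
    struct qnode *next = atomic_load_explicit(&tail->next, memory_order_acquire);
    if (tail == &q->stub) {
        if (next == NULL)
            return NULL;
        q->tail = next;
        tail = next;
        next = atomic_load_explicit(&next->next, memory_order_acquire);
    }
    if (next) {
        q->tail = next;
        return tail;
    }
    if (tail != atomic_load_explicit(&q->head, memory_order_acquire))
        return NULL; /* a producer is between its XCHG and its link store */
    q_push(q, &q->stub);
    next = atomic_load_explicit(&tail->next, memory_order_acquire);
    if (next) {
        q->tail = next;
        return tail;
    }
    return NULL;
}

/* Hypothetical bounded wrapper: 'slots' counts free capacity, 'items' counts
 * completed pushes. */
struct bounded_q { struct mpscq q; sem_t slots, items; };

static int bq_init(struct bounded_q *b, unsigned capacity)
{
    assert(capacity > 0);
    q_init(&b->q);
    if (sem_init(&b->slots, 0, capacity) != 0)
        return -1;
    return sem_init(&b->items, 0, 0);
}

static void bq_push(struct bounded_q *b, struct qnode *n)
{
    while (sem_wait(&b->slots) != 0) /* blocks while full; retry on failure (e.g. EINTR) */
        ;
    q_push(&b->q, n);
    sem_post(&b->items);
}

static struct qnode *bq_pop(struct bounded_q *b)
{
    struct qnode *n;
    while (sem_wait(&b->items) != 0) /* blocks while empty; retry on failure (e.g. EINTR) */
        ;
    while ((n = q_pop(&b->q)) == NULL) /* item exists but may be briefly unlinked */
        sched_yield();
    sem_post(&b->slots);
    return n;
}
```

bq_pop may still spin briefly with sched_yield(). The item count guarantees that an item exists, but a producer preempted between its XCHG and its link store temporarily hides it from the consumer. In the posted program, this wrapper would replace mpscq_push/mpscq_pop, and the consumer's busy-polling branch would go away.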
Thanks again for the help; I'm really stuck on these topics and don't know how to proceed with my project...
Here is the working program:
/********************************************************************************************************************
*
* Basically it creates N mpsc queues, fed by 'PRODUCERS' producer threads in a
* round-robin fashion, each queue being consumed by its own independent consumer.
* The 'CONSUMERS' parameter defines how many queues are created and thus how many
* consumer threads there are (one per queue).
* Throughput is measured as how many items a producer thread can enqueue per
* second and how many a consumer can dequeue per second.
* Producers also generate random numbers and accumulate them, so the sums can be
* compared with the values accumulated on the consumer side (a rough consistency
* check). Insertion order is also checked on the consumer side.
*
*/
#define _GNU_SOURCE
#define _XOPEN_SOURCE 600
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <sys/time.h>
#include <sys/times.h>
#include <math.h>
#include <pthread.h>
/*
* GNU gcc 4.1.2+ has built-in atomic ops; see:
* http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html
* On x86, __sync_lock_test_and_set compiles to a (full-barrier) XCHG.
*/
#if ((__GNUC__ == 4 && __GNUC_MINOR__ >= 1) || __GNUC__ > 4) && \
(defined(__x86_64__) || defined(__i386__))
#define XCHG __sync_lock_test_and_set
#else
#error "XCHG is only defined for gcc >= 4.1 on x86/x86-64"
#endif
#define CONSUMERS 1
#define PRODUCERS 4
#define PRODUCERS_ITERATIONS 1000000 /*0 = forever*/
#define SLEEP_BTW_ENQUEUE 0 /* usecs; 0 = don't sleep */
#define DEBUG_PRODUCER 0
#define DEBUG_CONSUMER 0
//#define CALCLATENCY
struct queuenode_data
{
char thr_id[100];
int ithid;
double dvalue;
unsigned long qdid;
#ifdef CALCLATENCY
struct timeval tv;
#endif
};
struct thread_args
{
char thr_id[100];
int ithid;
double dvalue;
unsigned long counts;
double throughput;
};
/* ------------------------------------------------ */
/*
* Extracted from
*
http://groups.google.com/group/comp.programming.threads/browse_thread/thread/33f79c75146582f3/6227ec9a1d741526?tvc=2#6227ec9a1d741526
* Author: Dmitriy V'jukov
*/
struct mpscq_node_t
{
struct queuenode_data *qndata;
struct mpscq_node_t* volatile next;
};
struct mpscq_t
{
struct mpscq_node_t* volatile head;
struct mpscq_node_t* tail;
struct mpscq_node_t stub;
};
#define MPSCQ_STATIC_INIT(self) {&self.stub, &self.stub, {0}}
void mpscq_create(struct mpscq_t* self)
{
self->head = &self->stub;
self->tail = &self->stub;
self->stub.next = 0;
}
void mpscq_push(struct mpscq_t* self, struct mpscq_node_t* n)
{
n->next = 0;
struct mpscq_node_t* prev = XCHG(&self->head, n);
//(*)
prev->next = n;
}
struct mpscq_node_t* mpscq_pop(struct mpscq_t* self)
{
struct mpscq_node_t* tail = self->tail;
struct mpscq_node_t* next = tail->next;
if (tail == &self->stub)
{
if (0 == next)
return 0;
self->tail = next;
tail = next;
next = next->next;
}
if (next)
{
self->tail = next;
return tail;
}
struct mpscq_node_t* head = self->head;
if (tail != head)
return 0;
mpscq_push(self, &self->stub);
next = tail->next;
if (next)
{
self->tail = next;
return tail;
}
return 0;
}
/* ------------------------------------------------ */
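/* Set by main() after all producers are joined; volatile keeps the compiler
 * from caching it in a register inside the consumer's polling loop */
static volatile int allproducersgone = 0;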
/* Queue */
struct mpscq_t **queues;
static struct thread_args **thargs_prodpool;
static struct thread_args **thargs_conspool;
struct mpscq_node_t* mpscq_create_node(struct queuenode_data *qndata)
{
struct mpscq_node_t *qn = (struct mpscq_node_t*) malloc(sizeof(struct mpscq_node_t));
qn->qndata = qndata;
return qn;
}
static double now()
{
struct timeval tv;
gettimeofday(&tv, NULL);
return tv.tv_sec + (double) tv.tv_usec / 1000000;
}
static double now_d(struct timeval *tv)
{
return tv->tv_sec + (double) tv->tv_usec / 1000000;
}
static struct timeval* now_tv()
{
struct timeval *tv = (struct timeval*) malloc(sizeof(struct timeval));
gettimeofday(tv, NULL);
return tv;
}
static unsigned long diff_usecs(struct timeval *tvs, struct timeval *tvf)
{
return (tvf->tv_sec - tvs->tv_sec) * 1000000 + (tvf->tv_usec - tvs->tv_usec);
}
static unsigned long diff_secs(struct timeval *tvs,struct timeval *tvf)
{
return (tvf->tv_sec - tvs->tv_sec);
}
void *producer_runtime(void *_tharg)
{
double dvalue=0.0;
double throughput=0.0; /* stays 0 if no measurable time elapsed */
unsigned long counts=0,elapsed;
long int rdv;
struct thread_args *thargs = (struct thread_args *)_tharg;
struct queuenode_data *qndata;
struct mpscq_node_t *node;
struct timeval start_tv,end_tv;
struct mpscq_t *queue;
printf("%s: thread alive [producer_runtime]\n",thargs->thr_id);
/* Start time counter */
gettimeofday(&start_tv, NULL);
while(counts < PRODUCERS_ITERATIONS || PRODUCERS_ITERATIONS == 0)
{
/* Round-robin queue */
queue = queues[counts % CONSUMERS];
/* Create and initialize cell */
qndata = (struct queuenode_data *) malloc(sizeof(struct queuenode_data));
qndata->qdid = counts+1;
rdv = random()%1000;
dvalue += rdv;
qndata->dvalue = rdv;
#ifdef CALCLATENCY
gettimeofday(&(qndata->tv), NULL);
#endif
strcpy(qndata->thr_id,thargs->thr_id);
qndata->ithid = thargs->ithid;
/* Create and set queue node */
node = mpscq_create_node(qndata);
/* Enqueue new node */
mpscq_push(queue,node);
/* Log enqueuing */
if(DEBUG_PRODUCER > 0) printf("%s: enqueue at %lu\n",thargs->thr_id, counts);
counts++;
if(SLEEP_BTW_ENQUEUE != 0) usleep(SLEEP_BTW_ENQUEUE);
}
/* End time counter */
gettimeofday(&end_tv, NULL);
thargs->dvalue = dvalue;
/* Measure in microseconds: 1M items can take less than a second, and with
 * whole seconds the throughput was never computed (and was truncated by
 * integer division otherwise) */
elapsed = diff_usecs(&start_tv,&end_tv);
if(elapsed > 0) throughput = counts / (elapsed / 1000000.0);
thargs->counts = counts;
thargs->throughput = throughput;
printf("%s: thread exit. Total %lu in %.2f secs. Throughput: %.2f dvalue: %.2f\n",thargs->thr_id,counts,elapsed / 1000000.0,throughput,dvalue);
return NULL; /* pthread start routines must return void * */
}
void *consumer_runtime(void *_tharg)
{
struct thread_args *thargs = (struct thread_args *)_tharg;
struct mpscq_node_t *node;
struct timeval tvnow;
struct timeval start_tv,end_tv;
unsigned long counts=0,elapsed;
double throughput=0.0;
double minlatency=(double) INFINITY,maxlatency=0.0,avglatency=0.0,accumlatency=0.0;
/* Per-producer accumulators must start at zero */
double dvalues[PRODUCERS] = {0};
unsigned long qdids[PRODUCERS] = {0};
unsigned long outoforder=0;
int done;
struct mpscq_t *queue = queues[thargs->ithid];
printf("%s: thread alive [consumer_runtime]\n",thargs->thr_id);
/* Start time counter */
gettimeofday(&start_tv, NULL);
while(1)
{
/* Sample the shutdown flag BEFORE popping: if it was already set, every push
 * has completed, so an empty pop really means the queue is drained. Checking
 * it only after an empty pop could exit while items pushed in between are
 * still queued. */
done = allproducersgone;
/* Dequeue (consume) node */
node = mpscq_pop(queue);
if(node != NULL)
{
#ifdef CALCLATENCY
gettimeofday(&tvnow, NULL);
elapsed = diff_usecs(&(node->qndata->tv),&tvnow);
/* Two independent checks: with 'else if', a sample that raised the
 * maximum could never lower the minimum */
if(elapsed > maxlatency) maxlatency = elapsed;
if(elapsed < minlatency) minlatency = elapsed;
accumlatency += (double) elapsed;
if(DEBUG_CONSUMER > 0) printf("%s: dequeued %s at %.2f -> %lu usecs\n",thargs->thr_id, node->qndata->thr_id, now_d(&tvnow), elapsed);
#endif
counts++;
dvalues[node->qndata->ithid] += node->qndata->dvalue;
if(node->qndata->qdid <= qdids[node->qndata->ithid])
{
outoforder++;
printf("%s: Out of order: %lu from producer %s\n",thargs->thr_id,node->qndata->qdid,node->qndata->thr_id);
}
else
{
qdids[node->qndata->ithid] = node->qndata->qdid;
}
/* Free resources */
free(node->qndata); node->qndata = NULL;
free(node); node = NULL;
}
else
{
if(!done)
{
if(DEBUG_CONSUMER > 0) printf("%s: nothing to dequeue\n",thargs->thr_id);
//usleep(1000);
}else break; /* flag was set before this empty pop: queue fully drained */
}
}
/* End time counter */
gettimeofday(&end_tv, NULL);
elapsed = diff_usecs(&start_tv,&end_tv);
if(elapsed > 0) throughput = counts / (elapsed / 1000000.0);
thargs->counts = counts;
if(counts > 0) avglatency = accumlatency / counts;
thargs->throughput = throughput;
printf("%s: thread exit. Consumed: %lu in %.2f secs Throughput: %.2f [min: %.2f , max: %.2f , avg: %.2f] usecs\n",thargs->thr_id,counts,elapsed / 1000000.0,throughput,minlatency,maxlatency,avglatency);
int i=0;
for(i=0;i < PRODUCERS; i++)
{
printf("%s->P%d dvalue: %.2f\n",thargs->thr_id,i,dvalues[i]);
}
printf("%s: Total out of order: %lu\n",thargs->thr_id,outoforder);
return NULL; /* pthread start routines must return void * */
}
int main(void)
{
int i=0;
unsigned long total_prod=0,total_cons=0;
double total_throughput_prod=0.0,total_throughput_cons=0.0;
struct thread_args *thargs;
pthread_t th_producers[PRODUCERS], th_consumers[CONSUMERS];
/* These are arrays of pointers, so size them by the pointer type */
thargs_prodpool = (struct thread_args **) malloc(sizeof(struct thread_args *) * PRODUCERS);
thargs_conspool = (struct thread_args **) malloc(sizeof(struct thread_args *) * CONSUMERS);
/* Create and initialize global queues array */
queues = (struct mpscq_t **) malloc(sizeof(struct mpscq_t *) * CONSUMERS);
for(i=0; i < CONSUMERS ; i++)
{
queues[i] = (struct mpscq_t *) malloc(sizeof(struct mpscq_t));
mpscq_create(queues[i]);
}
/* Spawn producer threads */
for(i = 0; i < PRODUCERS; i++)
{
/*Initialize thread arg*/
thargs = (struct thread_args *) malloc(sizeof(struct thread_args));
thargs_prodpool[i] = thargs;
sprintf(thargs->thr_id,"P%d",i);
thargs->ithid = i;
pthread_create(&th_producers[i], NULL, producer_runtime, thargs);
}
/* Spawn consumers thread */
for(i = 0; i < CONSUMERS; i++)
{
thargs = (struct thread_args *) malloc(sizeof(struct thread_args));
thargs_conspool[i] = thargs;
sprintf(thargs->thr_id,"C%d",i);
thargs->ithid = i;
pthread_create(&th_consumers[i], NULL, consumer_runtime, thargs);
}
/* Join all threads */
for(i = 0; i < PRODUCERS; i++)
{
pthread_join(th_producers[i], NULL);
}
allproducersgone = 1;
for(i = 0; i < CONSUMERS; i++)
{
pthread_join(th_consumers[i], NULL);
}
/* Get stats */
for(i = 0; i < PRODUCERS; i++)
{
total_prod += thargs_prodpool[i]->counts;
total_throughput_prod += thargs_prodpool[i]->throughput;
}
for(i = 0; i < CONSUMERS; i++)
{
total_cons += thargs_conspool[i]->counts;
total_throughput_cons += thargs_conspool[i]->throughput;
}
printf("\nTotal items produced: %lu , Total items consumed: %lu\n",total_prod,total_cons);
printf("Total throughput produced: %.2f , Total throughput consumed: %.2f\n\n",total_throughput_prod,total_throughput_cons);
printf("Bye!\n");
fflush(stdout); /* fsync() takes a file descriptor, not a FILE* */
return 0;
}
/*****************************************************************************************************************************/
Thanks again for all your help!
Best regards,
Raine