Sincerely,
Stéphane Thibaud
#include <pthread.h> // needed for pthreads
#include <stdbool.h>
#include <stdint.h>  // for intptr_t
#include <stdio.h>
#include <stdlib.h>
#include <time.h>    // for time()
#include <unistd.h>  // for usleep()
#include "prodcons.h"
/*
 * State shared between the producer and the consumer threads.
 * The producer accesses buffer, bufferHead and inBufferCnt only while
 * holding bufferLock.
 */
static unsigned int prodCount = 0;   // The number of produced items
// Index of the first free place in buffer. No free place exists while the
// buffer is full (inBufferCnt == BUFFER_SIZE).
static unsigned int bufferHead = 0;
static unsigned int inBufferCnt = 0; // Number of items in buffer
static pthread_mutex_t bufferLock;
static pthread_cond_t bufferNotFull; // The producer waits on this while the buffer is full
// Condition variables for the availability of items on the belt (per consumer)
static pthread_cond_t availForCons[NROF_CONSUMERS];
// Shifted left by NROF_BITS_DEST and inverted to mask out an item's destination bits
static const unsigned short int ALL_ONES = 0xFFFF;
static int producing = 1;            // Cleared by the producer once all items are produced
static void rsleep (int t);
static void * producer (/*void * arg*/)
{
ITEM item; // a produced item
while (prodCount < NROF_ITEMS) // While not all items are
produced
{
rsleep (PRODUCER_SLEEP_FACTOR);
item = (random() % NROF_CONSUMERS) + (prodCount <<
NROF_BITS_DEST);
prodCount++;
pthread_mutex_lock(&bufferLock);
printf("justlocked\n");
if(inBufferCnt == BUFFER_SIZE)
{
printf("full buffer\n");
pthread_cond_wait(&bufferNotFull, &bufferLock);
printf("not full anymore\n");
}
buffer[bufferHead] = item;
bufferHead = (bufferHead + 1) % BUFFER_SIZE;
inBufferCnt++;
if(inBufferCnt == 1)
{
printf("consumer signaled\n");
pthread_cond_signal(&availForCons[buffer
[bufferHead - 1] & ~
(ALL_ONES << NROF_BITS_DEST)]);
}
printf("%04x\n", item); // write info to stdout,
putting it inside
the CS ensures no two threads write to stdout at the same time
printf("unlockingnext\n");
pthread_mutex_unlock(&bufferLock);
}
producing = 0;
return NULL;
}
/*
 * Program entry point.
 *
 * Initialises the mutex and condition variables, starts NROF_CONSUMERS
 * consumer threads (each receives its index as the thread argument) and one
 * producer thread, waits for every thread that was actually created, then
 * destroys the synchronisation objects.
 *
 * Returns 0 on success, -1 if any thread could not be created.
 */
int main(void)
{
    int consumerCount = 0;
    int consumerIter;
    int exitcode = 0;
    bool producerCreated = false;
    pthread_t producerTh;
    pthread_t consumerTh[NROF_CONSUMERS];

    // Initializations
    srandom((unsigned int)time(NULL));
    pthread_mutex_init(&bufferLock, NULL);
    pthread_cond_init(&bufferNotFull, NULL);
    for (consumerIter = 0; consumerIter < NROF_CONSUMERS; consumerIter++)
    {
        pthread_cond_init(&availForCons[consumerIter], NULL);
    }

    // Create a producer-thread and NROF_CONSUMERS consumer-threads.
    // The consumer index travels through the void * argument; going via
    // intptr_t keeps the int <-> pointer round trip well defined.
    while (consumerCount < NROF_CONSUMERS
           && !pthread_create(&consumerTh[consumerCount], NULL, consumer,
                              (void *)(intptr_t)consumerCount))
    {
        consumerCount++;
    }

    // The producer is only started when every consumer exists.
    if (consumerCount == NROF_CONSUMERS
        && !pthread_create(&producerTh, NULL, producer, NULL))
    {
        producerCreated = true;
    }
    else
    {
        printf("Some thread could not be created. There might have been insufficient resources available. Now quitting...");
        exitcode = -1;
    }

    // Wait until all threads are finished.
    // NOTE(review): if the producer was not created, already-running consumers
    // may never terminate and these joins may block; verify against consumer().
    for (consumerIter = 0; consumerIter < consumerCount; consumerIter++)
    {
        pthread_join(consumerTh[consumerIter], NULL);
    }
    // Joining a thread handle that was never filled in by pthread_create()
    // is undefined, so only join the producer when it exists.
    if (producerCreated)
    {
        pthread_join(producerTh, NULL);
    }

    // Close everything
    pthread_mutex_destroy(&bufferLock);
    pthread_cond_destroy(&bufferNotFull);
    for (consumerIter = 0; consumerIter < NROF_CONSUMERS; consumerIter++)
    {
        pthread_cond_destroy(&availForCons[consumerIter]);
    }
    return (exitcode);
}
You can only block on one condition variable, but the actual condition
can check multiple things as long as they're all protected by the same
mutex and as long as you do a pthread_cond_signal() or
pthread_cond_broadcast() whenever any of the data changes.
Chris
Thanks!
[...]
FWIW, you can do a very simple bounded circular buffer like:
// pseudo-code typed in news reader.
___________________________________________________________________
/* Number of slots in the circular buffer; cbuf_push and cbuf_pop reduce
   their running head/tail counters modulo this value to pick a slot. */
#define DEPTH 8192
/* Payload type carried through the buffer. */
struct foo
{
char blah[128];
};
/* One buffer slot: a payload plus an occupancy flag. */
struct data
{
/* Set to 1 by cbuf_push after storing a payload, cleared to 0 by cbuf_pop
   after copying it out. */
int in_use;
struct foo foo;
};
/* Bounded circular buffer shared by pushers and poppers.
   The trailing comments on the members give the expected initial values;
   no initialisation code is shown here (mutex and cond presumably need
   pthread_mutex_init / pthread_cond_init -- confirm at the call site). */
struct cbuf
{
struct data buffer[DEPTH]; /* = { { 0 } } */
/* Running counter post-incremented by cbuf_push; (value % DEPTH) is the
   slot that push will fill. */
size_t head; /* = 0 */
/* Running counter post-incremented by cbuf_pop; (value % DEPTH) is the
   slot that pop will drain. */
size_t tail; /* = 0 */
/* Held by cbuf_push and cbuf_pop while they touch buffer, head and tail. */
pthread_mutex_t mutex;
/* Single condition variable waited on by both pushers and poppers, which
   is why both functions wake waiters with pthread_cond_broadcast. */
pthread_cond_t cond;
};
void
cbuf_push(struct cbuf* self,
struct foo const* f)
{
size_t i;
pthread_mutex_lock(&self->mutex);
i = (self->head++) % DEPTH;
while (self->buffer[i].in_use)
{
pthread_cond_wait(&self->cond, &self->mutex);
}
self->buffer[i].foo = *f;
self->buffer[i].in_use = 1;
pthread_mutex_unlock(&self->mutex);
pthread_cond_broadcast(&self->cond);
}
void
cbuf_pop(struct cbuf* self,
struct foo* f)
{
size_t i;
pthread_mutex_lock(&self->mutex);
i = (self->tail++) % DEPTH;
while (! self->buffer[i].in_use)
{
pthread_cond_wait(&self->cond, &self->mutex);
}
*f = self->buffer[i].foo;
self->buffer[i].in_use = 0;
pthread_mutex_unlock(&self->mutex);
pthread_cond_broadcast(&self->cond);
}
___________________________________________________________________
There ya go.
If you use a single condition variable for multiple program conditions
(predicates), you should ALWAYS use pthread_cond_broadcast(); otherwise
you may wake (only) a thread waiting for a different condition. It might
correctly re-check its predicate and go back to sleep, but that may
become a "lost wakeup" from the perspective of threads waiting for the
condition that actually did change. (Trying to "chain" wakeups by having
a "spuriously awakened" thread re-signal before blocking doesn't help,
because there's no guarantee that re-signal will wake the proper thread.)
The proper way to think of signal vs broadcast is that "broadcast" is
the normal default. "Signal" is an OPTIMIZATION appropriate for the
specific case where you know that waking ONLY ONE THREAD, and ANY THREAD
waiting on the condition variable is sufficient response to the change
in predicate. If more than one thread may need to respond, or if there
might possibly be even a single thread waiting that cannot respond, you
MUST use broadcast.
Excellent point. Thanks.
Chris