Best practices for determining when we're finished processing fifo entries.

57 views
Skip to first unread message

Michael Grunder

unread,
Aug 19, 2017, 11:13:23 PM8/19/17
to Concurrency Kit

Hi,

First off, Concurrency Kit is fantastic. I was able to get up and running with it quickly and so far it’s worked flawlessly!

At present my workflow looks something like this:

  • Iterate over input data and enqueue tasks
  • Start worker threads to process the tasks
  • join my threads and package up the results

My thread routine is quite simple now, and looks like this:

/* Threaded worker */
void *pfilterseqWorker(void *p) {
    pfsThreadArg *wa = (pfsThreadArg*)p;
    pqueryargs *qa = wa->query;
    ck_fifo_mpmc_entry_t *stub;
    pworkunit *work;

    /* Pull jobs until we're done */
    while (ck_fifo_mpmc_dequeue(wa->queue, &work, &stub) != false) {
        doSomethingWithWork(work->id);
        pworkunitFree(work);
    }

    return NULL;
}

I think there it would be possible to squeeze a bit more performance out of the process if I were to start my worker threads before enqueuing jobs but am unsure about how to go about that in my thread routine.  If I start the threads before adding tasks to the queue then I'll be expecting dequeue operations to fail and need to handle that.

Is something like the following the right way to handle dequeuing where the workers start before I start enqueuing tasks?

/* Global state variables */
static unsigned int qfinished;
static unsigned int qdone;
static unsigned int qtotal;
// PRODUCER

/* Initialize state variables */
qfinished = 0;
qdone = 0;
qtotal = 0;

/* Initialize queue */
ck_fifo_mpmc_init(&queue, &stub);

/* Set up thread arguments and start threads */
for (i = 0; i < server.pset_query_workers; i++) {
    workarg[i].queue = &queue;
    pthread_create(&thread[i], NULL, pfilterseqWorker, &workarg[i]);
}

/* Add work to the queue */
for (i = 0; i < count; i++) {
    /* Enqueue unit of work */
    entry = malloc(sizeof(ck_fifo_mpmc_t));
    work = malloc(sizeof(*work));
    work->id = i; /* Contrived payload */
    ck_fifo_mpmc_enqueue(&queue, entry, work);

    /* Increment the total job count */
    qtotal++;
}

/* Flag that we're finished */
qfinished = 1;
/* Consumer thread routine*/
void *pfilterseqWorker(void *p) {
    pfsThreadArg *wa = (pfsThreadArg*)p;
    pqueryargs *qa = wa->query;
    ck_fifo_mpmc_entry_t *stub;
    pworkunit *work;

    /* Loop while we've not finished adding jobs or until done < total */
    while (!qfinished || qdone < qtotal ) {
        if (ck_fifo_mpmc_trydequeue(wa->queue, &work, &stub) == false) {
            ck_pr_stall();
            continue;
        }

        doSomethingWithWork(work->id);
        pworkunitFree(work);
        ck_pr_inc_uint(&qdone);
    }

    return NULL;
}
Reply all
Reply to author
Forward
0 new messages