
Seeking help with a MT design pattern


sam

Dec 31, 2009, 1:44:07 AM
Greetings,
I am a newbie and would value your input.
I have a queue of 1000 work items and an n-processor machine (assume
n = 4). The main thread spawns n (= 4) worker threads at a time (250
outer iterations) and waits for all of them to complete before
processing the next n (= 4) items, until the entire queue is processed.

for (i = 0 to queue.Length / numprocs)
{
    for (j = 0 to numprocs)
    {
        CreateThread(WorkerThread, WorkItem)
    }
    WaitForMultipleObjects(threadHandle[])
}


The work done by each worker thread is not homogeneous. Therefore, if
in one batch (of n) thread 1 spends 1000 s working and the other 3
threads only 1 s each, the design above is inefficient, because after
1 s the other 3 processors sit idle. Besides, there is no pooling:
1000 distinct threads are created.

How do I use the NT thread pool and QueueUserWorkItem to achieve the
above? (I am not familiar enough with them, hence the long-winded
question.) The following constraints should hold:

1. The main thread requires that all work items are processed before
it can proceed, so I would think that a wait-all construct like the
one above is required.
2. I want to create only as many threads as processors (i.e. not 1000
threads at a time).
3. I also don't want to create 1000 distinct events, pass them to the
worker threads, and wait on all of them, whether with the
QueueUserWorkItem API or otherwise.
4. The existing code is in C++. I prefer C++ because I don't know C#.

I suspect that the above is a very common pattern and was looking for
input from you folks
SamG

hugh<underbar>gleaves@hotmail.com

Dec 31, 2009, 10:18:01 AM
Do or will any of the spawned threads EVER enter a wait state (sleep, event
wait, IO wait etc)?

Does every thread execute some identical, processor bound operation?

Do any of the threads EVER access or modify some shared data?

Can the queued "work" items be executed in ANY order or must some work items
precede some others (at all, to any degree)?

Is the "queue" fixed, I mean once we begin to spawn threads, can new items
appear or do we simply: queue 1000 items, wait for all to be processed, then
carry on?

Do spawned threads EVER create new work items in the queue?

Having answers to these is important.

Cap'n

sam

Dec 31, 2009, 10:52:09 AM
Thanks - reply inline

> Do or will any of the spawned threads EVER enter a wait state (sleep, event
> wait, IO wait etc)?

No


>
> Does every thread execute some identical, processor bound operation?

The operation is identical, but the payload given to each thread is
different, hence the time each operation takes varies

>
> Do any of the threads EVER acces or modify some shared data?

No


>
> Can the queued "work" items be executed in ANY order or must some work items
> precede some others (at all, to any degree)?

Yes, any order - every task is independent

> Is the "queue" fixed, I mean once we begin to spawn threads, can new items
> appear or do we simply: queue 1000 items, wait for all to be processed, then
> carry on?

The queue is fixed

>
> Do spawned threads EVER create new work items in the queue?

No, none

Le Chaud Lapin

Dec 31, 2009, 1:11:06 PM
On Dec 31, 12:44 am, sam <samsm...@gmail.com> wrote:
> Greetings
> I am a newbie and require your valuable inputs,
> I have a queue of 1000 work items and a n-proc machine (assume n =
> 4).The main thread spawns n (=4)  worker threads at a time ( 25 outer
> iterations) and waits for all threads to complete before processing
> the next n (=4) items until the entire queue is processed

[snippage]

> I suspect that the above is a very common pattern and was looking for
> input from you folks

There are several ways you could do this, but a simple algorithm might
be something like:

1. Main thread places the 1000 work items in a queue guarded by both a
   Mutex and a Semaphore: Mutex'ed<Semaphore'd<Queue<WorkItem>>>.
2. Main thread raises Semaphore to 1000.
3. Main thread spawns 4 worker threads.
4. Each of the 4 worker threads does:
   a. WaitForSingleObject(Semaphore) on queue to see if it contains
      work items. If no more work items, exits. [*]
   b. Acquire queue.
   c. Remove work item from queue.
   d. Release queue.
   e. Process work item.
   f. goto a.

5. Main thread does a WaitForMultipleObjects(all) against the 4 thread
   handles.

[*] Undoubtedly, you have already considered the necessity of forcing
abrupt-but-graceful abort of a worker thread, perhaps at a point
between processing work items. In this case, for step (a), you would
use WaitForMultipleObjects against not just the Semaphore, but also a
companion Event that is supplied and set by the main thread when it
wants a worker thread to abort. So there would be 4 Semaphore's, 4
Events.

Obviously, you would have no more than 4 worker threads spawned at any
time, and hence 4 thread handles, and each worker thread would be
saturated with work until time to exit, with option for abortion.
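
The worker loop in steps (a) through (f) could be sketched roughly as
below. This is only an illustration under assumptions, not the code
described above: it is portable C++11, with std::thread and std::mutex
standing in for the Win32 thread and Mutex, and the Semaphore wait in
step (a) replaced by an emptiness check made while the queue is held.
WorkItem, process_item and run_all are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical work item: the payload is a number, the result its square.
struct WorkItem { int input; int result; };

// Hypothetical per-item work; the real processing would go here.
inline void process_item(WorkItem& item) { item.result = item.input * item.input; }

// Runs every item on num_workers threads and returns once all are done.
inline void run_all(std::vector<WorkItem>& items, std::size_t num_workers)
{
    assert(num_workers > 0);

    std::queue<WorkItem*> pending;   // shared queue of unprocessed items
    std::mutex guard;                // protects 'pending'
    for (WorkItem& item : items) pending.push(&item);

    auto worker = [&]() {
        for (;;) {
            WorkItem* next = nullptr;
            {
                std::lock_guard<std::mutex> lock(guard);   // step b: acquire queue
                if (pending.empty()) return;               // step a: no work left, exit
                next = pending.front();                    // step c: remove item
                pending.pop();
            }                                              // step d: release queue
            process_item(*next);                           // step e: work outside the lock
        }                                                  // step f: loop again
    };

    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < num_workers; ++i) threads.emplace_back(worker);
    for (std::thread& t : threads) t.join();   // stands in for the wait on all thread handles
}
```

Calling run_all(items, 4) creates exactly 4 threads, and a thread that
draws a slow item does not hold up the other three.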

Happy New Year!

-Le Chaud Lapin-

Le Chaud Lapin

Dec 31, 2009, 1:36:38 PM
On Dec 31, 12:11 pm, Le Chaud Lapin <jaibudu...@gmail.com> wrote:
> On Dec 31, 12:44 am, sam <samsm...@gmail.com> wrote:
> 1. Main thread wraps 1000 work items in a queue by both a Mutex and a
> Semaphore: Mutex'ed<Semaphore'd<Queue<WorkItem>>>.
> 2. Main thread raises Semaphore to 1000.
> 2. Main thread spawns 4 worker threads.
> 3. Each of 4 worker threads does:
>    a. WaitForSingleObject(Semaphore) on queue to see if it contains
> work items. If no more work items, exits. [*]
>    b. Acquire queue.
>    c. Remove work item from queue.
>    d. Release queue.
>    e. Process work item.
>    f. goto a.
>
> 4. Main thread does a WaitForMultipleObjects(all) against 4 thread
> handles.
>
> [*] Undoubtedly, you have already considered the necessity of forcing
> abrupt-but-graceful abort of a worker thread, perhaps at a point
> between processing work items. In this case, for step (a), you would
> use WaitForMultipleObjects against not just the Semaphore, but also a
> companion Event that is supplied and set by the main thread when it
> wants a worker thread to abort. So there would be 4 Semaphore's, 4
> Events.

If you want all threads to exit abruptly, and do not care about
individual threads, then you could use 1 (manual-reset) event instead
of 4.
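
A single shared abort signal might look something like the sketch
below. It rests on assumptions: it is portable C++11, a
std::atomic<bool> stands in for the one manual-reset Event and is
polled between work items rather than waited on, and items are claimed
through an atomic index purely to keep the example short.
run_until_cancelled and its parameters are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical helper: processes up to item_count items on num_workers
// threads, stopping early once 'cancel' is set. Returns how many items
// were actually processed.
inline std::size_t run_until_cancelled(std::size_t item_count,
                                       std::size_t num_workers,
                                       std::atomic<bool>& cancel)
{
    assert(num_workers > 0);

    std::atomic<std::size_t> next(0);       // index of the next unclaimed item
    std::atomic<std::size_t> processed(0);  // items finished so far

    auto worker = [&]() {
        // The one shared flag is checked between items, never mid-item.
        while (!cancel.load()) {
            const std::size_t index = next.fetch_add(1);
            if (index >= item_count) return;   // nothing left to claim
            // ... real work on item 'index' would go here ...
            processed.fetch_add(1);
        }
    };

    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < num_workers; ++i) threads.emplace_back(worker);
    for (std::thread& t : threads) t.join();
    return processed.load();
}
```

Because every worker reads the same flag, setting it once stops all of
them at their next item boundary.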

[Bring the Bubbly!]

-Le Chaud Lapin-

Chris M. Thomasson

Dec 31, 2009, 4:12:09 PM
"sam" <sams...@gmail.com> wrote in message
news:1090dee3-16d9-41f8...@m26g2000yqb.googlegroups.com...

> Greetings
> I am a newbie and require your valuable inputs,
> I have a queue of 1000 work items and a n-proc machine (assume n =
> 4).The main thread spawns n (=4) worker threads at a time ( 25 outer
> iterations) and waits for all threads to complete before processing
> the next n (=4) items until the entire queue is processed
>
> for(i= 0 to queue.Length / numprocs)
> for(j= 0 to numprocs)
> {
>
> CreateThread(WorkerThread,WorkItem)
>
> }
> WaitForMultipleObjects(threadHandle[])
>
>
> The work done by each (worker) thread is not homogeneous.Therefore in
> 1 batch (of n) if thread 1 spends 1000 s doing work and rest of the 3
> threads only 1 s , above design is inefficient,becaue after 1 sec
> other 3 processors are idling. Besides there is no pooling - 1000
> distinct threads are being created

[...]

The simplest design is a single queue, a thread pool and a future. The main
thread pushes work. Each work item has a pointer to the future. The future
gets signaled when all the work is finished. A simple sketch of the design
could look like:


<pseudo-code typed in news reader, sorry for any typos!>
_____________________________________________________________________
struct future
{
    LONG m_count; // = 1
    HANDLE m_waitset; // auto-reset event


    void reset(LONG count)
    {
        // add 1 in order to compensate the waiter thread.
        m_count = count + 1;
    }


    void wait()
    {
        // only block if some work items are still outstanding.
        if (InterlockedDecrement(&m_count))
        {
            WaitForSingleObject(m_waitset, INFINITE);
        }
    }


    void complete()
    {
        // whoever drops the count to zero wakes the waiter.
        if (! InterlockedDecrement(&m_count))
        {
            SetEvent(m_waitset);
        }
    }
};


struct node
{
    node* m_next;
};


struct queue
{
    node* m_head; // = NULL
    node* m_tail;
    CRITICAL_SECTION m_mutex;
    HANDLE m_waitset; // manual-reset event


    // push a pre-linked chain of nodes running from head to tail.
    void push(node* head, node* tail)
    {
        tail->m_next = NULL;

        EnterCriticalSection(&m_mutex);

        if (! m_head)
        {
            m_head = head;
            SetEvent(m_waitset);
        }

        else
        {
            m_tail->m_next = head;
        }

        m_tail = tail;

        LeaveCriticalSection(&m_mutex);
    }


    // blocks until a node is available.
    node* pop()
    {

    retry:

        EnterCriticalSection(&m_mutex);

        node* n = m_head;

        if (! n)
        {
            LeaveCriticalSection(&m_mutex);

            WaitForSingleObject(m_waitset, INFINITE);

            goto retry;
        }

        if (! (m_head = n->m_next))
        {
            ResetEvent(m_waitset);
        }

        LeaveCriticalSection(&m_mutex);

        return n;
    }
};


struct work : public node
{
    future* m_future;
    work* m_local_next;


    void process();
};


static queue g_queue;


struct worker_thread
{
    void entry()
    {
        for (;;)
        {
            work* w = static_cast<work*>(g_queue.pop());

            w->process();

            w->m_future->complete();
        }
    }
};


struct main_thread
{
    work* m_spawn_head; // = NULL
    work* m_spawn_tail;
    future m_future;


    void entry()
    {
        for (;;)
        {
            // setup future for 1000 work items.
            m_future.reset(1000);


            // create 1000 work items.
            for (size_t i = 0; i < 1000; ++i)
            {
                work* w = new work;

                w->m_future = &m_future;
                w->m_local_next = NULL;

                if (! m_spawn_head)
                {
                    m_spawn_head = w;
                }

                else
                {
                    m_spawn_tail->m_local_next = w;

                    // also link through node::m_next, which is the
                    // chain that queue::push() and pop() follow.
                    m_spawn_tail->m_next = w;
                }

                m_spawn_tail = w;
            }


            // spawn all 1000 work items in a single shot.
            g_queue.push(m_spawn_head, m_spawn_tail);


            // wait for everything.
            m_future.wait();


            // destroy all spawned work items.
            while (m_spawn_head)
            {
                work* next = m_spawn_head->m_local_next;
                delete m_spawn_head;
                m_spawn_head = next;
            }

            m_spawn_tail = NULL;
        }
    }
};
_____________________________________________________________________


That's an extremely simple design, which should fit your needs.
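
For readers without the Win32 headers at hand, the counting idea behind
the future can also be sketched in portable C++11. This is an
assumption-laden illustration, not a translation of the code above: a
std::mutex and std::condition_variable replace the interlocked counter
and the auto-reset event, so the extra "+ 1" for the waiter is not
needed. The class name countdown and its members are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical portable counterpart of the 'future': the waiter blocks
// until complete() has been called once per outstanding work item.
class countdown
{
public:
    explicit countdown(std::size_t count) : remaining_(count) {}

    // Called by a worker once for each finished work item.
    void complete()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(remaining_ > 0);
        if (--remaining_ == 0) done_.notify_all();   // last item wakes the waiter
    }

    // Blocks until every expected completion has been reported.
    void wait()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        done_.wait(lock, [this] { return remaining_ == 0; });
    }

    // Number of completions still outstanding.
    std::size_t remaining()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return remaining_;
    }

private:
    std::mutex mutex_;
    std::condition_variable done_;
    std::size_t remaining_;
};
```

The main thread would construct it with the number of work items, hand
a pointer to each item, and call wait() after queueing them all.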

Chris M. Thomasson

Dec 31, 2009, 4:27:26 PM
"Le Chaud Lapin" <jaibu...@gmail.com> wrote in message
news:eb899d03-d902-4292...@a15g2000yqm.googlegroups.com...

On Dec 31, 12:11 pm, Le Chaud Lapin <jaibudu...@gmail.com> wrote:
> On Dec 31, 12:44 am, sam <samsm...@gmail.com> wrote:
> 1. Main thread wraps 1000 work items in a queue by both a Mutex and a
> Semaphore: Mutex'ed<Semaphore'd<Queue<WorkItem>>>.
> 2. Main thread raises Semaphore to 1000.
> 2. Main thread spawns 4 worker threads.

Why have the main thread spawn 4 worker threads every time it needs to
process 1000 work items? Just create a thread pool and use futures.


> 3. Each of 4 worker threads does:
> a. WaitForSingleObject(Semaphore) on queue to see if it contains
> work items. If no more work items, exits. [*]

There is absolutely no need for a semaphore here. Think about it for a
moment. Hint, the 1000 items are already in the queue, and the queue can
tell you when it is empty.
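
To make the hint concrete, here is a rough portable C++11 sketch of a
queue that reports emptiness itself, so no separate semaphore count is
needed. It is an illustration under assumptions rather than anyone's
actual code: drain_queue, try_pop and drain are hypothetical names, and
std::mutex stands in for a CRITICAL_SECTION.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical thread-safe queue whose pop reports "empty" directly.
template <typename T>
class drain_queue
{
public:
    void push(T value)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push_back(std::move(value));
    }

    // Returns false, leaving 'out' untouched, when the queue is empty.
    bool try_pop(T& out)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return false;
        out = std::move(items_.front());
        items_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::deque<T> items_;
};

// Worker loop: keep taking items until the queue says it is empty.
// Returns how many items this caller handled.
template <typename T, typename Fn>
std::size_t drain(drain_queue<T>& queue, Fn handle)
{
    std::size_t handled = 0;
    T item;
    while (queue.try_pop(item)) {
        handle(item);
        ++handled;
    }
    return handled;
}
```

This only works as an exit condition because the queue is filled
completely before the workers start and nothing is added afterwards.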


> b. Acquire queue.
> c. Remove work item from queue.
> d. Release queue.
> e. Process work item.
> f. goto a.
>
> 4. Main thread does a WaitForMultipleObjects(all) against 4 thread
> handles.
>
> [*] Undoubtedly, you have already considered the necessity of forcing
> abrupt-but-graceful abort of a worker thread, perhaps at a point
> between processing work items. In this case, for step (a), you would
> use WaitForMultipleObjects against not just the Semaphore, but also a
> companion Event that is supplied and set by the main thread when it
> wants a worker thread to abort. So there would be 4 Semaphore's, 4
> Events.

You don't actually need any of that. In fact, all you "need" for your
specific example is a thread-safe queue, an auto-reset event, a cancel
variable and a future variable.
