
ThreadPool Questions

reeset

Aug 3, 2007, 12:26:01 PM
I'm hoping someone can help me understand why running a set of processes
through the threadpool seems to be slower than running the same process
through a single thread (maybe I'm misunderstanding what the ThreadPool class
is good for). Let me explain what I need to do:

Basically, I have a file that has n records in it. There could be anywhere
from 40,000 to 4 million records. Each record requires its own
calculations, and while the records themselves are not dependent on data from
any other record, the results must be printed back to the results file in the
same order that they were processed.

I've written the following (see below) that works -- however the code is
almost twice as slow as when the process is run within a single thread. I'm
just wondering why I'm seeing what I'm seeing, and whether there is a better
way to do this.

Thank you,
--TR

Sample code below:

using System;
using System.Collections.Generic;

namespace thread_tes
{
    class MainClass
    {
        public static void Main(string[] args)
        {
            System.Threading.ThreadPool.SetMaxThreads(5, 5);
            System.Threading.ManualResetEvent[] doEvents =
                new System.Threading.ManualResetEvent[5];

            System.IO.StreamReader reader = new System.IO.StreamReader(
                @"c:\thread.txt", System.Text.Encoding.GetEncoding(1252));
            System.IO.StreamWriter writer =
                new System.IO.StreamWriter(@"c:\thread1.txt", false);

            RunClass[] r = new RunClass[5];
            int max = r.Length;
            int i = 0;

            while (reader.Peek() > -1)
            {
                string s = reader.ReadLine();

                doEvents[i] = new System.Threading.ManualResetEvent(false);
                r[i] = new RunClass(s, doEvents[i]);

                //r[i] = f;
                System.Threading.ThreadPool.QueueUserWorkItem(r[i].Run, i);

                i++;
                if (i == max)
                {
                    System.Threading.WaitHandle.WaitAll(doEvents);
                    PrintThreads(writer, r, i);
                    i = 0;
                }
            }

            if (i > 0)
            {
                System.Threading.WaitHandle.WaitAll(doEvents);
                Console.WriteLine("New String: " + r[0].NewString);
                PrintThreads(writer, r, i);
            }
            writer.Close();
            Console.WriteLine("Finished");
            Console.ReadLine();
        }

        static void PrintThreads(System.IO.StreamWriter writer, RunClass[] r, int i)
        {
            Console.WriteLine("PrintThreads Executed");
            for (int x = 0; x < i; x++)
            {
                if (r[x] != null)
                {
                    writer.WriteLine(r[x].NewString);
                }
            }
        }
    }

    class RunClass
    {
        private string _p = "";
        private string _Internal = "";
        private System.Threading.ManualResetEvent _doEvent =
            new System.Threading.ManualResetEvent(false);

        public RunClass(string p, System.Threading.ManualResetEvent _event)
        {
            _p = p;
            _doEvent = _event;
        }

        public string NewString
        {
            set { _Internal = value; }
            get { return _Internal; }
        }

        public void Run(object status)
        {
            int i = 0;
            i = System.Convert.ToInt32(_p);
            i++;
            NewString = i.ToString();
            _doEvent.Set();
        }
    }
}

Peter Duniho

Aug 3, 2007, 1:03:08 PM
reeset wrote:
> I'm hoping someone can help me understand why running a set of processes
> through the threadpool seems to be slower than running the same process
> through a single thread (maybe I'm minunderstanding what the threadpool class
> is good for). Let me example what I need to do:
>
> Basically, I have a file that has n number of records in it. There could be
> 40,000 records to 4 million records. Each record requires its own
> calculations, and while the records themselves are not dependant on data from
> any other record, the results must be printed back to the results file in the
> same order that they were processed.
>
> I've written the following (see below) that works -- however the code is
> almost twice as slow as when the process is run within a single thread. I'm
> just wondering why I'm seeing what I'm seeing or is there a better way to do
> this.

I see a few things.

The first is that you are not measuring the actual work you intend to
do. You've created what appears to be some sort of "busy work",
probably just for the purpose of testing the speed, but since the
thread-switching costs are the same regardless, if your "busy work"
isn't exactly the same cost as the actual work, your results aren't a
valid indicator as to how the actual work will do.

The second, and IMHO the most important, is that you are queuing a new
work item for each record you want to process. Because there is some
cost to switching to a new thread and doing the various inter-thread
communication (in your case, dealing with the WaitHandle), the larger the
proportion of your total effort that overhead represents, the less
efficient things will be. A more correct design would use a single queued
work item to handle _all_ of the records.

As a related note to this second point, note that your queued work items
are not guaranteed to execute in the same order in which they were
queued. This may or may not matter; in the example you posted, it
appears not to, but since the example is a contrived "busy work"
example, your real situation could be problematic. For example, if you
add the results of your computations to some sort of list once they are
finished, that list could wind up out of order relative to the order in
which the records were read.

Finally, a relatively minor point is that your implementation
periodically waits for the last N (where N = 5 in your case) queued work
items to complete before starting any new ones. This means that if
there were any advantage to parallelization between the threads, you
negate some of that advantage by forcing everything to synchronize every
N items. Even if all this means is that you have more than one CPU and
one of those CPUs winds up idle when it doesn't have to be, that's going
to affect your results.

If you need to wait for some collection of queued work items to
complete, there are better ways to do that. For example, use a shared
counter that is incremented for each new work item, and then which is
decremented as each work item completes. The main thread would sit in a
loop controlled by some synchronization object (like a WaitHandle or a
Monitor instance) that can allow it to wake up each time the counter is
decremented and continue once the counter reaches 0 again.
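
As a rough illustration of the shared-counter idea just described, here is a minimal sketch. The class name PendingCounterSketch and its RunAll method are hypothetical, the "work" is only a stand-in increment, and a lambda is used for brevity, so this assumes a compiler newer than the one current when this thread was written.

```csharp
using System;
using System.Threading;

// Hypothetical helper: waits for a batch of thread pool work items
// using a shared counter guarded by a Monitor.
static class PendingCounterSketch
{
    private static readonly object Gate = new object();
    private static int pending;   // work items queued but not yet finished

    public static int RunAll(int itemCount)
    {
        int completed = 0;

        for (int n = 0; n < itemCount; n++)
        {
            lock (Gate) { pending++; }              // one more item in flight

            ThreadPool.QueueUserWorkItem(_ =>
            {
                Interlocked.Increment(ref completed);   // stand-in for real work

                lock (Gate)
                {
                    pending--;                      // this item is finished
                    Monitor.PulseAll(Gate);         // wake the waiting main thread
                }
            });
        }

        lock (Gate)
        {
            // Monitor.Wait releases Gate while blocked and reacquires it on wake-up.
            while (pending > 0)
                Monitor.Wait(Gate);
        }
        return completed;
    }
}
```

The main thread re-checks the counter in a loop after every pulse, so it only continues once the count has actually returned to zero.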

Of course, since in this case you really ought to just be queuing a
single work item, due to the relatively low cost of each processing
iteration, this third issue goes away. But it's something you should
keep in mind in the future.

Pete

Samuel R. Neff

Aug 3, 2007, 2:00:24 PM

First, you need to determine if the calculations themselves are taking
most of the time in the loop (the non-threaded version). If the
calculations are only a small part of the loop and most of the time
required is for reading and writing files, then multithreading will
not help.

So if the calculations really are the bottleneck, then multithreading
is good but you want less synchronization between threads. The whole
point is to enable threads to perform things at their own pace. Due
to the required ordering on output, you do need some synchronization
to make sure the data comes out in the right order, but not nearly as
much as what you have.

I would suggest a producer/consumer architecture like this:

1 thread - reading thread (Main() entrypoint)
x threads - worker threads
1 thread - sync/writing thread

You should either have a single synchronized queue or one queue for
each worker thread (still requires synchronization, but will have less
contention).

You should also have an output data pool. I'm not sure what the best
collection to use is, but perhaps a dictionary is best. Output data also
needs to be synchronized.

The reader thread can read items and then put them into the queue(s).
They need to be tagged with their sort order, ideally sequential
numbers starting with zero.

Worker threads all continuously take data out of the queue(s), perform
the calculations, and put results into the output pool.

The writer thread has to take stuff out of the pool but also make sure
it does so in the right order. If you have a single sequential
number, then you can just do a simple loop and block when the next
needed value is not ready.
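
The output pool and the blocking writer loop described above might be sketched roughly like this. It is only an illustration: OrderedOutputPool and its Put/Take methods are made-up names, the dictionary is keyed by the sequence number assigned by the reader, and results are assumed to be strings.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical output pool: workers Put results in any order,
// the writer Takes them strictly by sequence number.
sealed class OrderedOutputPool
{
    private readonly Dictionary<int, string> results = new Dictionary<int, string>();
    private readonly object gate = new object();

    // Called by worker threads when a record has been processed.
    public void Put(int sequence, string result)
    {
        lock (gate)
        {
            results[sequence] = result;
            Monitor.PulseAll(gate);     // the writer may be waiting for this one
        }
    }

    // Called by the writer thread; blocks until the requested result exists.
    public string Take(int sequence)
    {
        lock (gate)
        {
            string result;
            while (!results.TryGetValue(sequence, out result))
                Monitor.Wait(gate);     // releases the lock while waiting
            results.Remove(sequence);   // keep the pool from growing without bound
            return result;
        }
    }
}
```

The writer thread would then simply call Take(0), Take(1), Take(2), and so on, writing each result as it is returned.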

The important performance issue is the synchronization points. You
want to limit the amount of time threads are waiting on each other.
You can do this by having individual threads read/write a few values
each time they get a lock or by separating collections so there are
fewer objects interacting with each collection (although there will
always be at least two: the reader or writer, as well as at least one
worker thread).

Once you have a working model, you can tweak the number of worker
threads to find the ideal number for your situation (can even increase
the number dynamically based on load).

HTH,

Sam



Peter Duniho

Aug 3, 2007, 2:46:12 PM
Samuel R. Neff wrote:
> First, you need to determine if the calculations themselves are taking
> most of the time in the loop (the non-threaded version). If the
> calculations are only a small part of the loop and most of the time
> required is for reading and writing files, then multithreading will
> not help.

This isn't really true. In fact, one significant benefit of a
multi-threaded design is that if one thread is waiting on i/o, other
threads can still be active doing things, rather than forcing everything
to work in sequence (as in "wait on i/o, process data, wait on i/o,
process data", which is less efficient than "process data while waiting
on i/o, process data while waiting on i/o").

Now, that does to some extent assume i/o calls that are mutually
exclusive of each other. But even there, not completely, as having
multiple outstanding i/o requests can allow the OS to do more
intelligent access of the hardware device and caching of the i/o
results, even when the i/o calls are effectively sequential. The OS is
usually pretty good at dealing with sequential i/o anyway, but there is
a theoretical benefit even so.

Multi-threading benefits are not limited to things that are CPU-bound.

Now, will multi-threading help the particular example of code? It's
hard to say without knowing exactly what the guy really wants to do. In
many cases, the primary benefit of multi-threaded code is simply to
allow some sort of processing to occur without disabling the user
interface. That is, a performance benefit isn't the goal at all. In
those situations, looking at the effect on performance isn't likely to
be relevant (other than, of course, ensuring that going to a
multi-threaded design hasn't killed your performance).

Pete

reeset

Aug 3, 2007, 5:28:02 PM
> The first is that you are not measuring the actual work you intend to
> do. You've created what appears to be some sort of "busy work",
> probably just for the purpose of testing the speed, but since the
> thread-switching costs are the same regardless, if your "busy work"
> isn't exactly the same cost as the actual work, your results aren't a
> valid indicator as to how the actual work will do.

So, you are right, this is just busy work to show how I set up the code. In
reality, the processing that needs to occur is on an encoded record string.
These strings can be up to 1 MB in size, and require a fair amount of data
processing before the results can be written into the output file. However,
as you note, the records are of different sizes, so record processing time
will vary. Using a single-thread approach, the program can process roughly
10,000 data records per second. Taking the posted code and adapting it to the
program, that number drops, and drops proportionately to the number of records
being processed. I think your analysis regarding the need for a single
work queue is the correct one. The problem that I run into is I'm not quite
sure how I will accomplish this. Say for example I process the first 5
records in the file. The order in which these processes run and finish
really isn't that important. However, what is important is that the data is
output in the same order that it was read. So if I read records 1-5,
the data must be written back in record 1-5 order.

I'd considered using a shared counter -- but that would just let me know
when a "set" was finished processing -- how would you maintain the initial
processing order (not the order in which they were completed)?

Thanks,

--TR

reeset

Aug 3, 2007, 5:32:02 PM
I can definitely tell you that IO is insignificant. The record processing
itself makes up the lion's share of the "work" being done by the application.
It was part of the reason why I'd thought placing the items in a threadpool
would provide some optimizations.

> You should either have a single synchronized queue or one queue for
> each worker thread (still requires synchronization, but will have less
> contention).
>
> You sould have an output data pool. I'm not sure what the best
> collection to use but perhaps a dictionary is best. Output data also
> needs to be synchronized.
>
> The reader thread can read items and then put them into the queue(s).
> They need to be tagged with their sort order, ideally sequential
> numbers starting with zero.
>
> Worker threads all continuously take data out of the queue(s), perform
> the calculations, a put results into the output pool.
>
> The writer thread has to take stuff out of the pool but also make sure
> it does so in the right order. If you have a single sequential
> number, then you can just do a simple loop and block when the next
> needed value is not ready.

Thanks, I think I can visualize what you are talking about. I'll see where
this takes me.

--TR

Peter Duniho

Aug 3, 2007, 6:11:28 PM
reeset wrote:
> [...]

> Say for example I process the first 5
> records in the file. The order in which these processes run and finish
> really isn't that important. However, what is important is that the data is
> outputed back into the same order that it was read. So if I read record 1-5,
> the data must be re-written back in record 1-5 order.
>
> I'd considered using a shared counter -- but that would just let me know
> when a "set" was finished processing -- how would you maintain the initial
> processing order (not the order in which they were completed)?

(Actually, the shared counter is intended to allow you to know when
_all_ of the processing has been completed, not just one set; you
haven't explained the need for processing data in "sets", so I haven't
made an attempt to include that behavior in the discussion)

I think it would be helpful to step back and state some goals. What is
it that you expect to gain by multi-threading this code?

The issues you're asking about are solvable (and without much difficulty
even), but at the moment I'm still unclear as to what the point in
assigning each individual record its own thread for processing is.

Are you trying to improve performance? If so, how does multi-threading
help that? Are you simply trying to improve the user experience? If
so, isn't it feasible to simply move the entire processing loop into a
single thread for that purpose, avoiding the need to do any additional
work to keep the processed data in order?

Maintaining the processed data in original order is as simple as
ordering the output data structures before processing and keeping them
in order even while you process individual items. But before you go to
some trouble to create the data structure that will maintain that order,
you should first be sure that you have a specific need of a solution
that requires such a data structure.
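
To make the "order the output structure up front" idea concrete, here is one possible sketch. The names OrderedSlotsSketch, ProcessAll and Process are hypothetical, Process merely mimics the parse-and-increment busy work from the original post, and the sketch assumes the whole batch of records fits in memory (a real version for multi-gigabyte files would presumably work on bounded chunks).

```csharp
using System;
using System.Threading;

static class OrderedSlotsSketch
{
    // Hypothetical stand-in for the real per-record processing.
    private static string Process(string record)
    {
        return (int.Parse(record) + 1).ToString();
    }

    public static string[] ProcessAll(string[] records)
    {
        // One output slot per input record, already in input order.
        string[] results = new string[records.Length];
        int next = -1;   // index of the most recently claimed record

        // One worker per CPU; more would only add scheduling overhead.
        Thread[] workers = new Thread[Environment.ProcessorCount];
        for (int w = 0; w < workers.Length; w++)
        {
            workers[w] = new Thread(() =>
            {
                while (true)
                {
                    // Atomically claim the next unprocessed index.
                    int i = Interlocked.Increment(ref next);
                    if (i >= records.Length) break;
                    results[i] = Process(records[i]);   // each slot has one writer
                }
            });
            workers[w].Start();
        }

        foreach (Thread worker in workers)
            worker.Join();

        return results;   // still in input order, whatever the completion order was
    }
}
```

Because every result is written into the slot matching its input index, no sorting or reordering step is needed afterwards.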

Until we understand better what goal you're trying to achieve, that
specific need is not apparent.

Pete

reeset

Aug 3, 2007, 7:48:00 PM
> I think it would be helpful to step back and state some goals. What is
> it that you expect to gain by multi-threading this code?

I can tell you what I hope to gain -- I'm looking for a performance
increase. The process works fine as it is -- but I'm starting to deal with
multiple-gigabyte files, where a process that used to take a handful of
minutes is now taking 15-20 minutes. The goal with attempting to thread the
data processing step was to find a way to reduce the processing time. Since
there's no GUI involved, the user experience is less of an issue.

> (Actually, the shared counter is intended to allow you to know when
> _all_ of the processing has been completed, not just one set; you
> haven't explained the need for processing data in "sets", so I haven't
> made an attempt to include that behavior in the discussion)

Well, the need is an artificial one, but one that I have to accommodate.
Essentially, users will expect the record order in the output file to match
the order of the input file for no other reason than that's how it's always
been done. Ultimately, both the individuals and the tools that make use of
these output files will need to process the data sequentially, so preserving
order is unfortunately necessary.

Thanks,

--TR


Rick Lones

Aug 3, 2007, 11:18:20 PM
reeset wrote:
> I can definitely tell you that IO is insignificant. The record processing
> itself makes up the lionshare of the "work" being done by the application.
> It was part of the reason why I'd thought placing the items in a threadpool
> would provide some optimizations.
>

If you are that CPU-bound then I would think that you will gain no parallelism
from multithreading at all; in fact, the main effect will be to introduce the
overhead of managing the threads themselves. It sounds as if adding processors
would be your only hope for significant performance improvement. Then
threading can help, as you would be concurrently running threads on separate
processors. And in that case, I would still think that having more active
threads than the number of processors would likely be counterproductive (for the
same reason).

JMHO,
-rick-

Peter Duniho

Aug 3, 2007, 11:45:28 PM
reeset wrote:
>> I think it would be helpful to step back and state some goals. What is
>> it that you expect to gain by multi-threading this code?
>
> I can tell you what I hope to gain -- I'm looking for a performance
> increase. The process works fine as it is -- but I'm starting to deal with
> multiple gigabyte files where the process use to take a handful of minutes is
> now taking 15-20 to process. The goal with attempting to thread the data
> processing step was to find a way to reduce the processing time. Since
> there's no gui involved, the user experience is less of an issue.

Okay, so the goal is improved performance. The next question is: in
what way is your processing parallelizable? That is, why is it that
using multiple threads _should_ improve performance?

If you are computationally bound (that is, it takes longer to process
the data than to read it), _and_ you have more than one CPU to use (real
CPUs, not Hyperthreading), then additional threads should help for sure.

Alternatively, if you can read the data from multiple sources at once
and can use threading to parallelize the data retrieval, that could
improve performance as well. In this case however, it's possible
(probable, even) that you could achieve the same improvement without
explicitly using the thread pool, and instead simply using the
asynchronous i/o methods (Stream.BeginRead(), for example).

More theoretically, there is some possibility that you could even
eliminate (or nearly so) the processing cost, if you are i/o bound.
That is, you could put the processing in one thread and the i/o in
another, with the processing thread pulling data from a queue the i/o
thread feeds, dealing with it as fast as, and in parallel with, the i/o
thread.

You wouldn't go any faster than the i/o restricts you to, but at least
you would not have the cost of the processing added to the cost of the i/o.
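
A two-thread arrangement like the one just described could look something like the sketch below. It assumes a framework version that includes BlockingCollection<T> (System.Collections.Concurrent, which is newer than this thread); ReadProcessPipelineSketch and Run are hypothetical names, and taking the line length stands in for the real processing.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading;

static class ReadProcessPipelineSketch
{
    public static List<int> Run(TextReader input)
    {
        var results = new List<int>();

        // Bounded so the reader cannot run arbitrarily far ahead of processing.
        using (var lines = new BlockingCollection<string>(1024))
        {
            var processor = new Thread(() =>
            {
                // Blocks while the queue is empty; ends after CompleteAdding().
                foreach (string line in lines.GetConsumingEnumerable())
                    results.Add(line.Length);   // stand-in for real processing
            });
            processor.Start();

            // The calling thread plays the role of the i/o thread.
            string s;
            while ((s = input.ReadLine()) != null)
                lines.Add(s);
            lines.CompleteAdding();             // signal "no more data"

            processor.Join();
        }
        return results;
    }
}
```

With a single consumer pulling from a FIFO queue, the results come out in input order without any extra bookkeeping.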

In all of these scenarios, there's an upper limit to how many threads
would actually be useful. In the CPU-bound case, it won't do you much
good to have more threads than you have CPUs. In the i/o-bound cases,
it will depend on the exact scenario. The latter design, in which i/o
and processing are done in separate threads, would only require two
threads, for example, and there'd be no benefit from additional threads.

Note that in none of those cases have I suggested that you assign a new
thread per record. I don't believe that design would be _better_
performance-wise than something above, and because of the thread context
switching, it could be much worse, depending on other costs.

If you believe that assigning a new worker queue item to each record, as
in the design you've posted here, would improve performance then it
would be helpful for you to explain why it is you feel that should
improve performance. I don't think it should, but maybe there's
something I'm missing regarding the overall design that explains that.

>> (Actually, the shared counter is intended to allow you to know when
>> _all_ of the processing has been completed, not just one set; you
>> haven't explained the need for processing data in "sets", so I haven't
>> made an attempt to include that behavior in the discussion)
>
> Well, the need is an artificial one, but one that I have to acommodiate.

What is the need for processing the data in sets? That is, the code you
posted processes five at a time, halting all further processing until a
single set of five has completed.

> Essentially, users will expect the record order in the output file to match
> the order of input file for no other reason than that's how it's always been
> done. Ultimately, both individuals and the tools that can work with these
> datasets will need to make use of these output files will need to process the
> data sequentially so preserving order is unfortunately necessary.

The requirement that the data remain in order isn't surprising, and
isn't related to my question about doing the processing in sets.
Keeping the records in order won't be hard, no matter what you do, but
that's not the question I was asking.

Pete

Peter Duniho

Aug 4, 2007, 12:16:55 AM
Sorry...I posted my much more general reply before seeing this post.

Anyway...

reeset wrote:
> I can definitely tell you that IO is insignificant.

So, having a thread for each CPU should help. As I mentioned, more
threads than that won't.

This is very important. If you are running this code on a computer with
only one CPU, and your code is truly CPU-bound, then more threads are
only going to make things worse.

For the rest of this post, let's assume you've got extra CPUs you can use.

> The record processing
> itself makes up the lionshare of the "work" being done by the application.
> It was part of the reason why I'd thought placing the items in a threadpool
> would provide some optimizations.

It could. What design you'll have to use depends on what the actual
cost per-record for processing is. If each record takes seconds to
process, then I think queuing a work item for each record is a very
easy, workable solution.

You will have ordering issues, which are easily addressed. You will
also want to take out the part of your code that stops every five
records and waits for the other threads to catch up. Doing that negates
much of the benefit of multi-threading. Finally, you will want to limit
the thread pool size to have no more active threads than you have CPUs.

If each record processed takes something like 500 to 1000 milliseconds,
then you're in a bit of a gray area. You _should_ see a benefit even if
you queue one worker thread item per record, but your overhead will
start to reduce your efficiency in a noticeable way.

If each record processed only takes tens of milliseconds or less, then
thread scheduling and context switching could easily offset whatever
gain you have from parallelizing the work, if you are only assigning a
single record to each worker queue item.

Now, with all that out of the way...the design Samuel suggests seems
like it should work reasonably well for you. Specifically, having a
thread for i/o, feeding a queue that multiple worker threads can
consume, should work nicely and prevents any need to create new worker
items. You just start the worker threads and let them continue as long
as there is data to process. This avoids having to worry about the
exact relative cost of processing each record.

The one modification to his suggestion that I'd make is this: ordering
the data is a lot easier than it seems that he's suggesting. The data
can be kept in some kind of ordered list from the outset, left there in
order the whole time. Here's a pseudo-code description of what I mean,
using Samuel's design as the framework:

Reader thread:

    while not end of data:
        read record
        add record to ordered list
        pulse a monitor that the workers wait on

Worker thread:

    while not end of data:
        scan forward through the ordered list to find first unprocessed record
        if the end of the list is reached, wait on the monitor and then
            scan again
        process the record, mark it as having been processed
        pulse a monitor that the consumer waits on

Writing thread:

    while not end of data:
        scan forward through the ordered list, handling records until
            reaching an unprocessed record or the end of the list
        wait on the monitor and then scan again

In this way, the records always remain in the correct order. There is a
slight inefficiency in that the consumer thread will occasionally have
to wait at an unprocessed record even though another record later in the
sequence has already been done, but assuming that the processing itself
is really the bottleneck, this shouldn't really be a problem. IMHO, the
simpler design more than makes up for any minor delays there (and of
course the delay is only really noticed if it occurs at the very end of
the processing sequence, since otherwise the consumer thread should
easily catch up at the next opportunity).

You could use a LinkedList or List<> collection to keep the data ordered
according to the input order of the records. Just add new data to the
end of the collection. The LinkedList would be more efficient if you
want to remove items from the front of the list as they are finally
handled by the consumer.

You might be tempted to use a couple of Queues, but the need to keep the
records in the same order as they are initially read would make that
impractical, as you'd have to create some sort of intermediate data
structure to ensure that processed records are added to the consumer's
Queue in the same order in which they were removed from the reading
thread's queue.

Other than that change, I think Samuel's suggestion may in fact be ideal
for your situation.

Pete

Peter Duniho

Aug 4, 2007, 5:42:30 AM
Peter Duniho wrote:
> [...]

> Now, with all that out of the way...the design Samuel suggests seems
> like it should work reasonably well for you. Specifically, having a
> thread for i/o, feeding a queue that multiple worker threads can
> consume, should work nicely and prevents any need to create new worker
> items. You just start the worker threads and let them continue as long
> as there is data to process. This avoids having to worry about the
> exact relative cost of processing each record.
>
> The one modification to his suggestion that I'd make is this: ordering
> the data is a lot easier than it seems that he's suggesting. The data
> can be kept in some kind of ordered list from the outset, left there in
> order the whole time. Here's a pseudo-code description of what I mean,
> using Samuel's design as the framework:

Bah. Forget the pseudo code (well, not really...it's fine as far as it
goes). Your problem created an itch I couldn't help scratching, and I
coded up an example of what I meant.

I've copied the code into this post. It's not the prettiest sample code
around, so here are some comments about it to help orient you:

-- It's a console application, so you should be able to copy it verbatim
into a new console application project and have it work fine.

-- class WorkItem is the class that represents a single instance of
work. In your case, this would be a single record of data. In my
sample code, there's a DoWork method in this class that actually
represents the simulated work. I have two versions of timing: one uses
Sleep() and the other just spins until the time is up. The latter is
useful for simulating actual behavior given the CPUs actually installed
on your computer, while the former is useful for simulating behavior
when you always have as many CPUs as you have threads (since the Sleep
method doesn't actually consume CPU time, the threads don't really
compete for the CPU and essentially always complete the work in
parallel, at least given a reasonable number of threads).

Comment one or the other line out, depending on what you want to simulate.

-- class WorkQueue is the class that actually manages the work. It
allows WorkItem instances to be enqueued, to be dequeued for processing,
and dequeued for whatever final purpose once the processing for a given
item has finished. The "trick", if you will, is in the WiFinishedNext
method, which is what encodes the logic to preserve the observable order
of the completed records even if they are processed out of order.

In this implementation, a null WorkItem at the end of the queue serves
as an indicator to the other threads that they have reached the actual
end of the processing.

-- class Worker is the class that represents a single thread. It just
sits and pulls new WorkItem instances from the WorkQueue and simulates
doing some work with the WorkItem by calling the WorkItem instance's
DoWork method. Each worker thread created will use an instance of this
class.

-- class Reader is the class that represents the part of the program
that deals with handling the finished data. Like the Worker class, it
just sits and pulls items from the queue using the appropriate method on
the queue.

-- the Main method just creates the queue, starts up the threads, and
then creates a bunch of work item instances with randomly chosen times
for processing, adding each to the queue. It then simply waits for the
reader thread to finish, and prints out the total time spent working.

I threw in a bunch of console output lines so that you can watch what is
going on as the code executes. IMHO, the two most interesting things to
note as the program runs are that the worker threads don't necessarily
complete in the same order they started (suggestive of the fact that
throughout the processing work items are not always completed in
order...though, because of the way thread scheduling works, the console
output is not actually 100% proof of this), and that when using more
than one thread, often multiple "Finished..." messages are output to the
console all at once (this does represent the situation in which some
work items were out of order, but still processed in sequence by the
reader).

And of course, most importantly, the total time spent processing is
roughly inversely proportional to the number of threads assigned to do
work, up to a number of threads equal to the number of CPUs (or
simulated CPUs, if you use the sleeping version of WorkItem.DoWork).

Note that this is just an example. There may be a number of things that
you would do differently in real code. For example, there's not really
any requirement that the WorkItem class contain the actual code to
process data. You could just as easily make that a data-only class or
struct, and put all of the processing logic somewhere else (like in the
Worker class). The key is to be able to put something into the queue
that stores not only the data that needs to be processed, but a flag
indicating that the data has been processed.

Another example is my use of the Monitor class. In particular, I use
PulseAll and have the same locking object shared between the worker
threads and the reader thread. I did this because it's simpler and
seemed less likely to result in a bug, but it does mean that every time
something happens that a worker is interested in, the reader also wakes
up to look, and likewise every time something happens that the reader is
interested in, _all_ of the workers wake up to look. In some cases,
this results in wasted thread context switching, as one or more threads
wake up only to find that there's nothing for them to do.

However, the code is simpler this way (simple-but-slower is always
better than wrong-but-faster). And since the worker threads shouldn't
normally be waiting at all (once the queue starts filling up, they
should be kept busy and never hit the Wait), the inefficiency
introduced is minimal.

But you might decide you want to use a second locking object to separate
the two, depending on your needs.
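
For what it's worth, here is one way the two-lock variation could look.
Treat it as a sketch under assumptions rather than a drop-in
replacement: it uses two Queue<T> instances instead of the single
LinkedList in the sample, and Item, TwoLockQueue, TakeNext, NotifyDone
and TakeFinished are all names invented for this example.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for the sample's WorkItem: only the flag matters here.
class Item
{
    private volatile bool _done;
    public bool Done { get { return _done; } }
    public void MarkDone() { _done = true; }
}

class TwoLockQueue
{
    private readonly Queue<Item> _pending = new Queue<Item>(); // guarded by _lockWork
    private readonly Queue<Item> _ordered = new Queue<Item>(); // guarded by _lockDone
    private readonly object _lockWork = new object();
    private readonly object _lockDone = new object();

    // A null item is the end-of-work marker, as in the sample.
    public void Enqueue(Item item)
    {
        // Record the output order first, then make the item available to workers.
        lock (_lockDone) { _ordered.Enqueue(item); Monitor.PulseAll(_lockDone); }
        lock (_lockWork) { _pending.Enqueue(item); Monitor.PulseAll(_lockWork); }
    }

    // Workers: returns the next item to process, or null at the marker.
    public Item TakeNext()
    {
        lock (_lockWork)
        {
            while (_pending.Count == 0)
            {
                Monitor.Wait(_lockWork);
            }

            // Leave the null marker in place so that every worker sees it.
            return _pending.Peek() == null ? null : _pending.Dequeue();
        }
    }

    // Workers call this after finishing an item; it wakes only the reader.
    public void NotifyDone()
    {
        lock (_lockDone) { Monitor.PulseAll(_lockDone); }
    }

    // Reader: returns the oldest item once it is done, or null at the marker.
    public Item TakeFinished()
    {
        lock (_lockDone)
        {
            while (_ordered.Count == 0 ||
                (_ordered.Peek() != null && !_ordered.Peek().Done))
            {
                Monitor.Wait(_lockDone);
            }

            return _ordered.Peek() == null ? null : _ordered.Dequeue();
        }
    }
}
```

The trade-off is the one described above: workers are no longer woken
when an item completes, and the reader is no longer woken when a worker
takes an item, but there are now two locks and two collections to keep
straight.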

I hope that you find the sample useful.

Pete

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Diagnostics;

namespace TestThreadProcessor
{
class Program
{
class WorkItem
{
private int _iseq;
private TimeSpan _tsDuration;
private volatile bool _fDone;

public WorkItem(int iseq, TimeSpan tsDuration)
{
_iseq = iseq;
_tsDuration = tsDuration;
}

public int Sequence
{
get { return _iseq; }
}

public bool Done
{
get { return _fDone; }
}

public void DoWork()
{
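// dtStart is only needed by the busy-wait version below.
DateTime dtStart = DateTime.Now;

// Sleeping version: simulates the work without using any CPU time.
Thread.Sleep((int)_tsDuration.TotalMilliseconds);
// Busy-wait version: uncomment this (and remove the Sleep) to use real CPU time.
// while ((DateTime.Now - dtStart) < _tsDuration) ;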

_fDone = true;
}
}

class WorkQueue
{
private LinkedList<WorkItem> _lwiToDo = new
LinkedList<WorkItem>();
private LinkedListNode<WorkItem> _nwiProcessNext;
private object _objLock = new object();

public void Enqueue(WorkItem wi)
{
lock (_objLock)
{
LinkedListNode<WorkItem> nwi = new
LinkedListNode<WorkItem>(wi);

if (_nwiProcessNext == null)
{
_nwiProcessNext = nwi;
}

_lwiToDo.AddLast(nwi);
Update();
}
}

public WorkItem WiProcessNext()
{
WorkItem wiRet;

lock (_objLock)
{
while (_nwiProcessNext == null)
{
Monitor.Wait(_objLock);
}
wiRet = _nwiProcessNext.Value;

if (wiRet != null)
{
_nwiProcessNext = _nwiProcessNext.Next;
}
}

return wiRet;
}

public WorkItem WiFinishedNext()
{
WorkItem wiRet;

lock (_objLock)
{
while (_lwiToDo.First == null ||
(_lwiToDo.First.Value != null &&
!_lwiToDo.First.Value.Done))
{
Monitor.Wait(_objLock);
}

wiRet = _lwiToDo.First.Value;

if (wiRet != null)
{
_lwiToDo.RemoveFirst();
}
}

return wiRet;
}

public void Update()
{
lock (_objLock)
{
Monitor.PulseAll(_objLock);
}
}
}

class Worker
{
private int _id;
private WorkQueue _wq;

public Worker(int id, WorkQueue wq)
{
_id = id;
_wq = wq;
}

public void Start()
{
WorkItem wi;

Console.WriteLine("Worker id#" + _id + " starting");

while ((wi = _wq.WiProcessNext()) != null)
{
wi.DoWork();
_wq.Update();
}

Console.WriteLine("Worker id #" + _id + " exiting");
}
}

class Reader
{
private WorkQueue _wq;

public Reader(WorkQueue wq)
{
_wq = wq;
}

public void Start()
{
WorkItem wi;

Console.WriteLine("Reader starting");

while ((wi = _wq.WiFinishedNext()) != null)
{
Console.WriteLine("Finished sequence #" + wi.Sequence);
}

Console.WriteLine("Reader exiting");
}
}

static int _cthread = 1;
static TimeSpan _tsWork = new TimeSpan(0, 0, 15);
static int _tickMin = 250;
static int _tickMax = 1250;

static bool ParseArgs(string[] args)
{
bool fRet = true;
List<string> lstrErrors = new List<string>(4);

switch (args.Length)
{
default:
goto case 4;
case 4:
{
int tick;

if (int.TryParse(args[3], out tick))
{
_tickMax = tick;
}
else
{
lstrErrors.Insert(0, "Invalid parameter tickMax: \"" +
args[3] + "\"");
fRet = false;
}
}
goto case 3;
case 3:
{
int tick;

if (int.TryParse(args[2], out tick))
{
_tickMin = tick;
}
else
{
lstrErrors.Insert(0, "Invalid parameter tickMin: \"" +
args[2] + "\"");
fRet = false;
}
}
goto case 2;
case 2:
{
TimeSpan ts;

if (TimeSpan.TryParse(args[1], out ts))
{
_tsWork = ts;
}
else
{
lstrErrors.Insert(0, "Invalid parameter tsWork: \"" +
args[1] + "\"");
fRet = false;
}
}
goto case 1;
case 1:
{
int cthread;

if (int.TryParse(args[0], out cthread))
{
_cthread = cthread;
}
else
{
lstrErrors.Insert(0, "Invalid parameter cthread: \"" +
args[0] + "\"");
fRet = false;
}
}
goto case 0;
case 0:
break;
}

foreach (string str in lstrErrors)
{
Console.WriteLine(str);
}

return fRet;
}

static void Main(string[] args)
{
if (!ParseArgs(args))
{
return;
}

Console.WriteLine("Threads: " + _cthread + ", total time: "
+ _tsWork.ToString() + ", minimum work item time: " + _tickMin + "ms, "
+ "maximum work item time: " + _tickMax + "ms");

int iwi = 0;
WorkQueue wq = new WorkQueue();
Random rnd = new Random();
TimeSpan tsLeft = _tsWork;
Stopwatch watch = new Stopwatch();
Thread threadReader = new Thread(new ThreadStart((new
Reader(wq)).Start));

for (int ithread = 0; ithread < _cthread; ithread++)
{
(new Thread(new ThreadStart((new Worker(ithread,
wq)).Start))).Start();
}

threadReader.Start();

watch.Start();
iwi = 0;
while (tsLeft > new TimeSpan())
{
TimeSpan tsItem = new TimeSpan(0, 0, 0, 0,
rnd.Next(_tickMin, _tickMax));
WorkItem wi = new WorkItem(iwi, tsItem);

wq.Enqueue(wi);

tsLeft -= tsItem;
iwi++;
}
wq.Enqueue(null);

Console.WriteLine("Added " + iwi + " work items");

threadReader.Join();
watch.Stop();

Console.WriteLine("Total time spent processing: " +
watch.Elapsed);
Console.ReadLine();
}
}
}
