Help using java.util.concurrent.ThreadPoolExecutor for producer/consumer thread pool


Alan Kent

Aug 13, 2007, 3:34:53 AM
to java...@googlegroups.com
Surely I am missing something... but I have not found a "how to" yet
with Google.

I would like a pool of threads where a producer puts work items on the
queue, and there is a pool of threads pulling things off and doing
them. Standard sort of producer/consumer pattern I would have thought.
The work items can take between 10 ms and 10 minutes to complete.
java.util.concurrent is tempting, but I cannot work out how to make it
work for me.

I first off tried an unbounded queue. But it quickly used up all memory
when the producer out-ran the consumer threads.

I then tried a bounded queue, but being bounded means it throws an
exception if you give it too much work to do. (It does not block the
producer in terms of making it wait - it blocks the producer by throwing
an exception!)
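
For anyone wanting to reproduce the behaviour described above, here is a minimal sketch. The names RejectionDemo and countRejections are made up for illustration, and it assumes the executor's default rejection policy (AbortPolicy) with tasks that stay busy until a latch is released.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of any library.
class RejectionDemo {
    /**
     * Submits `tasks` long-running tasks to a fixed pool of `threads` workers
     * backed by a bounded queue of `capacity`, and returns how many
     * submissions were rejected with RejectedExecutionException.
     */
    static int countRejections(int threads, int capacity, int tasks)
            throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(capacity));
        final CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Each task waits on the latch, so workers stay busy.
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // Pool and queue are both full: the producer is not
                    // made to wait, it gets an exception instead.
                    rejected++;
                }
            }
        } finally {
            release.countDown();   // let the accepted tasks finish
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
        return rejected;
    }
}
```

With 2 threads and a queue of 3, only 5 tasks can be held at once, so submitting 10 blocked tasks should see the remaining 5 rejected.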

There is a rejection policy you can use which is to let the producer do
the work if all threads in the pool are busy. But this is not ideal.
The work items are of different sizes, so if a large work item comes
along and the producer does the work itself, all the other threads in
the pool will run out of work.

I do like the way the java.util.concurrent thread pool stuff nicely
shuts down threads etc when idle. However, not being able to do a
simple producer/consumer pattern with the library just seems bizarre.
Surely I am just not understanding how to use it??!! Otherwise I guess
I roll my own code (which is not hard - just trying to avoid it).

Am I missing something obvious?
Thanks!
Alan

Bret Mcguire

Aug 13, 2007, 4:44:37 PM
to The Java Posse
Have you tried one of the BlockingQueue implementations... maybe
LinkedBlockingQueue?

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/LinkedBlockingQueue.html

put() and take() block under the expected conditions.
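
A small sketch of that blocking behaviour, using the queue directly rather than through an executor. The class and method names (BlockingQueueDemo, produceAndConsume) are illustrative only.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of any library.
class BlockingQueueDemo {
    /**
     * One producer puts the numbers 1..items on a bounded queue while one
     * consumer takes them off and sums them. Returns the sum.
     */
    static int produceAndConsume(int capacity, final int items)
            throws InterruptedException {
        final BlockingQueue<Integer> queue =
                new LinkedBlockingQueue<Integer>(capacity);
        final int[] sum = new int[1];

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    sum[0] += queue.take();   // waits while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (int i = 1; i <= items; i++) {
            queue.put(i);                     // waits while the queue is full
        }
        consumer.join();   // join() makes the consumer's writes visible here
        return sum[0];
    }
}
```

Because put() waits when the queue is full, the producer can never get more than `capacity` items ahead of the consumer, which keeps memory use bounded.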

- Bret -

Alan Kent

Aug 13, 2007, 8:34:16 PM
to java...@googlegroups.com
Bret Mcguire wrote:
> Have you tried one of the BlockingQueue implementations... maybe
> LinkedBlockingQueue?
>
> http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/LinkedBlockingQueue.html
>
> put() and take() block under the expected conditions.
>

Thanks, but the queue is used internally by the thread pool - you don't
get to choose what the thread pool class (ThreadPoolExecutor) calls. The
thread pool calls the offer() method which is non-blocking.

I tried defining a subclass of ArrayBlockingQueue which made offer()
call put() to block, but the end result was only a single thread would
run - very strange. It never created more than one thread. As soon as
I put it back, multiple threads would run. Looking in the
ArrayBlockingQueue source it looked fine, so I suspect the problem is in
the ThreadPoolExecutor code - it just is not designed that way.
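
A likely explanation for the single thread: ThreadPoolExecutor only starts threads beyond corePoolSize when offer() returns false, so an offer() that waits instead of failing never triggers the extra threads. One commonly used workaround, sketched below under assumptions, is to leave offer() alone and install a RejectedExecutionHandler that calls put() on the executor's queue. BlockWhenFull, BlockingSubmitDemo and runAll are made-up names; the sketch also sets corePoolSize equal to maximumPoolSize so the pool size does not depend on the queue filling up.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of any library.
class BlockingSubmitDemo {
    /** Makes the submitting thread wait for queue space instead of failing. */
    static class BlockWhenFull implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("executor is shut down");
            }
            try {
                executor.getQueue().put(task);   // blocks until a slot frees up
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting", e);
            }
        }
    }

    /** Runs `tasks` short tasks and returns how many completed. */
    static int runAll(int threads, int capacity, int tasks)
            throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(capacity),
                new BlockWhenFull());
        final AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(5);             // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        pool.shutdown();                         // queued tasks still run
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return completed.get();
    }
}
```

One caveat with this approach: if another thread shuts the executor down while a producer is waiting in put(), the task may end up queued and never run, so it suits cases where the producer itself controls shutdown.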

It took me a while to realize that "blocking" in the
java.util.concurrent terminology meant "stop it from happening by
throwing an exception", not cause the caller to wait until resources
were free.

I can go off and write my own thread pool - I was just wondering first
if I had overlooked something obvious.

Thanks!
Alan

Alexey

Aug 14, 2007, 1:02:58 PM
to The Java Posse
A long long time ago, I implemented something like what you need. I
then isolated it into its own library along with some other utility
classes I liked to reuse. Here's the main bit of code. If you'd like
the rest, I'd be happy to send you the whole thing. You could easily
rip out the object pooling references and I think that should be
enough to let it run on its own. I haven't used it in a long time,
but it did run properly in a project, so I have reasonably high
expectations that it should work.

package zinger.util;

import java.util.*;

import zinger.util.recycling.*;

public class ThreadLimiter
{
    public static class RunnableException extends RuntimeException
    {
        private final Exception runnableException;

        public RunnableException(final Exception runnableException)
        {
            this.runnableException = runnableException;
        }

        public Exception getException()
        {
            return this.runnableException;
        }
    }

    protected static final ObjectGenerator COUNTER_GENERATOR = new ArrayGenerator(int.class, 1)
    {
        public boolean prepareObject(final Object obj, final Object arg) throws IllegalArgumentException
        {
            try
            {
                ((int[])obj)[0] = 0;
                return true;
            }
            catch(final ClassCastException ex)
            {
                throw new IllegalArgumentException();
            }
        }
    };

    protected final int maxThreadCount;
    protected final long threadTimeout;
    protected final int maxThreadWaits;

    protected final Map threads = new HashMap();
    protected final ObjectRecycler counterRecycler;

    public ThreadLimiter(final int maxThreadCount, final long threadTimeout, final int maxThreadWaits)
    {
        this.maxThreadCount = maxThreadCount;
        this.threadTimeout = threadTimeout;
        this.maxThreadWaits = maxThreadWaits;

        this.counterRecycler = new CappedObjectRecycler(ThreadLimiter.COUNTER_GENERATOR, this.maxThreadCount);
    }

    protected String getCategoryName()
    {
        return this.getClass().getName();
    }

    public final void run(final Runnable toRun) throws TimeoutException, InterruptedException, Exception
    {
        final Thread thread = Thread.currentThread();
        int[] threadCounter;
        synchronized(this.threads)
        {
            threadCounter = (int[])this.threads.get(thread);
            if(threadCounter == null)
            {
                for(int i = 0; this.threads.size() >= maxThreadCount; ++i)
                {
                    if(i >= this.maxThreadWaits)
                    {
                        throw new TimeoutException();
                    }
                    this.threads.wait(this.threadTimeout);
                }
                threadCounter = (int[])this.counterRecycler.getObject();
                this.threads.put(thread, threadCounter);
            }
        }
        ++threadCounter[0];
        try
        {
            toRun.run();
        }
        catch(final ThreadLimiter.RunnableException ex)
        {
            throw ex.getException();
        }
        finally
        {
            if((--threadCounter[0]) <= 0)
            {
                synchronized(this.threads)
                {
                    this.threads.remove(thread);
                    this.threads.notify();
                }
            }
        }
    }

    public StringBuffer status(StringBuffer sb)
    {
        if(sb == null)
        {
            sb = new StringBuffer();
        }

        sb.append("maxThreadCount\t").append(this.maxThreadCount).append('\n');

        sb.append("threadTimeout\t").append(this.threadTimeout).append('\n');

        sb.append("maxThreadWaits\t").append(this.maxThreadWaits).append('\n');

        sb.append("active threads:");
        synchronized(this.threads)
        {
            if(this.threads.isEmpty())
            {
                sb.append(" none\n");
            }
            else
            {
                sb.append('\n');
                Map.Entry entry;
                for(final Iterator iterator = this.threads.entrySet().iterator(); iterator.hasNext();)
                {
                    entry = (Map.Entry)iterator.next();
                    sb.append(entry.getKey())
                      .append(" (")
                      .append(((int[])entry.getValue())[0])
                      .append(")\n");
                }
            }
        }

        return sb;
    }
}

elpablo

Aug 17, 2007, 9:02:27 AM
to The Java Posse
Try wrapping the ThreadPoolExecutor with your own class that has a
submitTask method that acquires a permit from a semaphore before
submitting the task and then releases the semaphore when it is done.
Use the same value for the executor's pool size and the amount of
permits available from the semaphore and you have your own blocking
implementation.

You will probably need to wrap the task in another runnable inside the
submitTask method and call run yourself in order to have a run method
that can see the semaphore to release the permit.
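
A rough sketch of that idea, assuming a fixed-size pool. BoundedExecutor, shutdownAndWait and runAll are made-up names (submitTask is the name suggested above), and the permit count equals the pool size as described.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper class, not part of any library.
class BoundedExecutor {
    private final ExecutorService pool;
    private final Semaphore permits;

    BoundedExecutor(int poolSize) {
        this.pool = Executors.newFixedThreadPool(poolSize);
        this.permits = new Semaphore(poolSize);
    }

    /** Blocks the caller until a permit is free, then hands the task over. */
    void submitTask(final Runnable task) throws InterruptedException {
        permits.acquire();
        try {
            // Wrap the task so the permit is released when it finishes,
            // even if the task throws.
            pool.execute(() -> {
                try {
                    task.run();
                } finally {
                    permits.release();
                }
            });
        } catch (RejectedExecutionException e) {
            permits.release();   // task never started, give the permit back
            throw e;
        }
    }

    void shutdownAndWait() throws InterruptedException {
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
    }

    /** Demo helper: runs `tasks` short tasks and returns how many completed. */
    static int runAll(int poolSize, int tasks) throws InterruptedException {
        BoundedExecutor executor = new BoundedExecutor(poolSize);
        final AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.submitTask(() -> {
                try {
                    Thread.sleep(5);   // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        executor.shutdownAndWait();
        return completed.get();
    }
}
```

With permits equal to the pool size, the producer waits as soon as every worker is busy. Using a somewhat larger permit count would let a few tasks wait in the pool's internal queue, so a worker that finishes can pick up the next item without waiting for the producer.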

Hope that makes sense...

Paul

John Wright

Aug 22, 2007, 3:09:12 AM
to The Java Posse
If you haven't already read Brian Goetz's latest book about
concurrency I can definitely recommend it - it will help with your
understanding.

We do something similar to this with a LinkedBlockingQueue. We have a
number of threads (manually created at the moment) that add items to a
BlockingQueue. A separate thread takes items off the queue and
processes them.

The Java thread pool routines are just a way of creating threads.
You are correct in thinking that it's up to you to build a framework
for this - it won't be hard.

I agree with you that this sort of Producer/Consumer is so prevalent
that it should have been in the Java 5 libraries. Even Brian Goetz's
book doesn't really give you a complete solution for this.

There are a number of scenarios.

1. Multiple threads add items to a BlockingQueue and a single thread
takes off items
2. A single thread adds items to a BlockingQueue and multiple threads
take off items
3. A single thread adds items to a BlockingQueue and another thread takes
off items.

Arguably these are all variants on a core pattern but there are
implementation reasons why you might want to treat them separately.
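
As an illustration of scenario 2 (one producer, several consumers), here is a minimal hand-rolled sketch. It is not the code referred to in this post; FanOutDemo, process and the STOP sentinel are made-up names, and shutting consumers down with one sentinel per consumer is just one possible convention.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class, not part of any library.
class FanOutDemo {
    // Sentinel ("poison pill") telling a consumer to stop; compared by identity.
    private static final Runnable STOP = () -> { };

    /**
     * One producer queues `items` work items (each adds its index to a
     * running total); `consumers` threads take and run them.
     * Returns the total once every consumer has stopped.
     */
    static long process(int consumers, int capacity, int items)
            throws InterruptedException {
        final BlockingQueue<Runnable> queue =
                new ArrayBlockingQueue<Runnable>(capacity);
        final AtomicLong total = new AtomicLong();

        Thread[] workers = new Thread[consumers];
        for (int c = 0; c < consumers; c++) {
            workers[c] = new Thread(() -> {
                try {
                    // take() waits while the queue is empty.
                    for (Runnable item = queue.take(); item != STOP; item = queue.take()) {
                        item.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[c].start();
        }

        for (int i = 1; i <= items; i++) {
            final int value = i;
            queue.put(() -> total.addAndGet(value));   // waits while the queue is full
        }
        for (int c = 0; c < consumers; c++) {
            queue.put(STOP);                           // one pill per consumer
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return total.get();
    }
}
```

Scenarios 1 and 3 follow the same shape with the thread counts on either side of the queue changed.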

We have done something with item 1 and it works very well indeed. We
used Generics to create a class to handle this, it also defines JMX
methods to enable the queue to be monitored. I can't post the code
though because I work for a corporate!
