On Aug 13, 8:34 pm, Alan Kent <ALAN.J.K...@saic.com> wrote:
> Bret Mcguire wrote:
> > Have you tried one of the BlockingQueue implementations... maybe
> > LinkedBlockingQueue?
> >http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/LinkedBl...
> > put() and take() block under the expected conditions.
> Thanks, but the queue is used internally by the thread pool - you don't
> get to chose what the thread pool class (ThreadPoolExecutor) calls. The
> thread pool calls the offer() method which is non-blocking.
> I tried defining a subclass of ArrayBlockingQueue which made offer()
> call put() to block, but the end result was only a single thread would
> run - very strange. It never created more than one thread. As soon as
> I put it back, multiple threads would run. Looking in the
> ArrayBlockingQueue source it looked fine, so I suspect the problem is in
> the ThreadPoolExecutor code - it just is not designed that way.
> It took me a while to realize that "blocking" in the
> java.util.concurrent terminology meant "stop it from happening by
> throwing an exception", not cause the caller to wait until resources
> were free.
> I can go off and write my own thread pool - I was just wondering first
> if I had overlooked something obvious.
> Thanks!
> Alan
A long long time ago, I implemented something like what you need. I
then isolated into its own library along with some other utility
classes I liked to reuse. Here's the main bit of code. If you'd like
the rest, I'd be happy to send you the whole thing. You could easily
rip out the object pooling references and I think that should be
enough to let it run on its own. I haven't used it in a long time,
but it did run properly in a project, so I have reasonably high
expectations that it should work.
package zinger.util;
import java.util.*;
import zinger.util.recycling.*;
public class ThreadLimiter
{
public static class RunnableException extends RuntimeException
{
private final Exception runnableException;
public RunnableException(final Exception runnableException)
{
this.runnableException = runnableException;
}
public Exception getException()
{
return this.runnableException;
}
}
protected static final ObjectGenerator COUNTER_GENERATOR = new
ArrayGenerator(int.class, 1)
{
public boolean prepareObject(final Object obj, final Object
arg) throws IllegalArgumentException
{
try
{
((int[])obj)[0] = 0;
return true;
}
catch(final ClassCastException ex)
{
throw new IllegalArgumentException();
}
}
};
protected final int maxThreadCount;
protected final long threadTimeout;
protected final int maxThreadWaits;
protected final Map threads = new HashMap();
protected final ObjectRecycler counterRecycler;
public ThreadLimiter(final int maxThreadCount, final long
threadTimeout, final int maxThreadWaits)
{
this.maxThreadCount = maxThreadCount;
this.threadTimeout = threadTimeout;
this.maxThreadWaits = maxThreadWaits;
this.counterRecycler = new
CappedObjectRecycler(ThreadLimiter.COUNTER_GENERATOR,
this.maxThreadCount);
}
protected String getCategoryName()
{
return this.getClass().getName();
}
public final void run(final Runnable toRun) throws
TimeoutException, InterruptedException, Exception
{
final Thread thread = Thread.currentThread();
int[] threadCounter;
synchronized(this.threads)
{
threadCounter = (int[])this.threads.get(thread);
if(threadCounter == null)
{
for(int i = 0; this.threads.size() >= maxThreadCount; +
+i)
{
if(i >= this.maxThreadWaits)
{
throw new TimeoutException();
}
this.threads.wait(this.threadTimeout);
}
threadCounter =
(int[])this.counterRecycler.getObject();
this.threads.put(thread, threadCounter);
}
}
++threadCounter[0];
try
{
toRun.run();
}
catch(final ThreadLimiter.RunnableException ex)
{
throw ex.getException();
}
finally
{
if((--threadCounter[0]) <= 0)
{
synchronized(this.threads)
{
this.threads.remove(thread);
this.threads.notify();
}
}
}
}
public StringBuffer status(StringBuffer sb)
{
if(sb == null)
{
sb = new StringBuffer();
}
sb.append("maxThreadCount
\t").append(this.maxThreadCount).append('\n');
sb.append("threadTimeout
\t").append(this.threadTimeout).append('\n');
sb.append("maxThreadWaits
\t").append(this.maxThreadWaits).append('\n');
sb.append("active threads:");
synchronized(this.threads)
{
if(this.threads.isEmpty())
{
sb.append(" none\n");
}
else
{
sb.append('\n');
Map.Entry entry;
for(final Iterator iterator =
this.threads.entrySet().iterator(); iterator.hasNext();)
{
entry = (Map.Entry)iterator.next();
sb.append(entry.getKey())
.append(" (")
.append(((int[])entry.getValue())[0])
.append(")\n");
}
}
}
return sb;
}
}