Help using java.util.concurrent.ThreadPoolExecutor for producer/consumer thread pool
From: Alan Kent <ALAN.J.K...@saic.com>
Date: Mon, 13 Aug 2007 17:34:53 +1000
Local: Mon, Aug 13 2007 3:34 am
Subject: Help using java.util.concurrent.ThreadPoolExecutor for producer/consumer thread pool
Surely I am missing something... but I have not found a "how to" yet
with Google.

I would like a producer that puts work items on a queue, and a pool of
threads pulling items off and doing them.  Standard sort of
producer/consumer pattern I would have thought.
The work items can take between 10ms and 10 minutes to complete.
java.util.concurrent is tempting, but I cannot work out how to make it
work for me.

I first off tried an unbounded queue.  But it quickly used up all memory
when the producer out-ran the consumer threads.

I then tried a bounded queue, but by being bounded it means it throws an
exception if you give it too much work to do.  (It does not block the
producer in terms of making it wait - it blocks the producer by throwing
an exception!)

There is a rejection policy you can use which is to let the producer do
the work if all threads in the pool are busy.  But this is not ideal.  
The work items are of different sizes, so if a large work item comes
along and the producer does the work itself, all the other threads in
the pool will run out of work.
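
One workaround sometimes used for this situation is a custom RejectedExecutionHandler that makes the submitting thread wait for queue space instead of throwing or running the task itself. The following is only a sketch under stated assumptions, not something proposed in this thread: the names BlockingSubmitDemo and BlockWhenFull and the pool and queue sizes are made up for illustration. It calls put() directly on the executor's own queue, which skips the executor's usual bookkeeping, so a task handed over while the pool is shutting down could be left unrun.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingSubmitDemo {

    /** Hypothetical handler: the producer waits for queue space instead of getting an exception. */
    public static class BlockWhenFull implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor pool) {
            if (pool.isShutdown()) {
                throw new RejectedExecutionException("pool is shut down");
            }
            try {
                pool.getQueue().put(task); // blocks until a worker frees a slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting", e);
            }
        }
    }

    /** Pushes taskCount short tasks through a small bounded pool; returns how many ran. */
    public static int runAll(int taskCount) {
        final AtomicInteger completed = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(4), // bounded, so memory use stays flat
                new BlockWhenFull());
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        Thread.sleep(5); // stand-in for real work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    completed.incrementAndGet();
                });
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runAll(50));
    }
}
```

With this handler the producer never does the work itself, so a large item cannot starve the other worker threads the way the caller-runs policy can.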

I do like the way the java.util.concurrent thread pool stuff nicely
shuts down threads etc when idle.  However, not being able to do a
simple producer/consumer pattern with the library just seems bizarre.  
Surely I am just not understanding how to use it??!!  Otherwise I guess
I roll my own code (which is not hard - just trying to avoid it).

Am I missing something obvious?
Thanks!
Alan


From: Bret Mcguire <mersaul...@yahoo.com>
Date: Mon, 13 Aug 2007 13:44:37 -0700
Local: Mon, Aug 13 2007 4:44 pm
Subject: Re: Help using java.util.concurrent.ThreadPoolExecutor for producer/consumer thread pool
   Have you tried one of the BlockingQueue implementations... maybe
LinkedBlockingQueue?

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/LinkedBl...

   put() and take() block under the expected conditions.
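
As a rough illustration of the difference between the blocking and non-blocking queue methods, here is a small sketch using a bounded LinkedBlockingQueue directly, without a thread pool. The class and method names (QueueBlockingDemo, produceAndConsume, offerToFullQueue) are invented for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueBlockingDemo {

    /** One producer, one consumer, bounded queue. Returns the sum the consumer saw. */
    public static long produceAndConsume(final int itemCount, int capacity) {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(capacity);
        final long[] sum = new long[1];

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    sum[0] += queue.take(); // waits while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 1; i <= itemCount; i++) {
                queue.put(i); // waits while the queue is full; no exception for "full"
            }
            consumer.join(); // join() makes the consumer's writes to sum visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return sum[0];
    }

    /** offer() is the non-blocking variant: it returns false when the queue is full. */
    public static boolean offerToFullQueue() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(1);
        queue.offer("first");         // fits
        return queue.offer("second"); // queue is full, returns false immediately
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(1000, 8));
        System.out.println(offerToFullQueue());
    }
}
```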

      - Bret -

On Aug 13, 2:34 am, Alan Kent <ALAN.J.K...@saic.com> wrote:


From: Alan Kent <ALAN.J.K...@saic.com>
Date: Tue, 14 Aug 2007 10:34:16 +1000
Local: Mon, Aug 13 2007 8:34 pm
Subject: Re: [The Java Posse] Re: Help using java.util.concurrent.ThreadPoolExecutor for producer/consumer thread pool

Bret Mcguire wrote:
>    Have you tried one of the BlockingQueue implementations... maybe
> LinkedBlockingQueue?

> http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/LinkedBl...

>    put() and take() block under the expected conditions.

Thanks, but the queue is used internally by the thread pool - you don't
get to choose what the thread pool class (ThreadPoolExecutor) calls.  The
thread pool calls the offer() method which is non-blocking.

I tried defining a subclass of ArrayBlockingQueue which made offer()
call put() to block, but the end result was only a single thread would
run - very strange.  It never created more than one thread.  As soon as
I put it back, multiple threads would run.  Looking in the
ArrayBlockingQueue source it looked fine, so I suspect the problem is in
the ThreadPoolExecutor code - it just is not designed that way.
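
The single-thread behaviour described here is consistent with how ThreadPoolExecutor is documented to grow: it starts threads beyond corePoolSize only when the queue refuses a task, that is, when offer() returns false. An offer() that waits instead of failing never refuses, so the pool never gets past its core size. A possible way to keep the blocking-offer trick is to set the core and maximum sizes to the same value. The sketch below assumes that setup; BlockingOfferPool and BlockingOfferQueue are hypothetical names, and making offer() block departs from the BlockingQueue contract, so treat it as a hack rather than a recommended design.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingOfferPool {

    /** Hypothetical queue whose offer() waits for space instead of returning false. */
    public static class BlockingOfferQueue<E> extends ArrayBlockingQueue<E> {
        public BlockingOfferQueue(int capacity) {
            super(capacity);
        }

        @Override
        public boolean offer(E e) {
            try {
                put(e); // blocks the submitting thread until there is room
                return true;
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return false; // falls back to the executor's normal rejection path
            }
        }
    }

    /** Runs taskCount tasks; returns how many distinct worker threads executed them. */
    public static int distinctWorkers(int poolSize, int taskCount) {
        final Set<String> workers = ConcurrentHashMap.newKeySet();
        // core == max: the pool reaches full size before anything is queued
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 30, TimeUnit.SECONDS,
                new BlockingOfferQueue<Runnable>(2));
        pool.allowCoreThreadTimeOut(true); // idle threads still shut down (Java 6+)
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    workers.add(Thread.currentThread().getName());
                    try {
                        Thread.sleep(20); // stand-in for real work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return workers.size();
    }

    public static void main(String[] args) {
        System.out.println("worker threads used: " + distinctWorkers(4, 40));
    }
}
```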

It took me a while to realize that "blocking" in the
java.util.concurrent terminology meant "stop it from happening by
throwing an exception", not cause the caller to wait until resources
were free.

I can go off and write my own thread pool - I was just wondering first
if I had overlooked something obvious.

Thanks!
Alan


From: Alexey <inline_f...@yahoo.com>
Date: Tue, 14 Aug 2007 10:02:58 -0700
Local: Tues, Aug 14 2007 1:02 pm
Subject: Re: Help using java.util.concurrent.ThreadPoolExecutor for producer/consumer thread pool
On Aug 13, 8:34 pm, Alan Kent <ALAN.J.K...@saic.com> wrote:

A long long time ago, I implemented something like what you need.  I
then isolated it into its own library along with some other utility
classes I liked to reuse.  Here's the main bit of code.  If you'd like
the rest, I'd be happy to send you the whole thing.  You could easily
rip out the object pooling references and I think that should be
enough to let it run on its own.  I haven't used it in a long time,
but it did run properly in a project, so I have reasonably high
expectations that it should work.

package zinger.util;

import java.util.*;

import zinger.util.recycling.*;

public class ThreadLimiter
{
    public static class RunnableException extends RuntimeException
    {
        private final Exception runnableException;

        public RunnableException(final Exception runnableException)
        {
            this.runnableException = runnableException;
        }

        public Exception getException()
        {
            return this.runnableException;
        }
    }

    protected static final ObjectGenerator COUNTER_GENERATOR = new ArrayGenerator(int.class, 1)
    {
        public boolean prepareObject(final Object obj, final Object arg) throws IllegalArgumentException
        {
            try
            {
                ((int[])obj)[0] = 0;
                return true;
            }
            catch(final ClassCastException ex)
            {
                throw new IllegalArgumentException();
            }
        }
    };

    protected final int maxThreadCount;
    protected final long threadTimeout;
    protected final int maxThreadWaits;

    protected final Map threads = new HashMap();
    protected final ObjectRecycler counterRecycler;

    public ThreadLimiter(final int maxThreadCount, final long threadTimeout, final int maxThreadWaits)
    {
        this.maxThreadCount = maxThreadCount;
        this.threadTimeout = threadTimeout;
        this.maxThreadWaits = maxThreadWaits;

        this.counterRecycler = new CappedObjectRecycler(ThreadLimiter.COUNTER_GENERATOR, this.maxThreadCount);
    }

    protected String getCategoryName()
    {
        return this.getClass().getName();
    }

    public final void run(final Runnable toRun) throws TimeoutException, InterruptedException, Exception
    {
        final Thread thread = Thread.currentThread();
        int[] threadCounter;
        synchronized(this.threads)
        {
            threadCounter = (int[])this.threads.get(thread);
            if(threadCounter == null)
            {
                for(int i = 0; this.threads.size() >= maxThreadCount; ++i)
                {
                    if(i >= this.maxThreadWaits)
                    {
                        throw new TimeoutException();
                    }
                    this.threads.wait(this.threadTimeout);
                }
                threadCounter = (int[])this.counterRecycler.getObject();
                this.threads.put(thread, threadCounter);
            }
        }
        ++threadCounter[0];
        try
        {
            toRun.run();
        }
        catch(final ThreadLimiter.RunnableException ex)
        {
            throw ex.getException();
        }
        finally
        {
            if((--threadCounter[0]) <= 0)
            {
                synchronized(this.threads)
                {
                    this.threads.remove(thread);
                    this.threads.notify();
                }
            }
        }
    }

    public StringBuffer status(StringBuffer sb)
    {
        if(sb == null)
        {
            sb = new StringBuffer();
        }

        sb.append("maxThreadCount\t").append(this.maxThreadCount).append('\n');

        sb.append("threadTimeout\t").append(this.threadTimeout).append('\n');

        sb.append("maxThreadWaits\t").append(this.maxThreadWaits).append('\n');

        sb.append("active threads:");
        synchronized(this.threads)
        {
            if(this.threads.isEmpty())
            {
                sb.append(" none\n");
            }
            else
            {
                sb.append('\n');
                Map.Entry entry;
                for(final Iterator iterator = this.threads.entrySet().iterator(); iterator.hasNext();)
                {
                    entry = (Map.Entry)iterator.next();
                    sb.append(entry.getKey())
                      .append(" (")
                      .append(((int[])entry.getValue())[0])
                      .append(")\n");
                }
            }
        }

        return sb;
    }
}


From: elpablo <el.pablo.tho...@gmail.com>
Date: Fri, 17 Aug 2007 13:02:27 -0000
Local: Fri, Aug 17 2007 9:02 am
Subject: Re: Help using java.util.concurrent.ThreadPoolExecutor for producer/consumer thread pool
Try wrapping the ThreadPoolExecutor with your own class that has a
submitTask method that acquires a permit from a semaphore before
submitting the task and then releases the semaphore when it is done.
Use the same value for the executor's pool size and the amount of
permits available from the semaphore and you have your own blocking
implementation.

You will probably need to wrap the task in another runnable inside the
submitTask method and call run yourself in order to have a run method
that can see the semaphore to release the permit.
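
A minimal sketch of the semaphore wrapper described above might look like the following. The class name BoundedSubmitter, the demo() helper and the sizes used are assumptions made for illustration; only the submitTask idea comes from the post. It uses Executors.newFixedThreadPool, which is one convenient way to get a fixed-size ThreadPoolExecutor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedSubmitter {
    private final ExecutorService pool;
    private final Semaphore permits;

    public BoundedSubmitter(int poolSize) {
        this.pool = Executors.newFixedThreadPool(poolSize);
        this.permits = new Semaphore(poolSize); // same value as the pool size
    }

    /** Blocks the caller until a permit is free, then hands the task to the pool. */
    public void submitTask(final Runnable task) throws InterruptedException {
        permits.acquire();
        try {
            pool.execute(() -> {
                try {
                    task.run();
                } finally {
                    permits.release(); // released even if the task throws
                }
            });
        } catch (RejectedExecutionException e) {
            permits.release(); // the task never started, so give the permit back
            throw e;
        }
    }

    public void shutdownAndWait() throws InterruptedException {
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }

    /** Demo helper: returns {tasks completed, peak number of tasks running at once}. */
    public static int[] demo(int poolSize, int taskCount) {
        final AtomicInteger running = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();
        final AtomicInteger done = new AtomicInteger();
        BoundedSubmitter submitter = new BoundedSubmitter(poolSize);
        try {
            for (int i = 0; i < taskCount; i++) {
                submitter.submitTask(() -> {
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(5); // stand-in for real work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    running.decrementAndGet();
                    done.incrementAndGet();
                });
            }
            submitter.shutdownAndWait();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return new int[] { done.get(), peak.get() };
    }
}
```

With exactly as many permits as threads, a worker can sit idle for a moment between tasks while the producer wakes up. Giving the semaphore a few more permits than the pool has threads would keep a small backlog queued, at the cost of a slightly larger bound.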

Hope that makes sense...

Paul

On Aug 13, 8:34 am, Alan Kent <ALAN.J.K...@saic.com> wrote:


From: John Wright <FortyRun...@gmail.com>
Date: Wed, 22 Aug 2007 00:09:12 -0700
Local: Wed, Aug 22 2007 3:09 am
Subject: Re: Help using java.util.concurrent.ThreadPoolExecutor for producer/consumer thread pool
If you haven't already read Brian Goetz's latest book about
concurrency I can definitely recommend it - it will help with your
understanding.

We do something similar to this with a LinkedBlockingQueue. We have a
number of threads (manually created at the moment) that add items to a
BlockingQueue. A separate thread takes items off the queue and
processes them.

The Java threaded pool routines are just a way of creating threads.
You are correct in thinking that it's up to you to build a framework
for this - it won't be hard.

I agree with you that this sort of Producer/Consumer is so prevalent
that it should have been in the Java 5 libraries. Even Brian Goetz's
book doesn't really give you a complete solution for this.

There are a number of scenarios.

1. Multiple threads add items to a BlockingQueue and a single thread
takes off items
2. A single thread adds items to a BlockingQueue and multiple threads
take off items
3. A single thread adds items to a BlockingQueue and another thread takes
off items.

Arguably these are all variants on a core pattern but there are
implementation reasons why you might want to treat them separately.
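
For scenario 2, which is the case the original question asks about, a hand-rolled version could look roughly like this. It is a sketch, not the code referred to in this post: WorkerPoolSketch, process() and the POISON sentinel are invented names, and the "work" is just adding integers so the result is easy to check. The producer blocks in put() when the bounded queue is full, and one sentinel per worker tells the workers to stop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class WorkerPoolSketch {
    // Hypothetical "no more work" marker; must never be a real work item.
    private static final Integer POISON = Integer.MIN_VALUE;

    /** One producer feeds workerCount consumers; returns the sum of all items processed. */
    public static long process(int itemCount, int workerCount, int capacity) {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
        final AtomicLong total = new AtomicLong();
        List<Thread> workers = new ArrayList<Thread>();

        for (int w = 0; w < workerCount; w++) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        Integer item = queue.take(); // waits while the queue is empty
                        if (item.equals(POISON)) {
                            return; // each worker consumes exactly one sentinel
                        }
                        total.addAndGet(item); // stand-in for real work
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
            workers.add(worker);
        }

        try {
            for (int i = 1; i <= itemCount; i++) {
                queue.put(i); // the producer waits here when the consumers fall behind
            }
            for (int w = 0; w < workerCount; w++) {
                queue.put(POISON); // queued after all real items (FIFO order)
            }
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(process(100, 4, 8));
    }
}
```

Unlike ThreadPoolExecutor, this version keeps all worker threads alive until the sentinels arrive; shutting idle threads down would need extra code.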

We have done something with item 1 and it works very well indeed. We
used Generics to create a class to handle this, it also defines JMX
methods to enable the queue to be monitored. I can't post the code
though because I work for a corporate!

On Aug 13, 8:34 am, Alan Kent <ALAN.J.K...@saic.com> wrote:

