Multi-queue executor service with configurable thread pool

derijc...@gmail.com

Jun 27, 2013, 4:40:12 PM
to guava-...@googlegroups.com
Our project uses many single-thread executor services, each with its own thread. The idea is that messages (callables) are passed between threads. This all works fine. However, we are now using more and more single-thread executor services, far more than the number of available CPU cores. This is inefficient because a lot of thread context switches (at the OS level) are taking place. I would therefore like to limit the number of threads used.

The most straightforward solution would be to simply reuse the same executor service where previously different instances were used. This works fine until you encounter a long-running task: other messages in the queue that are unrelated to the long-running task have to wait, while other threads might sit idle. Another approach one might think of is to not use a single-thread executor, but to use multiple threads for the same queue. This is obviously bad, as operations that are related to each other (e.g. they expect to be executed in order) might start racing against each other.

The solution I propose:

A thread pool with a configurable number of threads that can provide an unlimited number of queues in the form of executor services. These executor services do not have a dedicated thread but instead use any of the available threads in the thread pool. With some smart coding it should be possible to avoid thread starvation even when the number of queues is high. This mechanism ensures that all threads are busy at all times and that each queue is processed in order, without the OS having to context switch between a large number of threads.
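A minimal sketch of the proposal, assuming only java.util.concurrent; `PooledSerialExecutor` and `MAX_BATCH` are hypothetical names, not an existing library API. Each instance is a lightweight ordered queue with no thread of its own. An `AtomicBoolean` guarantees that at most one "drain" task per queue is pending or running on the shared pool, which preserves FIFO order, and a drain yields after a bounded batch so a busy queue cannot monopolize a pool thread.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical sketch: an ordered "queue" that owns no thread and borrows
 * threads from a shared pool. Create one instance per logical queue.
 */
final class PooledSerialExecutor implements Executor {
    // Assumption: after this many tasks a drain yields so other queues get a turn.
    private static final int MAX_BATCH = 64;

    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private final Executor pool;

    PooledSerialExecutor(Executor pool) {
        this.pool = pool;
    }

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        tasks.add(task);
        scheduleIfIdle();
    }

    private void scheduleIfIdle() {
        // Only the caller that flips false -> true submits a drain, so at most one
        // drain per queue is pending or running and its tasks never run in parallel.
        if (scheduled.compareAndSet(false, true)) {
            try {
                pool.execute(this::drain);
            } catch (RuntimeException e) {
                scheduled.set(false); // e.g. RejectedExecutionException: allow a later retry
                throw e;
            }
        }
    }

    private void drain() {
        try {
            for (int i = 0; i < MAX_BATCH; i++) {
                Runnable next = tasks.poll();
                if (next == null) {
                    break;
                }
                try {
                    next.run();
                } catch (RuntimeException e) {
                    // A failing task must not stall its queue; real code would log e.
                }
            }
        } finally {
            scheduled.set(false);
            // A task may have been added after the last poll() but before set(false).
            if (!tasks.isEmpty()) {
                scheduleIfIdle();
            }
        }
    }
}
```

Usage would be one shared pool, for example `Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())`, wrapped in as many `PooledSerialExecutor` instances as there are logical queues. A long-running task still delays the rest of its own queue, but other queues keep running as long as pool threads are free.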

Benjamin Manes

Jun 27, 2013, 5:02:02 PM
to derijc...@gmail.com, guava-...@googlegroups.com
Would HawtDispatch be adaptable to your needs?

--
--
guava-...@googlegroups.com
Project site: http://guava-libraries.googlecode.com
This group: http://groups.google.com/group/guava-discuss
 
This list is for general discussion.
To report an issue: http://code.google.com/p/guava-libraries/issues/entry
To get help: http://stackoverflow.com/questions/ask (use the tag "guava")
 
---
You received this message because you are subscribed to the Google Groups "guava-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to guava-discus...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

Luke Sandberg

Jun 27, 2013, 6:13:00 PM
to Benjamin Manes, derijc...@gmail.com, guava-...@googlegroups.com
If your tasks have ordering dependencies, then I would use facilities like Futures.addCallback (et al.) to ensure that they are executed in the correct order, rather than depending on the queueing configuration of the executor.

Erik De Rijcke

Jun 28, 2013, 3:36:41 AM
to Luke Sandberg, Benjamin Manes, guava-...@googlegroups.com
How would a callback ensure the correct execution order of the callables? As far as I know, a callback is simply executed after the callable is done. It's the execution order of the callables themselves that matters, not only the results they calculate. The reason is that multiple callables in the same queue can manipulate the same data, so their order of execution is important to ensure a correct data state and to avoid having to synchronize access to this data. As a result, our program, for example, does not use a single lock, wait or synchronized block, yet all data state is consistent.

@Benjamin. I'll have a look at it.

Luke Sandberg

Jun 28, 2013, 11:28:08 AM
to Erik De Rijcke, Benjamin Manes, guava-...@googlegroups.com
Just schedule the next task from the callback. Since the callback isn't fired until the task is done, you can guarantee correct ordering.
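A minimal sketch of this suggestion, assuming Guava's `ListeningExecutorService`, `Futures.addCallback(future, callback, executor)` and `MoreExecutors.directExecutor()`; `CallbackChain` and `runInOrder` are hypothetical names. Each task is submitted only from the success callback of its predecessor, so tasks run strictly in order on whichever pool thread is free.

```java
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Iterator;

/** Hypothetical helper: runs tasks strictly one after another on a shared pool. */
final class CallbackChain {
    static void runInOrder(ListeningExecutorService pool,
                           Iterator<Runnable> remaining,
                           Runnable onDone) {
        if (!remaining.hasNext()) {
            onDone.run();
            return;
        }
        ListenableFuture<?> current = pool.submit(remaining.next());
        // The callback fires only after `current` completes, so the next task
        // cannot start before (or concurrently with) this one.
        Futures.addCallback(current, new FutureCallback<Object>() {
            @Override
            public void onSuccess(Object result) {
                runInOrder(pool, remaining, onDone);
            }

            @Override
            public void onFailure(Throwable t) {
                // Stops the chain; real code would report t.
            }
        }, MoreExecutors.directExecutor());
    }
}
```

This works when the next task is known by the time its predecessor finishes. When independent producers keep submitting to the same logical queue, some per-queue structure is still needed to hand tasks over.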

Erik De Rijcke

Jun 30, 2013, 5:09:56 PM
to Luke Sandberg, Benjamin Manes, guava-...@googlegroups.com
It seems Java 7 makes it very easy to do what I want. I made a quick PoC and a simple JUnit test. It seems to work just fine so far.

Jerome Loisel

Jul 10, 2013, 11:57:25 AM
to guava-...@googlegroups.com, Luke Sandberg, Benjamin Manes, derijc...@gmail.com
If I understand correctly, we had a similar issue in our application. We needed a shared, expandable thread pool, but wanted to limit the parallelism of each instance accessing this shared thread pool. We achieved this by decorating the shared thread pool and limiting access with a semaphore:

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.util.concurrent.ForwardingExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

/**
 * An {@link ExecutorService} which bounds the number of tasks that can be
 * executed simultaneously on the underlying {@link ExecutorService}.
 *
 * The main purpose is to share an {@link ExecutorService} between several producers
 * while limiting the number of tasks each producer can run at the same time.
 *
 * The effective parallelism is the minimum of two limits: the permits of this
 * {@link BoundedExecutorService} and the size of the underlying {@link ExecutorService}.
 *
 * Note: only {@link #execute} is bounded. The submit/invokeAll/invokeAny methods
 * inherited from {@link ForwardingExecutorService} call the delegate directly and
 * bypass the semaphore unless they are overridden as well.
 *
 * @author jloisel
 */
final class BoundedExecutorService extends ForwardingExecutorService {
    private final ExecutorService delegate;
    private final Semaphore maxParallelTasks;

    /**
     * @param delegate executor service to which tasks are delegated
     * @param maxParallelTasks semaphore whose permits bound the number of simultaneous tasks
     * @throws NullPointerException when delegate or semaphore is null
     */
    BoundedExecutorService(final ExecutorService delegate, final Semaphore maxParallelTasks) {
        // ForwardingExecutorService has no constructor taking the delegate; it calls delegate().
        this.delegate = checkNotNull(delegate, "NULL delegate is not allowed");
        this.maxParallelTasks = checkNotNull(maxParallelTasks, "NULL semaphore is not allowed");
    }

    @Override
    protected ExecutorService delegate() {
        return delegate;
    }

    /**
     * Executes the given command at some time in the future, blocking the caller
     * until a permit is available.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if the task cannot be accepted for execution,
     *         or if the calling thread is interrupted while waiting for a permit
     *         (the interrupt flag is restored)
     * @throws NullPointerException if command is null
     */
    @Override
    public void execute(final Runnable command) {
        checkNotNull(command); // check before acquiring, so a null command cannot leak a permit
        try {
            maxParallelTasks.acquire();
        } catch (final InterruptedException e) {
            // No permit was acquired, so there is nothing to release.
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting to run: " + command, e);
        }
        try {
            super.execute(new SemaphoreReleasingRunnable(command, maxParallelTasks));
        } catch (final RejectedExecutionException e) {
            // The task will never run, so give the permit back.
            maxParallelTasks.release();
            throw e;
        }
    }
}

And the SemaphoreReleasingRunnable:

import static com.google.common.base.Preconditions.checkNotNull;

import java.util.concurrent.Semaphore;

/**
 * {@link Runnable} that executes an underlying {@link Runnable}
 * in the same thread and releases a {@link Semaphore} permit at the
 * end of its execution, even if the delegate throws.
 *
 * @author jloisel
 */
final class SemaphoreReleasingRunnable implements Runnable {
    private final Runnable delegate;
    private final Semaphore semaphore;

    /**
     * @param delegate task to execute
     * @param semaphore semaphore to release after task execution
     * @throws NullPointerException if delegate or semaphore is null
     */
    SemaphoreReleasingRunnable(final Runnable delegate, final Semaphore semaphore) {
        this.delegate = checkNotNull(delegate);
        this.semaphore = checkNotNull(semaphore);
    }

    @Override
    public void run() {
        try {
            delegate.run();
        } finally {
            semaphore.release();
        }
    }
}

I'm not sure it fits your needs, but it seems like it might. It still uses only one queue, but parallelism is limited by the decorator. A new decorator is created for each instance that needs its own parallelism limit.

Erik De Rijcke

Jul 11, 2013, 3:39:00 AM
to Jerome Loisel, guava-...@googlegroups.com, Luke Sandberg, Benjamin Manes
That would potentially work if I set the semaphore to one and created a decorator for each "queue". However, it would block calls to execute() on each "queue" until the previous task has completed (until the semaphore can be acquired), so it would defeat the purpose of using an executor service.

Jerome Loisel

Jul 11, 2013, 3:53:17 AM
to guava-...@googlegroups.com, Jerome Loisel, Luke Sandberg, Benjamin Manes, derijc...@gmail.com
It's just intended to limit the number of tasks simultaneously running on an underlying ExecutorService.

Queues are not really the best way to share data between threads. Lock-based queues can induce context switches due to contention when adding or removing elements. I know of a high-performance library for exchanging data between threads called LMAX Disruptor. It has been used in Storm (by Nathan Marz):
http://lmax-exchange.github.io/disruptor/

But you may first need to profile your application to find out whether the number of threads is really a performance bottleneck. Thread pool sizing does not depend only on the number of available cores; it also depends on the ratio of blocking to non-blocking operations the submitted tasks perform.

$N_{\text{cpu}}$ = number of CPUs
$U_{\text{cpu}}$ = target CPU utilization, $0 \le U_{\text{cpu}} \le 1$
$W/C$ = ratio of wait time to compute time
The optimal pool size for keeping the CPUs at the desired utilization is:
$$N_{\text{threads}} = N_{\text{cpu}} \times U_{\text{cpu}} \times \left(1 + \frac{W}{C}\right)$$

(Thanks to Brian Goetz in "Java Concurrency in Practice")
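The sizing rule above can be turned into a small helper. This is a sketch: `PoolSizing` and `optimalThreads` are hypothetical names, and the wait and compute times would have to come from your own profiling. For example, on an 8-core machine at full target utilization with 50 ms of waiting per 10 ms of computation ($W/C = 5$), the rule gives $8 \times 1.0 \times (1 + 5) = 48$ threads, while a purely CPU-bound workload ($W = 0$) gives 8.

```java
/** Sketch of the sizing rule N_threads = N_cpu * U_cpu * (1 + W/C). */
final class PoolSizing {
    static int optimalThreads(int nCpu, double targetUtilization,
                              double waitTime, double computeTime) {
        if (nCpu <= 0 || targetUtilization < 0 || targetUtilization > 1
                || waitTime < 0 || computeTime <= 0) {
            throw new IllegalArgumentException("invalid sizing inputs");
        }
        double n = nCpu * targetUtilization * (1 + waitTime / computeTime);
        return Math.max(1, (int) Math.round(n)); // always at least one thread
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Hypothetical workload: 50 ms of waiting for every 10 ms of computation.
        System.out.println(optimalThreads(cores, 1.0, 50, 10));
    }
}
```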

alexei.k...@gmail.com

Sep 11, 2013, 2:41:35 AM
to guava-...@googlegroups.com, derijc...@gmail.com

Look at https://github.com/rfqu/CodeSamples/blob/master/src/simpleactor/SerialExecutor.java: this class implements Executor but does not own any threads; it uses another "real" executor to run tasks.
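The linked class is not reproduced here and may differ in detail. For comparison, the Javadoc of `java.util.concurrent.Executor` contains a well-known `SerialExecutor` example built on the same idea: it owns no threads, queues tasks, and hands them one at a time to a "real" executor. The version below is adapted from that Javadoc (a lambda instead of an anonymous class, comments added).

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Adapted from the SerialExecutor example in the java.util.concurrent.Executor Javadoc. */
final class SerialExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private final Executor executor;
    private Runnable active; // task currently handed to `executor`, or null when idle

    SerialExecutor(Executor executor) {
        this.executor = executor;
    }

    @Override
    public synchronized void execute(Runnable r) {
        // Wrap each task so that, when it finishes, it hands the next one to the executor.
        tasks.add(() -> {
            try {
                r.run();
            } finally {
                scheduleNext();
            }
        });
        if (active == null) {
            scheduleNext(); // idle: start the chain
        }
    }

    private synchronized void scheduleNext() {
        if ((active = tasks.poll()) != null) {
            executor.execute(active);
        }
    }
}
```

Because `active` is only assigned under the lock, and a new hand-off happens only when the queue goes from idle to busy or when the running task finishes, at most one task per `SerialExecutor` is with the underlying executor at any time.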

Erik De Rijcke

Sep 11, 2013, 3:56:44 AM
to alexei.k...@gmail.com, guava-...@googlegroups.com
Your implementation does seem to have some concurrency issues at first glance. Say you use a thread pool executor underneath. One thread assigns the active task (in the sync block); another thread that just exited the sync block submits the executor (which is also a Runnable). This causes the just-updated active task to be run. Now the thread that updated the active task also calls submit. Now your active task is running twice.

alexei.k...@gmail.com

Sep 11, 2013, 4:23:37 AM
to guava-...@googlegroups.com, alexei.k...@gmail.com, derijc...@gmail.com
I disagree. If "another thread that just exited the sync block submits the executor", then the variable active was just set by that thread, so another thread could not have assigned it and would not activate the serial executor task a second time. More formally, the system has two states: active == null and active != null. The transition from the first state to the second is accompanied by activating the serial executor, and the transition from the second back to the first by deactivating it (returning from SerialExecutor.run). As a result, an instance of SerialExecutor cannot be submitted more than once at a time. This also allows the variable active to be read without synchronization in SerialExecutor.run.

Erik De Rijcke

Sep 11, 2013, 4:32:48 AM
to alexei.k...@gmail.com, guava-...@googlegroups.com
If execute is called twice in quick succession, the thread pool executor will assign two different threads to execute run() of the same serial executor. The same active object is then executed in parallel by two threads.

Erik De Rijcke

Sep 11, 2013, 4:33:35 AM
to Alexei Kaigorodov, guava-...@googlegroups.com
Ah yes, the return statement. That might make it work.

mail...@gmail.com

Jun 12, 2015, 8:53:16 AM
to guava-...@googlegroups.com
Hi
Would you mind sharing the code, or what you used in Java 7? I have the exact same requirement (thousands of queues serviced by a few threads).

michael...@gmail.com

Jun 11, 2018, 1:03:18 AM
to guava-discuss
Hi,

I have a very similar requirement to what you originally voiced here. The Java 7 example that you had previously published is no longer on GitHub. Would you be able to post it again? I'd like a basic implementation of multiple queues, or multiple single-threaded executors, backed by a single thread pool, without having to bring in any additional libraries.

Regards,
Michael.

Benjamin Manes

Jun 11, 2018, 2:50:09 PM
to michael...@gmail.com, guava-discuss
You can do this very easily using CompletableFuture's chaining and a weak-valued cache, for example:

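import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.CompletableFuture;

// One chain of futures per queue name; weak values let completed chains be collected.
Cache<String, CompletableFuture<Void>> dispatchQueues =
    CacheBuilder.newBuilder().weakValues().build();

public CompletableFuture<Void> dispatch(String queueName, Runnable task) {
  // compute() must be atomic per key, so each task is appended after the previous one.
  return dispatchQueues.asMap().compute(queueName, (k, queue) -> {
    return (queue == null)
        ? CompletableFuture.runAsync(task)  // no pending chain: start a new one
        : queue.thenRunAsync(task);         // runs after the previous task completes normally
  });
}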

A strong reference to the last future in the chain is held by its predecessor until it runs, and by the executor while it is running. When the future is complete and has no successors, the GC can discard it and the cache will eventually prune the entry. FIFO ordering is retained by your key and the atomic nature of compute. The backing thread pool is ForkJoinPool (FJP) by default, but you can pass in a custom one as needed. You will want to wrap the runnable with some error handler, or else an exception would break the chain. Guava's or Caffeine's cache should work here, though be aware that Guava's compute requires a newer version and may not be fully robust yet.
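A sketch of the two refinements mentioned above, an explicit executor and a chain that survives a failing task, assuming Guava 21+ (for an atomic `asMap().compute`); `KeyedDispatcher` is a hypothetical name. Instead of wrapping the `Runnable`, this variant calls `exceptionally(e -> null)` on the previous link, so a failure is still reported through the caller's own returned future but does not stop later tasks in the same queue.

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical dispatcher: one FIFO chain of futures per queue name. */
final class KeyedDispatcher {
    // Weak values: a finished chain with no pending successors can be garbage collected.
    private final Cache<String, CompletableFuture<Void>> queues =
        CacheBuilder.newBuilder().weakValues().build();
    private final Executor executor;

    KeyedDispatcher(Executor executor) {
        this.executor = executor;
    }

    CompletableFuture<Void> dispatch(String queueName, Runnable task) {
        // compute() must be atomic per key (Guava 21+ or Caffeine) to keep FIFO order.
        return queues.asMap().compute(queueName, (name, previous) ->
            previous == null
                ? CompletableFuture.runAsync(task, executor)
                // Ignore the predecessor's failure so one bad task does not break the chain;
                // the caller still sees this task's own failure in the returned future.
                : previous.exceptionally(e -> null).thenRunAsync(task, executor));
    }
}
```

If `executor` runs tasks on the submitting thread (a caller-runs policy, or a direct executor as in a quick test), the task runs inside `compute()` while the cache holds its lock for that key's segment, which can delay other dispatches.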


michael...@gmail.com

Jun 11, 2018, 5:43:47 PM
to guava-discuss
Thanks Ben. I already have Guava 19.0 in my project, so I will give this approach a try using the Guava Cache.

Thanks!
Michael.

Benjamin Manes

Jun 11, 2018, 6:03:42 PM
to michael...@gmail.com, guava-discuss
You will require v21 or greater for JDK 8 support. Otherwise you will inherit the default, non-atomic Map methods, which will not work as desired: they will instead cause multiple futures to be created when retrying a failed update (due to another thread winning the race). There are still some open items around the compute methods, though some may have been fixed in the last release without all of the duplicate issues being closed. So you should be a little careful and test accordingly.

Louis would know best the current state of these methods.

michael...@gmail.com

Jun 11, 2018, 7:33:23 PM
to guava-discuss
Ah, okay. I also have Caffeine 2.3.5 in my project via Infinispan. Would the cache implementation there be suitable, or would I also need to upgrade to a newer version of Caffeine? We are probably long overdue for updating our dependencies anyway, but it would be good to know whether it is worth doing some initial testing on the current stack without changing dependencies (I prefer not to make multiple changes in one go). What are the currently recommended versions of Guava and Caffeine for a production system?

Thanks again!
Michael.

Benjamin Manes

Jun 11, 2018, 7:48:37 PM
to Michael Siemer, guava-discuss
Current recommendation is the latest, if possible, since those have bug fixes.

Caffeine's support should be good since it was written for JDK8 and has gone through more testing. Guava's has evolved towards that so it is more recent and, like all code, can take time to mature. There are still some minor issues causing me to not include its compute methods in my test suite due to subtle quirks (all documented on the issue tracker), so I can't offer a strong recommendation for that yet.

Michael Siemer

Jun 14, 2018, 5:49:51 PM
to ben....@gmail.com, guava-...@googlegroups.com
Thanks Ben. I have been tinkering with an implementation based on this, and it is working perfectly. Do you know if any aspect of the CompletableFuture/weak map implementation relies on using a ForkJoinPool as the backing implementation, or would this work equally well with a ThreadPoolExecutor as the underlying Executor?

Thanks,
 Michael.

Benjamin Manes

Jun 14, 2018, 6:01:18 PM
to Michael Siemer, guava-...@googlegroups.com
You should be more aware of the potential for a RejectedExecutionException being thrown, as FJP throws one only when a work queue exceeds its capacity (2^26, roughly 64 million tasks, in JDK 8). A custom executor may reject or use a caller-runs policy. The impact of the latter is that the task would run inside the compute block and delay other operations. Otherwise I think it's pretty safe.
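To make the two saturation behaviours concrete, here is a hypothetical demo (`SaturationDemo` and its methods are invented names) using a `ThreadPoolExecutor` with one worker and a `SynchronousQueue`, so a second concurrent task saturates the pool. With `AbortPolicy`, `CompletableFuture.runAsync` throws `RejectedExecutionException` back to the submitter; with `CallerRunsPolicy`, the task runs on the submitting thread itself, which in the `dispatch()` code above means inside the compute block.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of what a saturated custom executor does to the submitting thread. */
final class SaturationDemo {
    /** One worker and no queue: a second concurrent task cannot be accepted. */
    static ThreadPoolExecutor singleWorker(RejectedExecutionHandler policy) {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
            new SynchronousQueue<>(), policy);
    }

    /** Occupies the only worker, submits a second task, and reports where it ran. */
    static String submitWhileSaturated(RejectedExecutionHandler policy) {
        ThreadPoolExecutor pool = singleWorker(policy);
        CountDownLatch release = new CountDownLatch(1);
        try {
            CompletableFuture.runAsync(() -> awaitQuietly(release), pool); // blocks the worker
            String[] ranOn = new String[1];
            try {
                CompletableFuture.runAsync(() -> ranOn[0] = Thread.currentThread().getName(), pool);
            } catch (RejectedExecutionException e) {
                return "rejected";
            }
            return ranOn[0]; // with CallerRunsPolicy: the submitting thread's name
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```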

Michael Siemer

Jun 14, 2018, 8:57:43 PM
to ben....@gmail.com, guava-...@googlegroups.com
I've noticed that sometimes my calling thread seems to block for quite a while when submitting a new task to the queue:

at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
   Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#54
   Local Variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#55
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
   Local Variable: java.util.concurrent.locks.ReentrantLock$NonfairSync#1255
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
at com.google.common.cache.LocalCache$Segment.compute(LocalCache.java:2194)
at com.google.common.cache.LocalCache.compute(LocalCache.java:4196)
   Local Variable: minestar.core.common.service.job.JobExecutionServiceImpl$$Lambda$437#1
at minestar.core.common.service.job.JobExecutionServiceImpl.dispatch(JobExecutionServiceImpl.java:320)

This is with Guava 25.1-jre's LocalCache. Is this the sort of issue where you were suggesting that Caffeine might be a more robust option?

Benjamin Manes

Jun 14, 2018, 9:37:35 PM
to Michael Siemer, guava-...@googlegroups.com
Yes, though that could also happen if the executor uses a caller-runs policy when exhausted. Guava has a default concurrencyLevel of 4 writers, so bumping that up might help. The submission should be fast, so it's hard to tell whether Guava or your implementation is at fault.
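The concurrencyLevel suggestion applied to the dispatch cache might look like this; a sketch in which the class name `DispatchQueues` and the value 16 are assumptions. Guava partitions the cache internally according to this hint, so more concurrent `compute()` calls on different keys can proceed without contending on the same lock. It does not help if a task itself runs inside `compute()` under a caller-runs policy.

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.CompletableFuture;

/** Hypothetical factory for the weak-valued dispatch cache with more lock segments. */
final class DispatchQueues {
    static Cache<String, CompletableFuture<Void>> newQueues() {
        return CacheBuilder.newBuilder()
            .weakValues()
            .concurrencyLevel(16) // assumed value; Guava's default is 4
            .build();
    }
}
```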