If i understand correctly, we had some sort of same issue in our application. We needed to have a shared expandable thread pool, but limit parallelism for each instance accessing this shared thread pool. We achieved this by decorating the shared thread pool and limiting access using a semaphore :
/**
* An {@link ExecutorService} which bounds the number of parallel
* tasks which can be executed simultaneously on the underlying {@link ExecutorService}.
*
* The main purpose is to share an {@link ExecutorService} with several producers but limit
* the number of tasks each producer can run at the same time.
*
* The maximum number of parallel tasks which can be executed is relying on the law of
* the minimum. The limiting factor is either the {@link BoundedExecutorService} or
* the underlying {@link ExecutorService}.
*
* @author jloisel
*
*/
final class BoundedExecutorService extends ForwardingExecutorService {
private final Semaphore maxParallelTasks;
/**
* @param delegate executor service to which tasks are delegated
* @param maxParallelTasks maximum number of simultaneous tasks to execute
* @throws NullPointerException when delegate executor is NULL
* @throws IllegalArgumentException when maxParallel tasks <= 0
*/
BoundedExecutorService(final ExecutorService delegate, final Semaphore maxParallelTasks) {
super(delegate);
this.maxParallelTasks = checkNotNull(maxParallelTasks, "NULL semaphore is not allowed");
}
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the <tt>ExecutorService</tt> implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be accepted for execution,
* and swallowed {@link InterruptedException} when interrupted while waiting to
* acquire {@link Semaphore}.
* @throws NullPointerException if command is null
*/
@Override
public void execute(final Runnable command) {
try {
maxParallelTasks.acquire();
super.execute(new SemaphoreReleasingRunnable(command, maxParallelTasks));
} catch (final InterruptedException | RejectedExecutionException e) {
maxParallelTasks.release();
Throwables.propagateIfInstanceOf(e, RejectedExecutionException.class);
throw new RejectedExecutionException("Could not run command: " + command, e);
}
}
}
And the SemaphoreReleasingRunnable :
/**
* {@link Runnable} that executes an underlying {@link Runnable}
* in the same thread and releases a {@link Semaphore} permit at the
* end of its execution.
*
* @author jloisel
*
*/
final class SemaphoreReleasingRunnable implements Runnable {
private final Runnable delegate;
private final Semaphore semaphore;
/**
* @param delegate task to execute
* @param semaphore semaphore to release after task execution
* @throws NullPointerException if delegate or semaphore is NULL
*/
SemaphoreReleasingRunnable(final Runnable delegate, final Semaphore semaphore) {
super();
this.delegate = checkNotNull(delegate);
this.semaphore = checkNotNull(semaphore);
}
@Override
public void run() {
try {
delegate.run();
} finally {
semaphore.release();
}
}
}
Not sure it does fit your needs, but it seems like. It still uses only one queue, but parallelism is limited by the decorator. A new decorator is created for each instance needing its own parallelism limit.