Thread pool problem that is maybe already solved in the Scala ecosystem?


Haddock

Feb 16, 2015, 11:19:17 AM
to scala...@googlegroups.com
Hello,

I looked a bit at Scala and realized that various Scala frameworks are quite advanced as far as concurrency is concerned (actors, Akka, futures, etc.). So I thought I'd post my question here, because this audience might be best able to answer it.

Now the problem: I have a thread pool of m threads. Let's say m is 10 and fixed. Then there are n queues, where n may become large (100'000 or more). Every queue holds tasks to be executed by those m threads. Now, very important: every queue must be worked off sequentially, task by task. This is a requirement to make sure that tasks are executed in the order they were added to the queue. Otherwise the data could become inconsistent (same as, say, with JMS queues).

So the question is how to make sure that the tasks in those n queues are processed by the available m threads in such a way that no two tasks from the same queue are ever executed at the same time by different threads.

I tried to solve this problem myself and figured out that it is quite demanding. At least, it gets difficult when you want good throughput (i.e. little lock contention and little blocking of threads). Java's ThreadPoolExecutor is nice, but you would have to add quite a bit of functionality that is not easy to develop. So the question is whether anyone knows of some framework or system in the Scala world that would solve this problem for me.

Thanks in advance, Haddock




Oliver Ruebenacker

Feb 16, 2015, 11:33:39 AM
to Haddock, scala-user

     Hello,

  The classical solution would be to have a lock for each queue, which must be acquired to add tasks to the queue and to remove them from the queue (e.g. for execution). Since adding and removing are quick, lock contention is normally not an issue. You don't need to hold a lock for execution: you just hold the lock while removing the task from the queue, which guarantees that only one thread removes it, and release the lock before you execute the task.

  Lock contention usually becomes an issue when other shared resources are involved that are required for task execution.

     Best, Oliver 

--
You received this message because you are subscribed to the Google Groups "scala-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scala-user+...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Oliver Ruebenacker
Solutions Architect at Altisource Labs
Be always grateful, but never satisfied.

Haddock

Feb 16, 2015, 11:48:19 AM
to scala...@googlegroups.com, ffm...@web.de
Hi Oliver,

thanks for your suggestion. The problem is that the number of queues is much larger than the number of threads. So when a thread becomes available, it would have to iterate over a lot of queues to find one that is not empty and whose queue lock is not held by a different thread. So I ended up adding all tasks to a single queue, with some value set to indicate which queue each task belongs to.

Now some thread becomes available, takes the next task from the central queue, and tries to obtain the lock of the queue the task is associated with. If it gets the queue lock, you are fine. If it doesn't, you have to let the thread currently executing a task of the same queue know that there is another item to be processed after it has finished the current one (and this could result in tasks of other queues starving or being locked out for a while). Now, how to do this without going through a synchronized block? A synchronized block has really bad lock contention... I mean, at the end of the day, for a system running 24/7, one synchronized block more or less makes a real difference.

Regards, Oliver

Oliver Ruebenacker

Feb 16, 2015, 12:51:29 PM
to Haddock, scala-user

     Hello,

  Wait a moment, do you require that all tasks of a queue are processed by the same thread?

     Best, Oliver



Viktor Klang

Feb 16, 2015, 2:08:13 PM
to Haddock, scala-user
Hi Haddock,

Here's a solution to your problem:

Example in Scala:

scala> import Actor._
import Actor._

scala> implicit val e: java.util.concurrent.Executor = java.util.concurrent.Executors.newCachedThreadPool
e: java.util.concurrent.Executor = java.util.concurrent.ThreadPoolExecutor@65e98b1c[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]

scala> val a = Actor( self => { case r: Runnable => r.run(); Stay })
a: Actor.Address = Actor$$anon$1@2e1d27ba

scala> a ! new Runnable { def run = println("oh yeah") }
oh yeah

scala> a ! new Runnable { def run = println("oh yeah") }

scala> oh yeah
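Viktor's session relies on a minimalist actor implementation that is not reproduced in the thread (his message referred to links that are not preserved here). Below is a self-contained sketch with a compatible shape; the names (Actor, Address, Stay) and the trampolining scheme are a reconstruction for illustration, not his actual code. The idea: messages go into a lock-free mailbox, and an atomic flag ensures at most one pool thread drains the mailbox at any time, which gives exactly the per-queue serialization asked for.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executor}
import java.util.concurrent.atomic.AtomicBoolean

sealed trait Effect
case object Stay extends Effect // keep the current behavior

trait Address { def !(msg: Any): Unit }

object Actor {
  // Creates an actor: messages are processed one at a time, in send order,
  // on whatever thread of the Executor is available, but never two at once.
  def apply(initial: Address => Any => Effect)(implicit e: Executor): Address =
    new Address {
      private val mailbox = new ConcurrentLinkedQueue[Any]
      private val on = new AtomicBoolean(false)
      private lazy val behavior: Any => Effect = initial(this)

      def !(msg: Any): Unit = { mailbox.offer(msg); trySchedule() }

      // Schedule at most one drain pass at a time; one message per pass
      // keeps scheduling fair across many actors sharing the pool.
      private def trySchedule(): Unit =
        if (on.compareAndSet(false, true))
          e.execute(() => {
            try { val m = mailbox.poll(); if (m != null) { behavior(m); () } }
            finally {
              on.set(false)
              // Re-check: a message may have arrived after the last poll
              // but before the flag was released.
              if (!mailbox.isEmpty) trySchedule()
            }
          })
    }
}
```

With these definitions in scope, the REPL session above works as shown: every message sent to `a` is processed strictly in send order, though not necessarily on the same thread.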





--
Cheers,

Haddock

Feb 17, 2015, 3:17:29 AM
to scala...@googlegroups.com

  Wait a moment, do you require that all tasks of a queue are processed by the same thread?

No, this is not required. It is only important that tasks of the same queue are not executed concurrently, but one after the other. A thread needs to execute a task from beginning to completion. Then it can execute any other task, provided that at most one task of any given queue is being executed at any time.

@Viktor: Thanks for the links. I don't grasp it at first sight. I will take some time this week after work and have a closer look at it. Thanks anyway.

Haddock

Feb 17, 2015, 3:31:26 AM
to scala...@googlegroups.com
@Vik: I knew that actors have to solve the same problem, so some people who play with actors might have solved this already ;-).

Paul Keeble

Feb 17, 2015, 4:09:38 AM
to scala-user
The tasks within any individual queue can't be run in parallel, so a queue is your smallest usable unit of work for concurrent execution. Either you pass each item of a queue to an actor (which will execute messages in the order you send them by default), or you pass the whole queue and have the actor iterate over it itself.

Since actors are extremely cheap to create (you can have millions), this is actually a very useful way to organise your problem: an actor per queue and a message per task is pretty simple to set up. You could also make the queue itself a message, so that pulling the next task becomes the actor's job (watch out for IO here). You can have a million actors, and if there is nothing to do, creating one for an empty queue costs you almost nothing; if a queue has a lot of items, it is going to take a while to run those tasks serially anyway. Sorting the queues by size, descending, might help get big jobs started faster, but ultimately that depends on how long it takes to iterate over the queues and create a million actors for a million queues.
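The second variant above (passing the whole queue and letting the worker iterate it) can be sketched without any actor framework at all, assuming each queue's contents are fixed up front: every queue is submitted to a plain thread pool as one unit of work, so its tasks run serially on one thread while distinct queues run in parallel. This is only a sketch under that assumption; queues cannot grow after submission here.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Runs each queue as a single unit of work on a fixed pool.
// Within a queue: strictly sequential. Across queues: parallel.
def runQueues(queues: Seq[Seq[Runnable]], threads: Int): Unit = {
  val pool = Executors.newFixedThreadPool(threads)
  queues.foreach(q => pool.execute(() => q.foreach(_.run())))
  pool.shutdown()
  pool.awaitTermination(1, TimeUnit.MINUTES)
  ()
}
```

The obvious drawback, as noted above, is that one very long queue occupies a pool thread until it is drained; the actor-per-queue (or message-per-task) variant interleaves more fairly.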

I would personally recommend using Akka rather than the now-deprecated actor system in the Scala standard library, but the internal system will work just fine for your problem.

PK


Viktor Klang

Feb 17, 2015, 4:54:34 AM
to Haddock, scala-user
:)

If you want the same type of behavior but as an Executor (or in this case ExecutionContext) instead of Actors you can do something like: https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/util/SerializedSuspendableExecutionContext.scala
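For a feel of what the linked class does, here is a stripped-down sketch of the same serialization trick (the "suspendable" part is omitted). This is an illustration of the idea, not the actual Akka code: it wraps any ExecutionContext so that submitted Runnables execute one at a time, in submission order, while still running on the wrapped pool's threads.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.ExecutionContext

// Wraps an ExecutionContext so tasks run serially, in submission order.
final class SerializedExecutionContext(underlying: ExecutionContext) extends ExecutionContext {
  private val queue   = new ConcurrentLinkedQueue[Runnable]
  private val running = new AtomicBoolean(false)

  override def execute(task: Runnable): Unit = { queue.offer(task); attach() }
  override def reportFailure(t: Throwable): Unit = underlying.reportFailure(t)

  // Schedule at most one drain pass at a time; re-attach if work remains.
  private def attach(): Unit =
    if (running.compareAndSet(false, true))
      underlying.execute(() => {
        try { val t = queue.poll(); if (t ne null) t.run() }
        finally { running.set(false); if (!queue.isEmpty) attach() }
      })
}
```

One instance of such a wrapper per queue, all sharing one underlying pool, is exactly the m-threads/n-queues shape from the original question.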




--
Cheers,

Oliver Ruebenacker

Feb 17, 2015, 8:18:15 AM
to Haddock, scala-user

     Hello,

  OK, sorry, I missed that part. In that case, you would have a queue of queues. Each thread checks out a task queue from the queue of queues, then checks out a task from the task queue, and when the thread decides it is done with the task queue, it returns it to the queue of queues.

  Queues are very simple objects: you just need a checkin and a checkout method, both synchronized. If the queue is empty, the checkout method returns something like null or None.
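A minimal sketch of this queue-of-queues scheme, with the synchronization provided by a concurrent queue rather than explicit locks (all names here are made up for illustration): while a task queue is checked out it is absent from the queue of queues, so no two workers can ever execute tasks of the same queue concurrently.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// The queue of queues: checkout removes a task queue, checkin returns it.
final class QueueOfQueues {
  private val available = new ConcurrentLinkedQueue[ConcurrentLinkedQueue[Runnable]]
  def checkin(q: ConcurrentLinkedQueue[Runnable]): Unit = { available.offer(q); () }
  def checkout(): Option[ConcurrentLinkedQueue[Runnable]] = Option(available.poll())
}

// One worker step: take a queue, run its head task, give the queue back
// if it still has work. Returns false when no queue was available.
// Note: an empty queue is dropped, so a producer that later adds tasks
// to it must check it in again.
def step(qq: QueueOfQueues): Boolean =
  qq.checkout() match {
    case Some(q) =>
      Option(q.poll()).foreach(_.run())
      if (!q.isEmpty) qq.checkin(q)
      true
    case None => false
  }
```

Workers just loop `while (step(qq)) ()`; running one task per checkout keeps scheduling fair across queues, at the cost of one checkout/checkin round trip per task.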

  Akka largely solves queuing for you, but at a cost:

  - If you simply use the recommended untyped actors directly via ? and !, you completely lose type safety. If you send a completely wrong message to a completely wrong actor, the compiler won't complain. At runtime, you typically get a dead letter, which is much harder to trace back than, for example, an exception. Actor systems are very good at surviving bugs; they just don't do what you want them to do.

  - You can also use Akka's typed actors, but for some reasons I don't understand (even after reading the explanation on the Akka website), they are not recommended.

  - You can also use untyped actors but only access them through wrappers that ensure type safety. You'd just have to write all those wrappers.

     Best, Oliver





Haddock

Feb 27, 2015, 4:19:47 AM
to scala...@googlegroups.com
Thanks for all the replies so far. I've now come up with something of my own. I just like to play with threads a bit and think about solutions. That's why ;-). All right, here is the "best" solution I have come up with so far. Any comments are greatly appreciated.

The system defines channel objects; the tasks added to a channel need to be executed sequentially. The channels are the building blocks the user deals with. The following describes the internals that implement sequential execution within each channel while different channels are handled in parallel.

The following queues are defined:

Ready Queue (RQ): Contains all tasks that can be executed immediately by any thread in the thread pool.

Entry Queue (EQ): Contains all tasks the user wants to be executed as well as internal admin tasks. The EQ is a priority queue. Admin tasks have highest priority.

Channel Queues (CQ): For every channel there is an internal channel queue that is used to preserve the ordering of the tasks inside a channel, i.e. to make sure tasks are executed sequentially in the order they were added to the channel (which inserts them into the EQ).

Scheduler: A dedicated thread that takes tasks from the EQ. If the task is a user task, it is added to the CQ of the channel it was submitted to. If the head of that CQ equals the just-inserted user task (i.e. the channel was idle), it is also added to the RQ (but remains in the CQ) so that it is executed as soon as the next thread of the thread pool becomes available.

When a user task has finished execution, an internal task TaskFinished is added to the EQ. When the scheduler processes it, the head is removed from the associated CQ. If the CQ is not empty after that, the next task is peeked (but not removed) from the CQ and added to the RQ. The TaskFinished tasks have higher priority than user tasks.

In my opinion, this approach contains no logical errors. Note that the EQ and RQ need to be synchronized. I prefer using LinkedTransferQueue from the JDK, which is very fast; checking whether it is empty and polling the head item are also very fast. The CQs need not be synchronized, as they are only ever accessed by the Scheduler.

So far I'm quite happy with this solution. What makes me think is whether the Scheduler could turn into a bottleneck: if many more tasks arrive in the EQ than it can handle, the EQ might grow, building up a backlog. Any opinions about that would be appreciated :-).
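The design above can be sketched compactly under a few assumptions that are mine, not the post's: channels are keyed by Int, the thread pool's own work queue stands in for the RQ, TaskFinished is modelled as a message on the EQ, and the EQ's priorities are omitted for brevity (a PriorityBlockingQueue would restore them).

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable

sealed trait Msg
final case class Submit(channel: Int, task: Runnable) extends Msg
final case class Finished(channel: Int) extends Msg
case object Shutdown extends Msg

// eq plays the EQ; the pool's internal work queue plays the RQ;
// the CQs are plain unsynchronized queues, touched only by the scheduler thread.
final class ChannelScheduler(threads: Int) {
  private val eq   = new LinkedBlockingQueue[Msg]
  private val pool = Executors.newFixedThreadPool(threads)
  private val cq   = mutable.Map.empty[Int, mutable.Queue[Runnable]]

  def submit(channel: Int)(task: => Unit): Unit =
    eq.put(Submit(channel, () => task))

  // Hand a task to the pool; report completion back to the scheduler.
  private def dispatch(ch: Int, task: Runnable): Unit =
    pool.execute(() => { try task.run() finally eq.put(Finished(ch)) })

  private val scheduler = new Thread(() => {
    var live = true
    while (live) eq.take() match {
      case Submit(ch, t) =>
        val q = cq.getOrElseUpdate(ch, mutable.Queue.empty[Runnable])
        q.enqueue(t)
        if (q.size == 1) dispatch(ch, t) // channel was idle: schedule right away
      case Finished(ch) =>
        val q = cq(ch)
        q.dequeue() // remove the completed head; it stayed in the CQ while running
        if (q.nonEmpty) dispatch(ch, q.head) else cq.remove(ch)
      case Shutdown => live = false
    }
  })
  scheduler.setDaemon(true)
  scheduler.start()

  def shutdown(): Unit = {
    eq.put(Shutdown)
    scheduler.join()
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.MINUTES)
    ()
  }
}
```

The bottleneck question shows up here as the throughput of the single eq.take() loop: the scheduler only does constant-time bookkeeping per message, but it is a serial point by construction, so under sustained overload the EQ would indeed grow.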

Thanks, Haddock