How to handle slow actors and bulkheading

256 views
Skip to first unread message

benoit heinrich

unread,
Jun 27, 2016, 11:24:54 AM6/27/16
to Akka User List
Hi all,

I've been playing with akka a few years ago, and I'm just back to it, trying to implement some kind of internal queueing mechanism on top of a very big java8 application which perform a lot of message processing.


Background:

The current application is a message processing application which receive JMS messages (using AMQ) and that need to process them.
The application has a notion of a subscriber for a given JMS message type, and when a message is received, all the subscribers which are interested in that message type are executed one after the other.
The problem is that some of those subscribers require some kind of pessimistic locks on some resources, and that causes a lot of threads to just wait for the resource to be acquired before anything else happen.


Solution:

The change I've made (which uses actor system) is that when the JMS message is received, it sends the message to an actor (a MessageConsumerActor) based on the resource that needs to be locked, and then the actor sends the work to a child (a MessageExecutorActor) one at a time to lock the resource, and execute the actual subscriber code.
The MessageConsumerActor is then managing the queue of messages to be processed, and given work to the MessageExecutorActor one at a time, and the MessageExecutorActor is in charge to acquire the lock.
Because the MessageExecutorActor / MessageConsumerActor actor couple is unique for that specific locked resource, no other processes try to lock that resource, and then acquiring the lock is very quick as no other threads try to acquire that lock a the same time.  With this the messages get executed nicely from the queue, one after the other.

The class which receive the message from JMS, is something which unfortunately doesn't support asynchronous calls (it needs to fulfill an existing API).
The API consider that once the method returns, then the JMS message is marked as processed, and if it throws an exception, then the transaction which is around the JMS message delivery is failed.
In order to make sure that I don't lose messages, when a JMS message is received, I return only when I get confirmation that the message have been queued by the MessageConsumerActor.
Unfortunately, the only way I've found to achieve this is to use Await.result() method.


Problem:

All this works greatly until I've got too many messages to process, at that time I receive timeouts when calling the Await.result() from the MessageConsumerActor, even though that actor should be very fast to acknowledge the result.

I think the problem might be due to those subscribers which are in general quite slow (from a few seconds to sometime a minute) to execute.
When a message is received on the MessageExecutorActor, it then calls the subscriber call in the same thread, and so that thread is getting busy with some very complex computation for a few seconds.
When there is a lot of messages received, then I've got the feeling that all the threads get busy running into those subscribers, and if another JMS message is received during that time, then I get a timeout due to the MessageConsumerActor not replying quick enough.


What I tried:

I've been googling on this, and I've watched a few (very interesting) video including the http://boldradius.com/blog-post/U-jexSsAACwA_8nr/dos-and-donts-when-deploying-akka-in-production .
What I get from this is that already, I should never call the  Await.result() method, but then how can I get the feedback that my message has been queued properly from the JMS thread (thread which has a transaction context associated to it)?

Then the next thing I tried is to use a different dispatcher for the MessageExecutorActor, and another one for the MessageConsumerActor.
I was hoping this would just work by magic, but unfortunately it didn't ;)

The way I've used it is by adding two new dispatcher configuration in my application.conf and then referencing them in the props when creating my actors.
I could see using JVisualVM (and watching logs) that the threads that now performed my application were named by the name of the dispatchers I gave in the configuration.

I've tried lot of different configurations, but none worked, and I'm starting to desperate here.

Here is the last configuration I tried:

# Dispatcher used by the ActorSystemPublisher and MessageConsumerActor to allow messages to be queued
queuing-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 1
parallelism-max = 1
}
# Because a single message in general doesn't have more much subscribers, a value of 20 should be enough
throughput = 1
}

# Dispatcher used by the MessageExecutorActor to execute messages by the subscribers
executing-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
parallelism-min = 1
parallelism-max = 4
}
# Because a single locked resource could have lot of messages to be processed, and because processing a message
# could be time consuming, we want to allow as much fairness for each resource to be executed.
throughput = 1
}

The queuing-dispatcher is used by the MessageConsumerActor, and the executing-dispatcher is used by the MessageExecutorActor.

As you can see I'm trying to use fork-join-executor and thread-pool-executor combinations, and all the possible variations, but none worked.
I tried different parallelism, but none work neither.
I tried to force a single thread for the MessageExecutorActor using parallelism-max=1 but that didn't work neither :(

Could someone please let me know how this kind of issue is being solved?
Am I on the right track with the dispatcher configuration?
Do I need to use some kind of routers, and if so, how?


Thanks in advance for all the suggestions :)

Cheers,
/Benoit

Gavin Baumanis

unread,
Jun 27, 2016, 7:51:44 PM6/27/16
to Akka User List
I can't answer your akka questions, sorry...

And I realise it isn't Akka based...
But considering you are using Java already... you might like to consider
It has the features you require.

We use it extensively in health because of its HL7 Messaging capabilities, but it isn't a HL7 solution - as much as it is an integration / messaging / SOA platform.

-Gavin. 

Guido Medina

unread,
Jun 28, 2016, 3:58:37 AM6/28/16
to Akka User List
I try to understand the problem he is describing, but when someone suggest a solution it confuses me,
specially when the solution shows little knowledge of the framework being used.

His question is akka related, he wants to resolve his problem with Akka,

he doesn't need to use an ESB to resolve a problem that the key circle around three things:
  1. Queues (Which actors have)
  2. Thread pools (Dispatchers) for specific actions (Which Akka provides)
  3. Coordination (Which Akka provides with many other things)
The problem is not the problem that he is trying to resolve, it is the little knowledge of Akka most people have before trying to tackle a complex problem.
I myself came from Java too, 1 year ago I knew nothing about Akka but the first thing I read was the manual, then the solution to the system I had to develop from scratch and by myself came along.

It is just a matter of most people being lazy and thinking than a simply copy & paste will solve the problem, desperation is a bad thing ;-)
one week reading will be the best spent time, the rest will just come easily.

HTH,

Guido.

Viktor Klang

unread,
Jun 28, 2016, 4:26:43 AM6/28/16
to Akka User List

Hi Benoit,

I am not sure if I have understood you correctly, but do you mean that you need to process the message within a single method call (when the method returns the message has been processed)?

Given that constraint I do not really see the value Akka could provide, unless the workload to process the message is parallelizable. Is it?

--
Cheers,

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Guido Medina

unread,
Jun 28, 2016, 4:30:38 AM6/28/16
to Akka User List
There are community contributions that can poll messages from a JMS queue, then you could send these messages to a worker actor,
such actor then can forward that message to another set of worker running on a blocking dispatcher that do the job, then answer the processed message back to the original worker and
let such worker acknowledge the original JMS message (if you want to ack)

1st set of worker can be something between 1 to N where N can be for example amount of CPUs, which makes it a router.
2nd set of worker can be the same as 1st set but running a different dispatcher.

Later one today I will give you sample code of how to define separate dispatchers and create N instances of an actor and then create a router with it or ...
you can read the documentation, it is very easy to do all that, each worker can be created with a Creator (read in the docs as creators are good to pass constructor parameters)

If you still have doubts when I get home I'mm paste some sample code.

HTH,

Guido.

Guido Medina

unread,
Jun 28, 2016, 4:37:04 AM6/28/16
to Akka User List
You could also have 1 non-blocking worker (actor) per queue and 1 blocking worker per queue (actor),
Akka has a nice message ordering policy so 1 actor will only process one message at a time and order will be kept.

Even if you have millions of queues, such actors can still run with a dispatcher with few threads.

It might be difficult to understand why you don't need to lock anything but think again and read Akka message ordering, you will realize it is simpler than what you think.

HTH,

Guido.

benoit heinrich

unread,
Jun 28, 2016, 5:32:49 AM6/28/16
to Akka User List
Hi All,

Thanks already for looking at my question, I really appreciate it.

@Gavin, the idea is that I'd like to use akka for 2 reasons:
1. I think that using akka will be an enabler for us to build much more reactive application than what's being currently done (so we can expand akka usage in the future)
2. All components I use so far looks really simple, and beside that load issue that I'm having, it just work great.
So using an EBS isn't something we'd like to do, the idea is being able to switch to a full akka world at some point in time (I've had a dream...)

@Victor, yes all this work is parallelizable, and running those heavy computation methods on multiple threads / nodes is the final goal.

@Guido, it's true that I didn't provide much details about how the original application works, but it's a million lines of code type of app (a big monolith), so explaining it all would be just too long ;)
Said that, let me try to give a bit more background, and a few updates since I've posted.

The original application is running in a war under tomcat7.

It's using spring integration to receive the JMS messages, and the new solution I'm putting in place needs to be turned on and off based on some configuration, so we need to be able to switch from current behaviour to new behaviour and vice versa easily.
That's the reason we don't want to change all the integration with spring to receive the JMS message, but instead call the queueing mechanism from the JMS message listener class that already exist (which will detect which behaviour to run).
Note that we don't want to change all the JMS message is received as it's perceived as a too big of a change for the first version of the change.
In the future we will probably change that layer to have the JMS message somehow handled in an asynchronous way, but this is out of scope for now.
The JMS provider is ActiveMQ, and we're using other technologies like hazelcast, jboss jbpm.



On the other hand I've been playing more with different configuration settings, and maybe I was in fact using something wrong !

One thing I didn't say in my original post is that I'm using ClusterSharding to distribute the load across a cluster of nodes.
And when using ClusterSharding, I've had to use mongo DB for the persistence storage (our IT didn't give me a choice).

The current storage engine I'm using is scullxbones/akka-persistence-mongo, and based on the documentation I was trying to setup the dispatcher based on the example:
akka-contrib-persistence-dispatcher.thread-pool-executor.core-pool-size-min = 10
akka-contrib-persistence-dispatcher.thread-pool-executor.core-pool-size-factor = 10
akka-contrib-persistence-dispatcher.thread-pool-executor.core-pool-size-max = 20

Now when I look at the thread usages using jvisualvm, I can see that the most active threads are the ones for the system-akka-contrib-persistence-dispatcher threadpool.
When I've removed those settings that were provided as an example from the doc above, then I stopped getting the timeouts.

I guess this could have been part of the problem.

I'm still getting kind of longer than planned time to queue my messages (considered that it just receives a message and does an insert into a mongo collection).
Some messages are getting queued in 1ms, while some others take about 2.5s to be queued (the average is 325ms).
But it's not easy to see why it takes that long, and where the time is spent (using jvisualvm gives some clue, but it's far from being perfect).

Another thing I've realised is that when I tried to use the thread-pool-executor configuration, I did use the wrong keys in the settings :facepalm:
I was using the keys from the fork-join-executor ... I know... it was late and I was probably just getting blind.

So all in all, I'm getting much better performance than yesterday already, but I'm still not at the level I'd like to be.
So somehow I'd like to prioritise work of the MessageConsumerActor above execution of the MessageExecutorActor.

And using different dispatchers, I was hoping (and still hope) that I'll be able to do it nicely.

I'll try a few more experiments, and send some updates.

Thanks a lot for all your messages, this give me great insights on possible next moves to improve my message handling.

Guido Medina

unread,
Jun 28, 2016, 7:08:57 AM6/28/16
to Akka User List
You could still have Spring JMS beans poll and forward to its corresponding actor (1 actor per queue), switch on/off would be to forward to Akka or to just use your Spring processor as you are doing atm.
if you need ack you can do it with 1 intermediary -and supervisor- actor doing the following which is the key to it:
  1. One actor per queue (this is key) with a Map<MessageId, Message> instance per actor to keep state of messages sent to the other (blocking and child actor) that processes the message.
  2. Once message is processed at the child actor, send a message back to original actor (its parent), update map and optionally ack back the JMS message.
  3. I'm guessing each main actor queue will have to constructed with the Spring JMS processor, again, that if you need ack.
That's it, remember 1 actor can be processing one message at a time (concurrently) and message ordering is guaranteed in Akka (read about it) hence eliminating the need of locking.
You could do more things like distribute each actor's queue in the cluster, you can create the processing actor as child of the main queue actor so its life-cycle is controlled by the queue actor.

WDTY?

Guido.

benoit heinrich

unread,
Jun 28, 2016, 7:38:03 AM6/28/16
to Akka User List
Right, I know that akka actors can process one message at a time and as message ordering guarantee (from one actor to another actor).
That is the reason why I chose akka in fact, as it can easily isolate for concurrency issues.

The locking that is required is nothing to do with akka itself, that locking is to preserve resources to be used from other parts of the applications that won't be using that new message processing layer.
The application is really big, and unfortunately, not everything goes through messages (not yet) and so, we still need this notion of locks.

The resource I'm talking about is like a customer dataset, so that we can be sure not more than one process modifies that customer at a time.
And that's why I've selected ClusterSharding because this maps very well with the need I have (the MessageConsumerActor is in fact a child in the ClusterSharding region).
And as you suggest, the MessageConsumerActor does have a supervisor (started by the ClusterSharding) and keeps an internal queue of messages to be processed (stored in mongo)
And the MessageExecutorActor is a child of the MessageConsumerActor which processes the message.

The only thing I guess that I'm missing is a way to asynchronously confirm to JMS that a message is processed (or failed).
Unfortunately that one last part can't be changed for the moment, this will cause lot of fear to other people in the team that I'm trying to sell akka to.

Also, I've been using akka on different projects (using ClusterSharding with great success), but those were only akka based, not mixed with servlet / tomcat / hazelcast / etc.
Those applications were working great.
Here I'm in a transition period where I want the project to evolve to akka based message processors, and I need to show to people that akka just work for this change so that we can build more and more directly on akka.

btw - now that I've found that I was using the wrong configuration keys (mixing fork-join and thread-pool config fields), then I can play with different configurations and experiment.
I've got a big test suite which allows me to reproduce the same scenario over and over (sending a few thousands messages), and the only thing I'm changing is dispatcher configurations to see the impact.

So far, the biggest bottleneck I've found is for the akka-contrib-persistence-dispatcher, I'm still trying to figure out why it spends so much time on it, but changing that configuration, I could bring the average of queueing a message from 2s to 300ms, so it's much better, but it's a shame that it doesn't always take 1 or 2ms like it does in so many cases ;)

One thing that I'll probably need to consider is to decrease the threadpools used by ActiveMQ and Halzelcast, because they do spawn a lot of threads (I've got an average of 570 threads running in the app) and most of those threads are spawned by ActiveMQ and Halzelcast.  Akka system threads only represent 10% of the threads spawned, so there might be some thread starvation happening due to those other layers.

If you know a way that I can tell to the dispatcher that it has top priority over any other thread in the application, that could be awesome, because then maybe I could just solve my issue by forcing that priority ;)

Thanks again, for all the help :)

Guido Medina

unread,
Jun 28, 2016, 7:51:03 AM6/28/16
to Akka User List
Well, you can still lock in your child actor that is running in a different thread pool, and its parent actor has a reference to the Spring JMS processor,
when you process the message at the child, tell the parent that you succeed or failed processing that message, maybe instead of acking you could push back to another JMS queue.
I don't mean ask, I mean to only send message forward without expecting a result, you could have a timer to check messages not answering within a period and react to them accordingly.

About bulk-heading you can use a different type of mailbox for these specific actors (both parent and child),
you can use JCtools chunked queue that grows in defined sizes to avoid GC, specially if you will have thousands of messages per queue, that should save some memory and speed.

And the contract for an Actor is usually multiple-producer, single-consumer so this should help a lot:

if you need help on how to define your own mail boxes, first read the section mail-boxes and then look at these examples:

HTH,

Guido.

benoit heinrich

unread,
Jun 28, 2016, 2:03:26 PM6/28/16
to Akka User List
Some more news about my experiments.

One thing I realised is that I've had akka.cluster.sharding.remember-entities = on in my ClusterSharding configuration.

After turning this to off, the average time to queue message went down to 38ms :)

So I'm not sure what is involved when remember-entities is turned to on, but this causes lot of delay for the messages to actually arrive to the destination actor.

It's true that I've got a few hundreds actors running in parallel, and lot of new actors being created, and older ones being passivated.

In my design I don't need to have remember-entities turned on because I've already have my mongo collection where I keep my queue of messages to be processed, and when the actor restarts, then it automatically load the messages from the queue (in case it crashed and it needs to reload work).

So I've found that quite interesting and thought it would be good to share :)

When remember-entities was turned on, then I've found that the best setting (for my case) was to use such dispatcher configuration:

akka-contrib-persistence-dispatcher {
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 20
parallelism-max = 40
parallelism-factor = 40
}
throughput = 1
}

When turned-off using only a parallelism-min of 2 and a parallelism-max of 8 is enough, and increasing it just decreases the overall throughput of my application.

So remember-entities can be appealing, but it has a cost that shouldn't be taken too lightly ;)

btw - I'm using akka 2.4.7 just to make sure I'm on latest and greatest.

So far I'm very happy with my configuration, and I guess the next step will be to look at how to free the thread used by the JMS consumer.

Thanks all for the great advices.

benoit heinrich

unread,
Jun 28, 2016, 2:07:18 PM6/28/16
to Akka User List
@Guido, earlier you mentioned this:

There are community contributions that can poll messages from a JMS queue, then you could send these messages to a worker actor,
such actor then can forward that message to another set of worker running on a blocking dispatcher that do the job, then answer the processed message back to the original worker and
let such worker acknowledge the original JMS message (if you want to ack)

Do you have any link to such community contributions ?

Guido Medina

unread,
Jun 28, 2016, 4:55:51 PM6/28/16
to Akka User List
I haven't used any but here are some useful links:
About parallelism, keep it to a factor of CPUs, parallelism factor of 40 doesn't make sense, are you going to have CPUs * 40 threads? And besides, it will be capped by max-parallelism,
parallelism factor usually makes sense to be between 0.5 and 4, much more it will just slow down everything IMHO.

HTH,

Guido.

Gavin Baumanis

unread,
Jun 28, 2016, 5:51:30 PM6/28/16
to Guido Medina, Akka User List

Guido

It isn't lost on me that this is the Akka mailing list.  I too am here to learn about Akka. 

I take onboard everything you say. But it was only a suggestion of an alternative solution. 
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/4YYF6nUTgwg/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.

Guido Medina

unread,
Jun 28, 2016, 5:53:46 PM6/28/16
to Akka User List, oxy...@gmail.com
Not to worry, everybody is happy now ;-)

benoit heinrich

unread,
Jun 29, 2016, 2:55:26 AM6/29/16
to Akka User List, oxy...@gmail.com
Thanks Guido, I'll try changing the parallelism-factor to a smaller value and see how it performs.

I'll also look at your links, I hope it'll be easy to plug in my existing architecture.

Thanks again for all your help.
Reply all
Reply to author
Forward
0 new messages