Pulling Pattern vs Durable Mailboxes


massivedynamic

May 5, 2014, 3:13:26 AM
to akka...@googlegroups.com

This is a copy of a post I put up on Stackoverflow.

I've been working on a project of mine using Akka to create a real-time processing system which takes in the Twitter stream (for now) and uses actors to process the incoming tweets in various ways. I've been reading about similar architectures that others have built using Akka, and this particular blog post caught my eye:

http://blog.goconspire.com/post/64901258135/akka-at-conspire-part-5-the-importance-of-pulling

Here they explain different issues that arise when pushing work (i.e. messages) to actors vs. having the actors pull work. To paraphrase the article, by pushing messages there is no built-in way to know which units of work were received by which worker, nor can that be reliably tracked. In addition, if a worker suddenly receives a large number of messages where each message is quite large, it might end up overwhelmed and the machine could run out of memory. Or, if the processing is CPU intensive, you could render your node unresponsive due to CPU thrashing. Furthermore, if the JVM crashes, you lose all the messages sitting in the actors' mailboxes.

Pulling messages largely eliminates these problems. Since a specific actor must pull work from a coordinator, the coordinator always knows which unit of work each worker has; if a worker dies, the coordinator knows which unit of work to re-process. Messages also don't sit in the workers' mailboxes (each worker pulls a single message and processes it before pulling another), so the loss of those mailboxes if an actor crashes isn't an issue. Furthermore, since each worker will only request more work once it completes its current task, there are no concerns about a worker receiving or starting more work than it can handle concurrently. Obviously there are also issues with this solution, like what happens when the coordinator itself crashes, but for now let's assume this is a non-issue. More about this pulling pattern can also be found at the "Let It Crash" website which the blog references:

http://letitcrash.com/post/29044669086/balancing-workload-across-nodes-with-akka-2
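
To make the pattern concrete, here is roughly how I picture the pulling setup (a minimal sketch with made-up actor and message names; the posts above have the complete versions with supervision, clustering, etc.):

import akka.actor.{Actor, ActorRef}

case object GimmeWork
case class Work(job: String)

// The coordinator only hands a job to a worker that has explicitly asked for
// one, so it always knows which worker currently holds which unit of work.
class Coordinator extends Actor {
  var pending = Vector.empty[String]   // jobs waiting to be handed out
  var idle = Vector.empty[ActorRef]    // workers waiting for a job

  def receive = {
    case GimmeWork =>
      if (pending.nonEmpty) { sender() ! Work(pending.head); pending = pending.tail }
      else idle = idle :+ sender()
    case job: String =>
      if (idle.nonEmpty) { idle.head ! Work(job); idle = idle.tail }
      else pending = pending :+ job
  }
}

// A worker processes one job at a time and only asks for the next one when it
// is done, so its mailbox never piles up.
class Worker(coordinator: ActorRef) extends Actor {
  coordinator ! GimmeWork
  def receive = {
    case Work(job) =>
      // ... do the actual processing here ...
      coordinator ! GimmeWork
  }
}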

This got me thinking about a possible alternative to the pulling pattern, which is to keep pushing but with durable mailboxes. An example I was thinking of was implementing a mailbox that used RabbitMQ (other data stores like Redis, MongoDB, Kafka, etc. would also work here) and then having all the routees of a router (all of which would be used for the same purpose) share the same message queue (or the same DB/collection/etc., depending on the data store used). In other words, each router would have its own queue in RabbitMQ serving as a shared mailbox for its routees. This way, if one of the routees goes down, those that are still up can simply keep retrieving from RabbitMQ without too much worry that the queue will overflow, since they are no longer using typical in-memory mailboxes. Also, since the mailbox isn't held in memory, the most a routee could lose when it crashes is the single message it was processing at the time. If the whole router goes down, you could expect RabbitMQ (or whatever data store is being used) to absorb the increased load until the router is able to recover and start processing messages again.

In terms of durable mailboxes, it seems that back in version 2.0 Akka was gravitating towards supporting these more actively, since they had implemented a few that could work with MongoDB, ZooKeeper, etc. However, for whatever reason the idea was abandoned at some point, and the latest version (2.3.2 as of the writing of this post) deprecates them. You're still able to implement your own mailbox by implementing the MessageQueue interface, which gives you methods like enqueue(), dequeue(), etc., so making one that works with RabbitMQ, MongoDB, Redis, etc. doesn't seem like it would be a problem.
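
For what it's worth, the skeleton of such a mailbox against the 2.3 API would look roughly like the following. This is just a sketch: DurableQueueClient is a made-up placeholder for whatever RabbitMQ/Redis/Mongo client you'd actually use, and serialization of the Envelope payload is completely glossed over.

import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch.{Envelope, MailboxType, MessageQueue}
import com.typesafe.config.Config
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.concurrent.TrieMap

// Hypothetical client for whatever store you pick (RabbitMQ, Redis, Mongo, ...).
// A real implementation would have to (de)serialize the Envelope payload.
trait DurableQueueClient {
  def put(queue: String, envelope: Envelope): Unit
  def take(queue: String): Envelope   // null when the queue is empty
  def count(queue: String): Int
}

class DurableMessageQueue(queue: String, client: DurableQueueClient) extends MessageQueue {
  def enqueue(receiver: ActorRef, handle: Envelope): Unit = client.put(queue, handle)
  def dequeue(): Envelope = client.take(queue)      // contract: return null when empty
  def numberOfMessages: Int = client.count(queue)
  def hasMessages: Boolean = numberOfMessages > 0
  def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
    var env = dequeue()
    while (env ne null) { deadLetters.enqueue(owner, env); env = dequeue() }
  }
}

// The (settings, config) constructor is what Akka expects when the mailbox
// type is configured by class name.
class DurableMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType {
  def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = {
    val name = owner.map(_.path.name).getOrElse("deadLetters")
    // a real implementation would build the RabbitMQ/Redis/Mongo client from `config` here
    new DurableMessageQueue(name, new InMemoryClient)
  }
}

// Trivial in-memory stand-in so the sketch is self-contained; swap for a real store.
class InMemoryClient extends DurableQueueClient {
  private val queues = TrieMap.empty[String, ConcurrentLinkedQueue[Envelope]]
  private def q(name: String) = queues.getOrElseUpdate(name, new ConcurrentLinkedQueue[Envelope]())
  def put(queue: String, envelope: Envelope): Unit = q(queue).add(envelope)
  def take(queue: String): Envelope = q(queue).poll()
  def count(queue: String): Int = q(queue).size()
}

You'd then point the relevant mailbox configuration's mailbox-type setting at DurableMailboxType.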

Anyways, just wanted to get your guys' and gals' thoughts on this. Does this seem like a viable alternative to doing pulling?

Justin du coeur

May 5, 2014, 3:15:11 PM
to akka...@googlegroups.com
On Mon, May 5, 2014 at 3:13 AM, massivedynamic <lu4...@gmail.com> wrote:

Anyways, just wanted to get your guys' and gals' thoughts on this. Does this seem like a viable alternative to doing pulling?

Definitely not my area of expertise -- I haven't gotten a system under sufficient load yet to really grok all the issues -- but I would worry about efficiency differences.  My impression has been that all implementations of durable mailboxes are quite heavyweight, and can't support anywhere near the throughput of ordinary mailboxes.  (Whereas the work-pulling pattern doesn't seem to have anywhere near so much overhead.)  Folks who have worked with durable mailboxes, does this match your experience?

Matthew Howard

May 5, 2014, 4:06:40 PM
to akka...@googlegroups.com

This got me thinking about a possible alternative to doing this pulling pattern which is to do pushing but with durable mailboxes. An example I was thinking of was implementing a mailbox that used RabbitMQ (other data stores like Redis, MongoDB, Kafka, etc would also work here) and then having each router of actors (all of which would be used for the same purpose) share the same message queue (or the same DB/collection/etc...depending on the data store used). In other words each router would have its own queue in RabbitMQ serving as a mailbox. This way, if one of the routees goes down, those that are still up can simply keep retrieving from RabbitMQ without too much worry that the queue will overflow since they are no longer using typical in-memory mailboxes. Also since their mailbox isn't implemented in-memory, if a routee crashes, the most messages that it could lose would just be the single one it was processing before the crash. If the whole router goes down then you could expect RabbitMQ (or whatever data store is being used) to handle an increased load until the router is able to recover and start processing messages again.

I was considering almost the same thing, but talked myself out of it after reading this: (http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/ see section "Large Queues"). RabbitMQ queues messages in memory, regardless of whether they are durable. The durability only dictates that they are ALSO written to disk... but if you plan for the queue to become fairly large then performance can degrade considerably because at some point it needs to page to disk. I am looking at having large bursts of messages - up to 20 million, and so I felt like allowing the queue to grow that large would probably not work for our use-case. Other MQs might behave differently and I never tested this with RabbitMQ so that might still be a valid approach... I just wanted to minimize the risk and I have the luxury of having control over how the messages are produced. 

 

massivedynamic

May 5, 2014, 5:05:31 PM
to akka...@googlegroups.com
I was considering almost the same thing, but talked myself out of it after reading this: (http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/ see section "Large Queues"). RabbitMQ queues messages in memory, regardless of whether they are durable. The durability only dictates that they are ALSO written to disk... but if you plan for the queue to become fairly large then performance can degrade considerably because at some point it needs to page to disk. I am looking at having large bursts of messages - up to 20 million, and so I felt like allowing the queue to grow that large would probably not work for our use-case. Other MQs might behave differently and I never tested this with RabbitMQ so that might still be a valid approach... I just wanted to minimize the risk and I have the luxury of having control over how the messages are produced. 

Thanks for the insight! Yeah, you're right in that Rabbit's performance degrades the larger its queues become. In my case I don't plan on having the queues hold millions of messages, so it's definitely a use-case that I still have to explore. I was also thinking of using something like Kafka (instead of RabbitMQ), which utilizes sequential disk access, but I can't say I've looked into it in much depth. When you say you talked yourself out of it, what kind of design did you end up going with, if you don't mind me asking? Also, in terms of implementing mailboxes to act as wrappers around a data store's (i.e. RabbitMQ, Mongo, etc.) input/output, what's your take on how heavyweight it would make them?


Matthew Howard

May 6, 2014, 12:56:08 AM
to akka...@googlegroups.com
Our design is still a bit young, but we're going with a work pulling pattern right now... which is something that has worked well for me in the past (this is my first Akka impl though). In terms of the overall approach of using a queue/datastore effectively as a replacement for a mailbox (or implementing your own mailbox backed by a data store) it seems like a valid design to me if you need durable messaging (or just need messaging across heterogeneous systems). It obviously adds the communication/serialization overhead to every message, but really any durable message is going to have the same overhead, so in terms of it being too heavyweight I don't think I'd worry unless very high throughput was a real priority. In that case though there are no great options if you also want the durability... but that is where distributed data stores or queues can help - if you can scale out your datastore and consumers then you can get throughput even on durable messages. 

I wouldn't be surprised if there were also some good options using Akka alone. On top of my large queue concerns with Rabbit I didn't particularly feel like adding another component into the mix if I didn't need to. I wouldn't be shocked if you could do something pretty cleanly with just akka persistence. I haven't looked into the new akka streams but that provides some pretty nice looking flow control you might be able to use. I'd be interested to hear some Akka-oriented options from the crew out there. 

Martin Krasser

May 6, 2014, 2:06:16 AM
to akka...@googlegroups.com

On 06.05.14 06:56, Matthew Howard wrote:
Our design is still a bit young, but we're going with a work pulling pattern right now... which is something that has worked well for me in the past (this is my first Akka impl though). In terms of the overall approach of using a queue/datastore effectively as a replacement for a mailbox (or implementing your own mailbox backed by a data store) it seems like a valid design to me if you need durable messaging (or just need messaging across heterogeneous systems). It obviously adds the communication/serialization overhead to every message, but really any durable message is going to have the same overhead, so in terms of it being too heavyweight I don't think I'd worry unless very high throughput was a real priority. In that case though there are no great options if you also want the durability... but that is where distributed data stores or queues can help - if you can scale out your datastore and consumers then you can get throughput even on durable messages. 

I wouldn't be surprised if there were also some good options using Akka alone. On top of my large queue concerns with Rabbit I didn't particularly feel like adding another component into the mix if I didn't need to. I wouldn't be shocked if you could do something pretty cleanly with just akka persistence. I haven't looked into the new akka streams but that provides some pretty nice looking flow control you might be able to use.

You may be interested in this pull request that enables reading from akka-persistence journals via reactive-stream producers.

I'd be interested to hear some Akka-oriented options from the crew out there. 

-- 
Martin Krasser

blog:    http://krasserm.blogspot.com
code:    http://github.com/krasserm
twitter: http://twitter.com/mrt1nz

massivedynamic

May 6, 2014, 2:29:33 AM
to akka...@googlegroups.com
I wouldn't be surprised if there were also some good options using Akka alone. On top of my large queue concerns with Rabbit I didn't particularly feel like adding another component into the mix if I didn't need to. I wouldn't be shocked if you could do something pretty cleanly with just akka persistence. I haven't looked into the new akka streams but that provides some pretty nice looking flow control you might be able to use. I'd be interested to hear some Akka-oriented options from the crew out there. 

I was also in the same mindset that you have in terms of just wanting a pure Akka solution with a pulling pattern. The only issue that I found there was the scenario where your master actors (the ones that hold the work that other actors pull from) go down. In this case, you lose all of the data that the master actors held unless you have some sort of safeguard in place (not really sure what this might look like). Something like Persistence might work, except for a (probably rare but still plausible) case where the majority of your Akka system goes down, including the mechanisms responsible for Persistence. It also doesn't help when you're working with a data stream that never stops coming; there's no easy way to catch up when failures happen.

I've also been reading about Akka streams and I also checked out the Typesafe activator module that they have with examples. It definitely looks very promising. Unfortunately it seems like it's something that will be released closer to the end of the year. If only it could get here sooner!

Ryan Tanner

May 6, 2014, 11:38:19 AM
to akka...@googlegroups.com
I'm the author of the blog post from Conspire you referenced.  In our case, losing the supervisor isn't a problem because all work is generated from a SQL database.  If the supervisor crashes, we can just start over.  Our worker nodes subscribe to cluster event notifications and will queue their messages to the supervisor if its node goes down so we also don't lose any in-progress work.  In addition, our workers are responsible for their own persistence, we only send instructions and status updates between nodes so there's little concern about losing work in-flight.

When we tried to take our first implementation of this system into production (using Akka but with lots of actor+future combos), it crashed *horribly*.  Nodes would get marked unreachable within a minute of starting the cluster because worker actors immediately spun their work out to a future and started on the next item, saturating our puny EC2 nodes which prevented Akka's internal cluster/remoting actors from ever getting any CPU time.  In our refactor we focused on building a very defensive system which can lose any given node without breaking down—a key to this was pulling work rather than pushing it (plus ditching the futures except when dealing with IO).

Since that refactor, we've quadrupled the load on our Akka cluster and had zero issues.  We can trivially add and remove workers if we get a burst of load, and if a worker does crash, its work is re-dispatched to another worker from its last known "save point" (not really a term we use but I don't want to get into the weeds on what's really a domain-specific concern).

Patrik Nordwall

May 6, 2014, 11:47:06 AM
to akka...@googlegroups.com



On 6 May 2014, at 17:38, Ryan Tanner <ryan....@gmail.com> wrote:

I'm the author of the blog post from Conspire you referenced.  In our case, losing the supervisor isn't a problem because all work is generated from a SQL database.  If the supervisor crashes, we can just start over.  Our worker nodes subscribe to cluster event notifications and will queue their messages to the supervisor if its node goes down so we also don't lose any in-progress work.  In addition, our workers are responsible for their own persistence, we only send instructions and status updates between nodes so there's little concern about losing work in-flight.

When we tried to take our first implementation of this system into production (using Akka but with lots of actor+future combos), it crashed *horribly*.  Nodes would get marked unreachable within a minute of starting the cluster because worker actors immediately spun their work out to a future and started on the next item, saturating our puny EC2 nodes which prevented Akka's internal cluster/remoting actors from ever getting any CPU time.  In our refactor we focused on building a very defensive system which can lose any given node without breaking down—a key to this was pulling work rather than pushing it (plus ditching the futures except when dealing with IO).

Since that refactor, we've quadrupled the load on our Akka cluster and had zero issues.  

Glad to hear that!!!
/Patrik


We can trivially add and remove workers if we get a burst of load, and if a worker does crash, its work is re-dispatched to another worker from its last known "save point" (not really a term we use but I don't want to get into the weeds on what's really a domain-specific concern).

On Tuesday, May 6, 2014 12:29:33 AM UTC-6, massivedynamic wrote:
I wouldn't be surprised if there were also some good options using Akka alone. On top of my large queue concerns with Rabbit I didn't particularly feel like adding another component into the mix if I didn't need to. I wouldn't be shocked if you could do something pretty cleanly with just akka persistence. I haven't looked into the new akka streams but that provides some pretty nice looking flow control you might be able to use. I'd be interested to hear some Akka-oriented options from the crew out there. 

I was also in the same mindset that you have in terms of just wanting a pure Akka solution with a pulling pattern. The only issue that I found there was the scenario where your master actors (the ones that hold the work that other actors pull from) go down. In this case, you lose all of the data that the master actors held unless you have some sort of safeguard in place (not really sure what this might look like). Something like Persistence might work, except for a (probably rare but still plausible) case where the majority of your Akka system goes down, including the mechanisms responsible for Persistence. It also doesn't help when you're working with a data stream that never stops coming; there's no easy way to catch up when failures happen.

I've also been reading about Akka streams and I also checked out the Typesafe activator module that they have with examples. It definitely looks very promising. Unfortunately it seems like it's something that will be released closer to the end of the year. If only it could get here sooner!


Matthew Howard

May 6, 2014, 3:13:15 PM
to akka...@googlegroups.com, kras...@googlemail.com


On Tuesday, May 6, 2014 2:06:16 AM UTC-4, Martin Krasser wrote:
You may be interested in this pull request that enables reading from akka-persistence journals via reactive-stream producers.

Yea, actually that looks much like what I had in mind. 


On Tuesday, May 6, 2014 2:29:33 AM UTC-4, massivedynamic wrote:
I was also in the same mindset that you have in terms of just wanting a pure Akka solution with a pulling pattern. The only issue that I found there was the scenario where your master actors (the ones that hold the work that other actors pull from) go down. In this case, you lose all of the data that the master actors held unless you have some sort of safeguard in place (not really sure what this might look like).

I am in the same boat as Ryan mentions below - I'm enriching and processing data that already resides in a database, so if a failure occurs I can just reprocess from the start. That is generally the direction I was thinking with a pure Akka implementation based on persistence. If you had a (persistent) Processor(s) just accumulating Tweet events, then a View(s) could subscribe to that processor and emit the events to downstream workers for processing. In that case the View acts as the coordinator and the Processor is the durable mailbox effectively, if either goes down you have the ability to recreate it and effectively pick up where you left off. You'd need to play with the snapshots, replay and recovery a bit to get proper flow control while reading the journal... Based on a quick read of Martin's PR above I think that is where streams would be helpful (a replacement of the View in my scenario). A PersistentChannel might be an easier option now that I think about it... then your workers can confirm when done - providing you automatic replay for anything missed if coordinator/worker dies. 
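
Roughly what I'm picturing, as a quick 2.3 sketch from memory (names invented; snapshots, recovery tuning and flow control all omitted, so take it with a grain of salt):

import akka.actor.{Actor, ActorRef}
import akka.persistence.{Persistent, Processor, View}

// Durable "mailbox": every Persistent message is written to the journal
// before this actor sees it, so it survives a crash.
class TweetProcessor extends Processor {
  override def processorId = "tweet-stream"
  var received = 0L
  def receive = {
    case Persistent(tweet, seqNr) => received += 1   // already journaled at this point
  }
}

// Coordinator side: a View reads the same journal and hands the tweets to
// downstream workers (which could be the pulling workers discussed above).
class TweetView(workers: ActorRef) extends View {
  override def processorId = "tweet-stream"
  def receive = {
    case Persistent(tweet, seqNr) => workers ! tweet
  }
}

The Twitter-facing side would just send the processor its messages wrapped in Persistent(tweet).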

I'm not sure in your case how you might protect yourself if the coordinator dies, although if that is likely there should be some way to minimize the job and state of the coordinator and limit its role. So for example, if the coordinator is responsible for a) pulling data from the Twitter stream, and b) supervising workers to consume that data, and c) acting on the response from workers and maintaining some state... then perhaps that really is best done with 3 actors. I could see a) and c) possibly being done with actors behind a router to provide some fault-tolerance (in which case "a" couldn't really be a Processor, but I think it could use a PersistentChannel).
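
And the channel variant would look something like this, if I'm reading the 2.3 docs right (again just a sketch with invented names):

import akka.actor.{Actor, ActorRef}
import akka.persistence.{ConfirmablePersistent, Deliver, Persistent, PersistentChannel}

// Destination/worker: confirm each message once it has actually been processed,
// otherwise the channel will redeliver it.
class TweetWorker extends Actor {
  def receive = {
    case p @ ConfirmablePersistent(tweet, seqNr, redeliveries) =>
      // ... process tweet ...
      p.confirm()
  }
}

// Ingest side: everything sent through the channel is persisted before delivery.
class TweetIngest(worker: ActorRef) extends Actor {
  val channel = context.actorOf(PersistentChannel.props(), "tweet-channel")
  def receive = {
    case tweet: String => channel ! Deliver(Persistent(tweet), worker.path)
  }
}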

In my case I had lots of niggling reasons not to introduce another architectural component - but I think some MQ/DB option is perfectly reasonable. On another note - Ryan, I have been reading your blog also... very helpful, thanks. 

Martin Krasser

May 7, 2014, 12:57:23 AM
to akka...@googlegroups.com

On 06.05.14 21:13, Matthew Howard wrote:


On Tuesday, May 6, 2014 2:06:16 AM UTC-4, Martin Krasser wrote:
You may be interested in this pull request that enables reading from akka-persistence journals via reactive-stream producers.

Yea, actually that looks much like what I had in mind. 


On Tuesday, May 6, 2014 2:29:33 AM UTC-4, massivedynamic wrote:
I was also in the same mindset that you have in terms of just wanting a pure Akka solution with a pulling pattern. The only issue that I found there was the scenario where your master actors (the ones that hold the work that other actors pull from) go down. In this case, you lose all of the data that the master actors held unless you have some sort of safeguard in place (not really sure what this might look like).

I am in the same boat as Ryan mentions below - I'm enriching and processing data that already resides in a database, so if a failure occurs I can just reprocess from the start. That is generally the direction I was thinking with a pure Akka implementation based on persistence. If you had a (persistent) Processor(s) just accumulating Tweet events, then a View(s) could subscribe to that processor and emit the events to downstream workers for processing. In that case the View acts as the coordinator and the Processor is the durable mailbox effectively, if either goes down you have the ability to recreate it and effectively pick up where you left off. You'd need to play with the snapshots, replay and recovery a bit to get proper flow control while reading the journal... Based on a quick read of Martin's PR above I think that is where streams would be helpful (a replacement of the View in my scenario). A PersistentChannel might be an easier option now that I think about it... then your workers can confirm when done - providing you automatic replay for anything missed if coordinator/worker dies.

Please note that the primary use case for persistent channels is to deal with slow and/or only temporarily available consumers/destinations. It is not optimized for high throughput (yet). In more detail, a persistent channel usually has a very high write rate (up to 100k msgs/sec, provided by a Processor it uses internally) but only a moderate message delivery rate to consumers. If you need a persistent queue with a high message throughput, consider using a 3rd party messaging product.


I'm not sure in your case how you might protect yourself if the coordinator dies, although if that is likely there should be some way to minimize the job and state of the coordinator and limit its role. So for example, if the coordinator is responsible for a) pulling data from the Twitter stream, and b) supervising workers to consume that data, and c) acting on the response from workers and maintaining some state... then perhaps that really is best done with 3 actors. I could see a) and c) possibly being done with actors behind a router to provide some fault-tolerance (in which case "a" couldn't really be a Processor, but I think it could use a PersistentChannel).

In my case I had lots of niggling reasons not to introduce another architectural component - but I think some MQ/DB option is perfectly reasonable. On another note - Ryan, I have been reading your blog also... very helpful, thanks. 

Matthew Howard

May 7, 2014, 11:10:04 AM
to akka...@googlegroups.com, kras...@googlemail.com
On Wednesday, May 7, 2014 12:57:23 AM UTC-4, Martin Krasser wrote:

Please note that the primary use case for persistent channels is to deal with slow and/or only temporarily available consumers/destinations. It is not optimized for high throughput (yet). In more detail, a persistent channel usually has a very high write rate (up to 100k msgs/sec, provided by a Processor it uses internally) but only a moderate message delivery rate to consumers. If you need a persistent queue with a high message throughput, consider using a 3rd party messaging product.


Thanks, good to know. Just out of curiosity, would you also have any performance concerns with using a Processor/View combination acting in place of a 3rd party MQ? We were initially debating the alternatives to durable mailboxes - the OP was considering implementing a mailbox backed by rabbitmq/kafka/mongo/etc... To me that sounded like a lot of work to get right, and the complexities probably overlap a good bit with akka persistence. I've since seen that akka persistence is the reason for the deprecation of durable mailboxes. So maybe my question should be phrased as follows... given a data pipeline built in Akka with a succession of actors consuming, enriching/processing, and emitting data:

a) If you need persistence at key stages, what factors would lend themselves to introducing a 3rd party product rather than rely on akka persistence? Maybe queue size, throughput, complex distributed pub/sub or routing requirements... 

b) If a 3rd party product wasn't needed, what might be a recommended way to implement a durable work queue (with some sort of flow control)? I sort of threw out that suggestion of a Processor simply receiving/persisting work events, with a View consuming the journal (with some controlled replay) and forwarding to workers. But I could see some potential issues there (latency of the view, managing flow control in the view, not sure it is a proper use of a processor - does nothing really other than receive events). 

Matthew Howard

May 7, 2014, 11:50:20 AM
to akka...@googlegroups.com
Apologies for the double-post. The below thread re: throughput and performance benchmarks seems relevant... including for my future self:


Chris Toomey

May 7, 2014, 10:40:34 PM
to akka...@googlegroups.com
Great blog posts and great discussion here.

Ryan, your blog post has some kind of CSS problem that's causing the code snippets to render very poorly and practically illegibly for the bigger ones -- it's the same in Chrome, Firefox, and IE so it's not a browser issue.  Would be great if you could correct that to make the post fully readable again :-).

Chris

Ryan Tanner

May 7, 2014, 10:54:35 PM
to akka...@googlegroups.com
Woah, that is weird!  Thanks for the heads-up, I'll look into it.

massivedynamic

May 7, 2014, 11:44:46 PM
to akka...@googlegroups.com
I'm the author of the blog post from Conspire you referenced.  In our case, losing the supervisor isn't a problem because all work is generated from a SQL database.  If the supervisor crashes, we can just start over.  Our worker nodes subscribe to cluster event notifications and will queue their messages to the supervisor if its node goes down so we also don't lose any in-progress work.  In addition, our workers are responsible for their own persistence, we only send instructions and status updates between nodes so there's little concern about losing work in-flight.

Wow, awesome, the author of the blog post commenting on the thread! Thanks Ryan! Got a couple of questions about what you said as well as from what I read in your blog. 

1. First just to make sure I understood what you guys ended up doing: whenever your supervisor crashes, an event is sent to some sort of event bus to which your workers are subscribed, which lets them know that their supervisor is down. When this happens, each worker that was pulling from the supervisor will send the message they were processing back to the supervisor (at least I think this is what you meant when you said "...will queue their messages to the supervisor") and will then go into some sort of waiting state until they hear back from the supervisor. Once the supervisor confirms to its workers that it's back up, processing continues as normal. Did I get this right?

2. From the code samples, what is the distinction between a Leader and Node?

3. In your blog (on the last section which I linked above), you mention how the Leader actor both creates a queue and spawns workers that do the pulling from said queue. If the leader were to go down, don't its children (a.k.a. the worker nodes) also go down with it? If so, how do you prevent message loss?

4. Is it still possible to do dynamic re-sizing of a worker router since a worker is only pulling a message at a time and thus its mailbox never really fills up? From what I understand, re-sizing uses an actor's mailbox to see how full it is so I'm wondering how this would be handled.

I am in the same boat as Ryan mentions below - I'm enriching and processing data that already resides in a database, so if a failure occurs I can just reprocess from the start. That is generally the direction I was thinking with a pure Akka implementation based on persistence. If you had a (persistent) Processor(s) just accumulating Tweet events, then a View(s) could subscribe to that processor and emit the events to downstream workers for processing. In that case the View acts as the coordinator and the Processor is the durable mailbox effectively, if either goes down you have the ability to recreate it and effectively pick up where you left off. You'd need to play with the snapshots, replay and recovery a bit to get proper flow control while reading the journal... Based on a quick read of Martin's PR above I think that is where streams would be helpful (a replacement of the View in my scenario). A PersistentChannel might be an easier option now that I think about it... then your workers can confirm when done - providing you automatic replay for anything missed if coordinator/worker dies. 

That is an interesting idea. Will definitely need to check and see how I can use Persistence to my advantage.

Please note that the primary use case for persistent channels is to deal with slow and/or only temporarily available consumers/destinations. It is not optimized for high throughput (yet). In more detail, a persistent channel usually has a very high write rate (up to 100k msgs/sec, provided by a Processor it uses internally) but only a moderate message delivery rate to consumers. If you need a persistent queue with a high message throughput, consider using a 3rd party messaging product.

Yes, I think you're right in that regardless of what solution I come up with using Akka, I might still need to incorporate a 3rd party messaging store of some sort. Thanks for the tip!

Martin Krasser

May 8, 2014, 2:03:35 AM
to akka...@googlegroups.com

On 07.05.14 17:10, Matthew Howard wrote:
On Wednesday, May 7, 2014 12:57:23 AM UTC-4, Martin Krasser wrote:

Please note that the primary use case for persistent channels is to deal with slow and/or only temporarily available consumers/destinations. It is not optimized for high throughput (yet). In more detail, a persistent channel usually has a very high write rate (up to 100k msgs/sec, provided by a Processor it uses internally) but only a moderate message delivery rate to consumers. If you need a persistent queue with a high message throughput, consider using a 3rd party messaging product.


Thanks, good to know. Just out of curiosity, would you also have any performance concerns with using a Processor/View combination acting in place of a 3rd party MQ?

A few performance numbers first. A simple performance test on my laptop (2013 MBP with SSD) gives:

- 120k msgs/sec write throughput for a Processor with LevelDB journal (with fsync=true)
- 70k msgs/sec write throughput for a Processor with Cassandra journal (single node Cassandra)
- 20k msgs/sec throughput of a PersistentChannel with LevelDB journal (with fsync=true)

An optimized implementation of a PersistentChannel should actually give a throughput comparable to the write throughput of a Processor. There are several reasons for the decreased throughput in the current implementation, one is interleaving read and write ops on the same journal (actor). I didn't measure the read throughput of a View yet, but if it is deployed on another node than its corresponding processor (and assuming a sufficiently high message replication by the journal) I'd expect a significantly higher throughput of distributed processor/view combinations. But here is also enough room for optimization. Currently, views are pull-based. Later optimizations could additionally support push-based delivery of messages to views, making the write throughput of processors the only bottleneck.

I hope that gives an idea what is possible with the current implementation (using LevelDB or Cassandra journal, didn't measure with others) and what one can expect from future optimizations. At the moment, akka-persistence is optimized for write-throughput, as reads are only made during processor recovery (except for PersistentChannel). Furthermore, it is optimized for cases where the whole message history is kept, without frequent deletions, such as when a message is delivered (by a PersistentChannel) or at regular intervals (by an application).


We were initially debating the alternatives to durable mailboxes - the OP was considering implementing a mailbox backed by rabbitmq/kafka/mongo/etc... To me that sounded like a lot of work to get right, and the complexities probably overlap a good bit with akka persistence. I've since seen that akka persistence is the reason for the deprecation of durable mailboxes. So maybe my question should be phrased as follows... given a data pipeline built in Akka with a succession of actors consuming, enriching/processing, and emitting data:

a) If you need persistence at key stages, what factors would lend themselves to introducing a 3rd party product rather than rely on akka persistence? Maybe queue size, throughput, complex distributed pub/sub or routing requirements...

If you need durability only for reliable delivery of messages (i.e. if you can throw away these messages from storage once delivered and processed), use a 3rd party messaging product. The main use case for akka-persistence is making stateful actors durable (at very high transaction rates).



b) If a 3rd party product wasn't needed, what might be a recommended way to implement a durable work queue (with some sort of flow control)?

See above.

Hope that helps.

Cheers,
Martin


I sort of threw out that suggestion of a Processor simply receiving/persisting work events, with a View consuming the journal (with some controlled replay) and forwarding to workers. But I could see some potential issues there (latency of the view, managing flow control in the view, not sure it is a proper use of a processor - does nothing really other than receive events). 


Matthew Howard

May 8, 2014, 11:05:51 PM
to akka...@googlegroups.com
Very helpful, thanks Martin.

Ryan Tanner

May 9, 2014, 2:03:17 PM
to akka...@googlegroups.com
Hi Luis,

1. First just to make sure I understood what you guys ended up doing: whenever your supervisor crashes, an event is sent to some sort of event bus to which your workers are subscribed, which lets them know that their supervisor is down. When this happens, each worker that was pulling from the supervisor will send the message they were processing back to the supervisor (at least I think this is what you meant when you said "...will queue their messages to the supervisor") and will then go into some sort of waiting state until they hear back from the supervisor. Once the supervisor confirms to its workers that it's back up, processing continues as normal. Did I get this right?

You've got it right.  The event bus in this case is just Akka's built-in cluster member event notifications.

Somewhere in Akka-land (I forget where we borrowed the concept, it might be in the docs somewhere), is the idea of using a Facade between nodes in a cluster.  The facade subscribes to cluster event notifications and uses that to keep track of where our supervisor is at any given time.  By convention, we have one node whose role is "supervisor".  The facade keeps track of all nodes and simply considers the oldest node with that role to be the supervisor (there should only be one at a time though).  If that node goes down and the facade receives the notification for that, it will drop that node from its internal register (really just a SortedSet[Member]) and then queue any messages to the supervisor internally.  Once another node joins the cluster with the "supervisor" role, each Facade will receive that event and flush the queue, sending those messages on to the new supervisor.

Every node has a Facade running on it and all cross-cluster communication is routed through this actor.

This isn't terribly robust—obviously there's a delay between a node actually going down and the facade getting that notification, plus we aren't persisting that queue, it's just a member variable of the Facade actor.  I'll get into why that doesn't bother us in a bit.
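
A stripped-down version of the facade looks something like this (simplified and from memory, not our actual production code):

import akka.actor.{Actor, ActorSelection, RootActorPath}
import akka.cluster.{Cluster, Member, MemberStatus}
import akka.cluster.ClusterEvent._
import scala.collection.immutable.SortedSet

class Facade extends Actor {
  val cluster = Cluster(context.system)
  var supervisors = SortedSet.empty[Member](Member.ageOrdering)  // oldest first
  var queued = Vector.empty[Any]       // held while no supervisor node is up

  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent])
  override def postStop(): Unit = cluster.unsubscribe(self)

  def supervisorRef: Option[ActorSelection] =
    supervisors.headOption.map(m =>
      context.actorSelection(RootActorPath(m.address) / "user" / "supervisor"))

  def receive = {
    case state: CurrentClusterState =>
      supervisors = SortedSet.empty[Member](Member.ageOrdering) ++
        state.members.filter(m => m.status == MemberStatus.Up && m.hasRole("supervisor"))
    case MemberUp(m) if m.hasRole("supervisor") =>
      supervisors += m
      supervisorRef.foreach { ref => queued.foreach(ref ! _); queued = Vector.empty }
    case MemberRemoved(m, _) if m.hasRole("supervisor") =>
      supervisors -= m
    case _: MemberEvent => // other membership changes don't concern us
    case msg =>            // everything else is bound for the supervisor
      supervisorRef match {
        case Some(ref) => ref ! msg
        case None      => queued = queued :+ msg
      }
  }
}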

2. From the code samples, what is the distinction between a Leader and Node?

Leaders run on the supervisor, Nodes run on, well, the nodes.  So on the supervisor, the SupervisorActor creates an AnalyticsLeader locally as its child.  When the leader starts up, it creates a cluster-aware router for its respective Node type which runs on all nodes with that role.  So the SupervisorActor creates an AnalyticsLeader.  The AnalyticsLeader in turn creates a router of AnalyticsNodes based on our config settings, with AnalyticsNodes running on all cluster members with the "analytics" role.

Nodes only do one piece of work at a time.  They just "context.become" between idle and working states, delegating work off to a Processor (e.g., AnalyticsProcessor) which actually handles whatever business logic this role requires.  Most of our processors are big, hairy FSMs at this point—that's where everything happens.  So if we want a node to concurrently handle analytics processing, we tweak the cluster-aware routing config to allow more instances per node.

Leaders and Nodes are effectively the same code across role types, AnalyticsNode is literally just:

class AnalyticsNode extends Node[AnalyticsProcessor, AnalyticsMessage]("analytics") 

Node is generic on its processor type and the super-type of messages (e.g., StartAnalytics extends AnalyticsMessage with whatever instructions are needed for that unit of work).  The string passed to Node's constructor just designates the cluster role.
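
Ignoring the generics, the idle/working dance in a Node boils down to something like this (message names invented for the example; the real ones are role-specific as described above):

import akka.actor.{Actor, ActorRef, Props}

case class StartWork(job: Any)      // stand-in for e.g. StartAnalytics
case class WorkDone(result: Any)    // hypothetical completion message from the Processor

// One unit of work at a time: flip between idle and working with context.become,
// delegating the actual business logic to a Processor child.
class WorkerNode(processorProps: Props) extends Actor {
  def receive = idle

  def idle: Receive = {
    case StartWork(job) =>
      val processor = context.actorOf(processorProps)
      processor ! job
      context.become(working(sender()))
  }

  def working(leader: ActorRef): Receive = {
    case WorkDone(result) =>
      leader ! result                 // report back to the Leader (ack/complete)
      context.become(idle)
  }
}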

3. In your blog (on the last section which I linked above), you mention how the Leader actor both creates a queue and spawns workers that do the pulling from said queue. If the leader were to go down, don't its children (a.k.a. the worker nodes) also go down with it? If so, how do you prevent message loss?

Messages to and from leaders are subject to the same sort of queuing via the Facade actors as messages to and from the supervisor.  There's still the potential for some message loss but that's not a big deal for us. All our work is generated based on a SQL database.  For instance, "user 42 hasn't been processed since last Tuesday, process now" or "user 54 added a new account, re-process".  We just poll our database every 30 seconds or so.  Incredibly basic but it works just fine.  

We have an actor called a Pipeline which is responsible for determining what work needs to be done when.  It's also a child of the supervisor.  When it finds a piece of work to do, it spawns a PipelineWorker for that piece of work which marshals it through all the stages of our pipeline.  We use the FSM helper for this.  The pipeline also won't dispatch new work if it knows that more than some threshold of work is already in progress, just based on the number of PipelineWorkers in play.  This helps keep load under control.

When a sub-task is dispatched out into the cluster (e.g., a StartAnalytics message), the PipelineWorker moves into a state where the next expected message is AcknowledgeAnalytics, which means that the AnalyticsLeader found an idle AnalyticsNode which has accepted and begun whatever work is designated by that StartAnalytics message.  We use the state timeout feature here so that if the acknowledgement doesn't arrive within a certain amount of time, an error is raised and the PipelineWorker tries again.  Our timeouts here are pretty lenient, something like 30 minutes.
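
In sketch form (nothing like our real code, and the message/state names here are invented), that acknowledgement step with the FSM state timeout looks something like:

import akka.actor.{ActorRef, FSM}
import scala.concurrent.duration._

case class StartAnalytics(workId: String)
case class AcknowledgeAnalytics(workId: String)
case class AnalyticsComplete(workId: String)

sealed trait PipelineState
case object AwaitingAck extends PipelineState
case object Working extends PipelineState

class PipelineWorker(leader: ActorRef, workId: String) extends FSM[PipelineState, Int] {

  startWith(AwaitingAck, 0)           // state data = number of retries so far
  leader ! StartAnalytics(workId)

  // Lenient timeout: if no node acknowledged the work in time, dispatch it again.
  when(AwaitingAck, stateTimeout = 30.minutes) {
    case Event(AcknowledgeAnalytics(`workId`), _) => goto(Working)
    case Event(StateTimeout, retries) =>
      leader ! StartAnalytics(workId)
      stay using (retries + 1)
  }

  when(Working) {
    case Event(AnalyticsComplete(`workId`), _) => stop()
  }

  initialize()
}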

4. Is it still possible to do dynamic re-sizing of a worker router since a worker is only pulling a message at a time and thus its mailbox never really fills up? From what I understand, re-sizing uses an actor's mailbox to see how full it is so I'm wondering how this would be handled.

We don't currently resize the number of workers dynamically.  If we need to add capacity, we generally just add nodes. 

Luis Medina

May 10, 2014, 4:02:24 AM
to akka...@googlegroups.com
Hi Ryan,

Thank you so much for the detailed explanation. It really helped clarify some things. Just got a couple more questions:

1. This is more of a clarification about facades. So as you mentioned above, facades subscribe themselves to cluster event notifications and when they receive (I'm guessing) a "Terminated" notification from the supervisor going down, then each facade in a node acts as a temporary queue which receives messages that that node would have sent to the supervisor if it wasn't down. Once the supervisor comes back up, an event is sent to all of the facades telling them that they can now pass the messages in their queue to the supervisor.
a. Follow-up question: What if the supervisor takes a while to come back up and as a result, the messages that the facades were storing in their temporary queues become irrelevant? Does the supervisor have a way to deal with "irrelevant" messages once it's brought back up or can this situation never occur?

2. Did you guys consider using facades (or some other technique) to promote a different node to be a supervisor in the case where the original one goes down instead of having to wait for a new supervisor to join the cluster? If so, what was your experience with that?

3. Any ideas of how you could incorporate dynamic re-sizing in your cluster-aware routers so that you don't have to be monitoring (or have some 3rd party tool monitoring) the system to know when to add/remove nodes, or is this a limitation of the pulling pattern?

Ryan Tanner

May 10, 2014, 12:34:04 PM
to akka...@googlegroups.com
Glad to help!

Your understanding of (1) is mostly correct, but the facades don't subscribe to DeathWatch on the supervisor actors, just cluster event notifications.

1a: All work messages (start, acknowledge, complete, error) are tagged with a UUID for that chunk of work and that's used to route the message from the supervisor to the correct pipeline worker.  It's not uncommon to receive duplicate acknowledgements if a worker node crashes or there's a network hiccup that causes it to get kicked out of the cluster.  In that event, the leader for that node type (e.g., AnalyticsLeader) will receive a DeathWatch notification for that instance of AnalyticsNode and re-queue its work.  Once another worker becomes available, a duplicate acknowledgement will make its way to the appropriate pipeline worker, which just logs a warning and moves on.

AWS EC2 is a noisy, noisy place and this happens fairly regularly so we've built our system to deal with it.  We also use Ubuntu's upstart to monitor the JVM process on each box and restart it (on a different port) if it crashes.  In addition, if a node gets kicked out of the cluster and realizes it's the only member for a certain amount of time, it kills itself, allowing upstart to start a new instance which can join the cluster.

2. Our tasks are incredibly CPU bound and tend to fully saturate whatever CPU time is available.  As a result, we don't want the supervisor to be on the same box since we've had issues with our tasks interfering with Akka's internal remoting and cluster actors, leading to a very unstable cluster.  Even with those actors on their own dispatcher, it's been a problem.  EC2, in our experience, does a *terrible* job of context switching (we've measured this) so separate dispatchers aren't a silver bullet for this problem.  Note that we could probably solve it if we moved to larger boxes, in fact we recently switched from m1.large to m3.large and it seems to have improved but I haven't measured the new boxes' context switching performance.

In other words, we don't want our worker actors to crowd out the supervisor or cause the supervisor's node to die repeatedly.

3. We're happy with our setup.  The supervisor raises PagerDuty alerts if the cluster can't repair itself and we've got New Relic server monitoring (but not app monitoring) on every box which alerts us if we start running out of resources.  Plus, adding more worker actors on the same number of nodes doesn't improve overall throughput for us because, as I mentioned earlier, we're incredibly CPU bound (well, that's not completely true.  Some of our tasks are IO-bound but all involve serializing lots of data into JSON—a decision I hope we don't come to regret).

Adding nodes is so simple for us that it hasn't been something to worry about.  In fact, since we first went live last fall, we've barely touched any of our cluster functionality, it's just been running.  There have been a few hiccups but nothing that's caused us to rethink anything.  The rethinking all happened after our *first* attempt to go live when everything crashed and burned and I didn't sleep for three days.

We're also incredibly lucky with our use case: none of this is user-facing!  If it takes an extra few minutes for a user to get processed, they'll never know the difference.  We process users once a week (mostly) so if it takes an extra hour or so for a user to get processed, no big deal.  If it ever becomes an issue, we'll probably try to hook into AWS's auto-scaling API so that the cluster can add and remove nodes on its own without our involvement.  For the moment, it's not necessary.  

We use Chef to provision our nodes and though it's a pain to set up for a complicated system, that plus Vagrant lets us provision and add a new node to the cluster in about five minutes and most of that time is just waiting around for EC2 to provision the box and Chef to copy files around.  All we have to do is edit the Vagrantfile to add the new node and its roles and then our tooling does all the work for us.  Since our cluster is very resilient to failure at this point, we can safely remove a node by just terminating it immediately in the EC2 console and reverting our Vagrantfile.

Ryan Tanner

May 10, 2014, 3:57:27 PM
to akka...@googlegroups.com
I should make it clear: neither the facade nor the work-pulling pattern is our creation—both came from the Akka docs or the letitcrash.com blog.  We've tweaked them a bit as we've run into problems, but I don't want anyone to think I'm trying to take credit for those ideas.

Roland Kuhn

May 10, 2014, 4:08:23 PM
to akka-user
Hi Ryan,

On 10 May 2014, at 21:57, Ryan Tanner <ryan....@gmail.com> wrote:

I should make it clear: neither the facade nor the work-pulling pattern is our creation—both came from the Akka docs or the letitcrash.com blog.  We've tweaked them a bit as we've run into problems, but I don't want anyone to think I'm trying to take credit for those ideas.

That is completely fine: you can still take credit for spending a lot of time writing up these patterns and explanations and helping others this way :-) Thanks a lot for that!

Regards,

Roland

PS: I just love this community!




Dr. Roland Kuhn
Akka Tech Lead
Typesafe – Reactive apps on the JVM.
twitter: @rolandkuhn


Luis Medina

May 10, 2014, 4:17:31 PM
to akka...@googlegroups.com
Thank you again Ryan for answering my questions and thank you to everyone else who joined in the conversation. I think the discussion here has made me re-consider using the pulling pattern since it fits my use case pretty well. And yes, I agree Roland, this community is awesome!