This is a copy of a post I put up on Stackoverflow.
I've been working on a project of mine using Akka to build a real-time processing system that consumes the Twitter stream (for now) and uses actors to process the incoming messages in various ways. I've been reading about similar architectures that others have built with Akka, and this blog post in particular caught my eye:
http://blog.goconspire.com/post/64901258135/akka-at-conspire-part-5-the-importance-of-pulling
Here they explain the issues that arise when pushing work (i.e. messages) to actors versus having the actors pull work. To paraphrase the article: with pushing, there is no built-in way to know which units of work were received by which worker, nor can that be reliably tracked. In addition, if a worker suddenly receives a large number of messages, each of which is quite large, it can be overwhelmed and the machine can run out of memory. If the processing is CPU-intensive, you can render the node unresponsive through CPU thrashing. And if the JVM crashes, you lose every message sitting in the actors' mailboxes.
Pulling messages largely eliminates these problems. Since each worker must pull work from a coordinator, the coordinator always knows which unit of work each worker has; if a worker dies, the coordinator knows which unit of work to re-process. Messages also don't sit in the workers' mailboxes (each worker pulls a single message and processes it before pulling another), so losing those mailboxes when an actor crashes isn't an issue. And since each worker only requests more work once it completes its current task, there's no risk of a worker receiving or starting more work than it can handle concurrently. Obviously this solution has issues of its own, such as what happens when the coordinator itself crashes, but for now let's treat that as a non-issue. More about this pulling pattern can be found on the "Let It Crash" website, which the blog references:
http://letitcrash.com/post/29044669086/balancing-workload-across-nodes-with-akka-2
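To make the coordinator's bookkeeping concrete, here's a minimal sketch of the pull pattern's tracking logic (plain Python rather than Scala, with no Akka, just so it's self-contained; names like Coordinator are made up for illustration):

```python
import collections

class Coordinator:
    """Hands out one unit of work per request and tracks who holds what."""
    def __init__(self, work):
        self.pending = collections.deque(work)
        self.in_flight = {}  # worker_id -> unit of work currently held

    def request_work(self, worker_id):
        # A worker asks for its next unit; None means nothing to do right now.
        if not self.pending:
            return None
        unit = self.pending.popleft()
        self.in_flight[worker_id] = unit
        return unit

    def work_done(self, worker_id):
        # Worker finished its unit; it may now request the next one.
        self.in_flight.pop(worker_id, None)

    def worker_died(self, worker_id):
        # Put the dead worker's unit back at the front for re-processing.
        unit = self.in_flight.pop(worker_id, None)
        if unit is not None:
            self.pending.appendleft(unit)

coord = Coordinator(["tweet-1", "tweet-2", "tweet-3"])
coord.request_work("a")   # -> "tweet-1"
coord.request_work("b")   # -> "tweet-2"
coord.worker_died("a")    # "tweet-1" is re-queued, not lost
coord.work_done("b")
coord.request_work("b")   # -> "tweet-1" again, handed to a live worker
```

The point is just that `in_flight` gives the coordinator exactly the knowledge the push model lacks: which unit each worker holds, and therefore what to re-dispatch on failure.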
This got me thinking about a possible alternative to the pulling pattern: keep pushing, but use durable mailboxes. For example, implement a mailbox backed by RabbitMQ (other data stores like Redis, MongoDB, or Kafka would also work) and have all the routees of a given router (all serving the same purpose) share a single message queue. In other words, each router would have its own queue in RabbitMQ serving as its mailbox. This way, if one routee goes down, those still up simply keep retrieving from RabbitMQ, with little worry that the queue will overflow, since they are no longer using typical in-memory mailboxes. And because the mailbox isn't in-memory, a crashing routee loses at most the single message it was processing. If the whole router goes down, you could expect RabbitMQ (or whichever data store you use) to absorb the increased load until the router recovers and resumes processing.
In terms of durable mailboxes, it seems that back in version 2.0, Akka was gravitating towards supporting these more actively, shipping implementations that worked with MongoDB, ZooKeeper, etc. However, for whatever reason the idea was abandoned at some point: the latest version (2.3.2 as of this writing) deprecates them. You're still able to implement your own mailbox via the MessageQueue interface, which gives you methods like enqueue(), dequeue(), etc., so making one that works with RabbitMQ, MongoDB, Redis, and the like shouldn't be a problem.
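As a rough sketch of the shape such a mailbox could take (plain Python rather than Akka's actual Scala MessageQueue interface; here a list stands in for the RabbitMQ queue, where a real implementation would publish/consume over AMQP):

```python
class DurableMailbox:
    """Mailbox shaped like Akka's MessageQueue (enqueue/dequeue/...)
    but holding its messages in an external store instead of process
    memory.  `store` is a stand-in for a RabbitMQ queue."""
    def __init__(self, store):
        self.store = store

    def enqueue(self, envelope):
        self.store.append(envelope)   # persisted before the actor sees it

    def dequeue(self):
        return self.store.pop(0) if self.store else None

    def number_of_messages(self):
        return len(self.store)

    def has_messages(self):
        return len(self.store) > 0

# The backing store outlives any one mailbox object, so a "crashed"
# routee's replacement picks up the remaining messages.
backing_queue = ["msg-1", "msg-2", "msg-3"]
mbox = DurableMailbox(backing_queue)
mbox.dequeue()                        # "msg-1" handed to a routee
del mbox                              # simulate the routee crashing
replacement = DurableMailbox(backing_queue)
# replacement sees "msg-2" and "msg-3"; only the in-progress message
# could ever be at risk, exactly as described above.
```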
Anyways, just wanted to get your guys' and gals' thoughts on this. Does this seem like a viable alternative to doing pulling?
I was considering almost the same thing, but talked myself out of it after reading this: http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/ (see the section "Large Queues"). RabbitMQ queues messages in memory regardless of whether they are durable; durability only dictates that they are ALSO written to disk. But if you expect the queue to become fairly large, performance can degrade considerably, because at some point it needs to page to disk. I'm looking at large bursts of messages, up to 20 million, so I felt that letting the queue grow that large probably wouldn't work for our use case. Other MQs might behave differently, and I never tested this with RabbitMQ, so it might still be a valid approach; I just wanted to minimize the risk, and I have the luxury of controlling how the messages are produced.
Our design is still a bit young, but we're going with a work-pulling pattern right now, which is something that has worked well for me in the past (this is my first Akka implementation, though). In terms of the overall approach of using a queue/datastore as a replacement for a mailbox (or implementing your own mailbox backed by a data store), it seems like a valid design to me if you need durable messaging (or just need messaging across heterogeneous systems). It obviously adds communication/serialization overhead to every message, but really any durable message is going to carry the same overhead, so I wouldn't worry about it being too heavyweight unless very high throughput is a real priority. In that case there are no great options if you also want durability, but that is where distributed data stores or queues can help: if you can scale out your datastore and consumers, you can get throughput even on durable messages.
I wouldn't be surprised if there were also some good options using Akka alone. On top of my large-queue concerns with Rabbit, I didn't particularly feel like adding another component into the mix if I didn't need to. I wouldn't be shocked if you could do something pretty clean with just akka persistence. I haven't looked into the new akka streams yet, but it provides some pretty nice-looking flow control you might be able to use.
I'd be interested to hear some Akka-oriented options from the crew out there.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
--
Martin Krasser
blog: http://krasserm.blogspot.com
code: http://github.com/krasserm
twitter: http://twitter.com/mrt1nz
I'm the author of the blog post from Conspire you referenced. In our case, losing the supervisor isn't a problem because all work is generated from a SQL database; if the supervisor crashes, we can just start over. Our worker nodes subscribe to cluster event notifications and will queue their messages to the supervisor if its node goes down, so we also don't lose any in-progress work. In addition, our workers are responsible for their own persistence; we only send instructions and status updates between nodes, so there's little concern about losing work in-flight.

When we tried to take our first implementation of this system into production (using Akka but with lots of actor+future combos), it crashed *horribly*. Nodes would get marked unreachable within a minute of starting the cluster, because worker actors immediately spun their work out to a future and started on the next item, saturating our puny EC2 nodes, which prevented Akka's internal cluster/remoting actors from ever getting any CPU time. In our refactor we focused on building a very defensive system that can lose any given node without breaking down. A key to this was pulling work rather than pushing it (plus ditching the futures except when dealing with IO).

Since that refactor, we've quadrupled the load on our Akka cluster and had zero issues.
We can trivially add and remove workers if we get a burst of load, and if a worker does crash, its work is re-dispatched to another worker from its last known "save point" (not really a term we use, but I don't want to get into the weeds on what's really a domain-specific concern).
I was also in the same mindset that you have, in terms of just wanting a pure Akka solution with a pulling pattern. The only issue I found there was the scenario where your master actors (the ones that hold the work the other actors pull from) go down. In that case you lose all of the data the master actors held, unless you have some sort of safeguard in place (not really sure what that might look like). Something like Persistence might work, except for the (probably rare but still plausible) case where the majority of your Akka system goes down, including the mechanisms responsible for Persistence. It also doesn't help when you're working with a data stream that never stops coming; there's no easy way to catch up when failures happen.

I've also been reading about Akka streams, and I checked out the Typesafe activator module with examples. It definitely looks very promising. Unfortunately it seems like it's something that will be released closer to the end of the year. If only it could get here sooner!
On Tuesday, May 6, 2014 2:06:16 AM UTC-4, Martin Krasser wrote:
You may be interested in this pull request that enables reading from akka-persistence journals via reactive-stream producers.
Yea, actually that looks much like what I had in mind.
I am in the same boat as Ryan mentions below: I'm enriching and processing data that already resides in a database, so if a failure occurs I can just reprocess from the start. That is generally the direction I was thinking with a pure Akka implementation based on persistence. If you had one or more (persistent) Processors just accumulating tweet events, then a View (or several) could subscribe to each Processor and emit the events to downstream workers for processing. In that case the View acts as the coordinator and the Processor is effectively the durable mailbox; if either goes down, you have the ability to recreate it and pick up roughly where you left off. You'd need to play with snapshots, replay, and recovery a bit to get proper flow control while reading the journal. Based on a quick read of Martin's PR above, I think that is where streams would be helpful (as a replacement for the View in my scenario). A PersistentChannel might be an easier option, now that I think about it: your workers can confirm when done, giving you automatic replay for anything missed if the coordinator or a worker dies.
I'm not sure, in your case, how you might protect yourself if the coordinator dies, although if that is likely, there should be some way to minimize the coordinator's job and state. For example, if the coordinator is responsible for a) pulling data from the Twitter stream, b) supervising workers to consume that data, and c) acting on the responses from workers and maintaining some state, then perhaps that is best done with three actors. I could see a) and c) possibly being done with actors behind a router to provide some fault tolerance (in which case "a" couldn't really be a Processor, but I think it could use a PersistentChannel).
In my case I had lots of niggling reasons not to introduce another architectural component - but I think some MQ/DB option is perfectly reasonable. On another note - Ryan, I have been reading your blog also... very helpful, thanks.
Please note that the primary use case for persistent channels is to deal with slow and/or temporarily unavailable consumers/destinations. It is not (yet) optimized for high throughput. In more detail: a persistent channel usually has a very high write rate (up to 100k msgs/sec, provided by a Processor it uses internally) but only a moderate message delivery rate to consumers. If you need a persistent queue with high message throughput, consider using a 3rd-party messaging product.
On Wednesday, May 7, 2014 12:57:23 AM UTC-4, Martin Krasser wrote:
Thanks, good to know. Just out of curiosity, would you also have any performance concerns with using a Processor/View combination acting in place of a 3rd party MQ?
We were initially debating alternatives to durable mailboxes - the OP was considering implementing a mailbox backed by RabbitMQ/Kafka/Mongo/etc. To me that sounded like a lot of work to get right, and the complexities probably overlap a good bit with akka persistence. I've since seen that akka persistence is the reason durable mailboxes were deprecated. So maybe my question should be phrased as follows: given a data pipeline built in Akka with a succession of actors consuming, enriching/processing, and emitting data:
a) If you need persistence at key stages, what factors would lend themselves to introducing a 3rd party product rather than rely on akka persistence? Maybe queue size, throughput, complex distributed pub/sub or routing requirements...
b) If a 3rd party product wasn't needed, what might be a recommended way to implement a durable work queue (with some sort of flow control)?
I sort of threw out that suggestion of a Processor simply receiving/persisting work events, with a View consuming the journal (with some controlled replay) and forwarding to workers. But I could see some potential issues there (latency of the view, managing flow control in the view; I'm also not sure it's a proper use of a Processor, which would do nothing other than receive events).
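To illustrate the offset-based recovery I have in mind, here's a toy model (Python rather than Scala, and not akka-persistence itself; Journal and View are made-up stand-ins for the journal and the View/coordinator):

```python
class Journal:
    """Append-only event log; stands in for an akka-persistence journal."""
    def __init__(self):
        self.events = []

    def append(self, event):
        self.events.append(event)

class View:
    """Consumes the journal from a saved offset, so a restarted view (or a
    replacement coordinator) resumes exactly where the old one stopped."""
    def __init__(self, journal, offset=0):
        self.journal = journal
        self.offset = offset         # would be snapshotted in a real system

    def poll(self, max_items=10):
        # Hand the next batch to downstream workers and advance the offset.
        batch = self.journal.events[self.offset:self.offset + max_items]
        self.offset += len(batch)
        return batch

journal = Journal()
for i in range(5):
    journal.append(f"event-{i}")

view = View(journal)
view.poll(max_items=3)               # ["event-0", "event-1", "event-2"]
saved_offset = view.offset           # persisted somewhere durable

# Coordinator "crashes"; a new view resumes from the saved offset and
# sees only the events the old one never delivered.
recovered = View(journal, offset=saved_offset)
```

The flow-control question I mentioned is visible here too: `max_items` is the knob that keeps the view from flooding the workers during replay.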
1. First, just to make sure I understand what you guys ended up doing: whenever your supervisor crashes, an event is sent to some sort of event bus to which your workers are subscribed, which lets them know their supervisor is down. When this happens, each worker that was pulling from the supervisor sends the message it was processing back to the supervisor (at least I think this is what you meant by "...will queue their messages to the supervisor") and then goes into some sort of waiting state until it hears back from the supervisor. Once the supervisor confirms to its workers that it's back up, processing continues as normal. Did I get this right?
2. From the code samples, what is the distinction between a Leader and a Node?
3. In your blog (the last section, which I linked above), you mention how the Leader actor both creates a queue and spawns the workers that pull from said queue. If the Leader were to go down, don't its children (a.k.a. the worker nodes) also go down with it? If so, how do you prevent message loss?
4. Is it still possible to do dynamic resizing of a worker router, given that a worker only pulls one message at a time and thus its mailbox never really fills up? From what I understand, resizing inspects an actor's mailbox to see how full it is, so I'm wondering how this would be handled.
a. Follow-up question: what if the supervisor takes a while to come back up and, as a result, the messages that the facades were storing in their temporary queues become irrelevant? Does the supervisor have a way to deal with "irrelevant" messages once it's brought back up, or can this situation never occur?
I should make it clear: neither the facade nor the work-pulling pattern is our creation; both came from the Akka docs or the letitcrash.com blog. We've tweaked them a bit as we've run into problems, but I don't want anyone to think I'm trying to take credit for those ideas.