Re: [akka-user] Re: How can I throttle asynchronous producer streaming from MongoDB

已查看 247 次
跳至第一个未读帖子

Björn Antonsson

未读,
2013年3月12日 06:20:162013/3/12
收件人 Akka User List
Hi Andrew,

On Monday, 11 March 2013 at 11:54, Andrew Easter wrote:
I've been thinking I could consider introducing an actor that handles the produced messages that would send a StopTailing message to the producer when a certain production rate is exceeded. The same actor could then schedule a StartTailing message to be sent after a defined duration.

Does that sound like a sensible pattern?

That is one way to do it. There is a very good blog post about how to throttle actor messages here http://letitcrash.com/post/28901663062/throttling-messages-in-akka-2 that might give you some more ideas.

As for the original question. I think that you need to implement your own Iteratee that applies the back pressure to the ReactiveMongo Enumerator based on how much work the throttler can handle. Exactly how to do this is unfortunately beyond my Play knowledge. Maybe someone on the play-framework mailing list has some more input on your question.

B/


On Monday, 11 March 2013 10:12:24 UTC, Andrew Easter wrote:
Hi,

A simplified view of my Akka use case involves a "producer" actor that is producing messages that are sent on to other parts of the my actor system. The producer is fetching data from MongoDB. Right now, the "consumer" actors are not able to keep up with the rate of the producer and thus I eventually face running out of heap space.

I've been reading up strategies with regard to throttling but none of them, in my mind, would work in my scenario. The issue is that my producer is getting it's data using ReactiveMongo - I'm creating a tailable cursor. When data is encountered, it is being delivered asynchronously via a callback (using Play! Iteratees):

cursor = Some(messages.find(new QueryBuilder(), QueryOpts().tailable.awaitData))
          cursor.get.enumerate.apply(Iteratee.foreach({
            doc =>
              val json = BSONConversions.fromBSON(doc)
              receiver ! Notification(json)
          }))

One strategy I've seen relates to consumers actually informing the producer that they are ready for work, but I can't see how I can do this with the streaming tailable cursor approach.

As I'm new to Akka, I'm still getting my head around the concurrency model. In a non-actor model, I guess you could consider throttling the thread handling the callback from the tailable cursor - however I'm not sure this makes sense in this Akka context.

Tools such as ReactiveMongo are purposely designed for non-blocking usage scenarios, which in turn makes them play well in an Akka environment. Thus, I feel that there must be (and needs to be) a good pattern for this type of scenario. Any ideas on how I might deal with this?

Thanks,

Andrew

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://akka.io/faq/
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user?hl=en.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

-- 
Björn Antonsson
Typesafe – The software stack for applications that scale
twitter: @bantonsson

Björn Antonsson

未读,
2013年3月13日 04:22:222013/3/13
收件人 akka...@googlegroups.com
Hi Andrew,

On Wednesday, 13 March 2013 at 00:06, Andrew Easter wrote:
Thanks for the insight.

Having done some more reading I'm thinking of following the pattern of turning the problem around so to speak and have consumers ask for work. Granted this probably means doing away with a tailable cursor, but I don't think that is necessarily a bad thing. In this way I can be sure work is only requested once a previous batch has been completed, thus preventing mailbox sizes ever exceeding a defined maximum. And it beats a simple polling approach where you might more likely see inefficient utilisation of the available CPU resources.

That sounds like a good solution. Happy hakking.

B/

Andrew
回复全部
回复作者
转发
0 个新帖子