Akka Streams example


Jerry Tworek

Feb 20, 2016, 4:03:35 AM
to ReactiveMongo - http://reactivemongo.org
Hi,

The documentation mentions in several places that ReactiveMongo's streaming capabilities support Akka Streams. Do you know of any code examples or documentation on this subject? I wasn't able to find anything at http://reactivemongo.org/releases/0.11/documentation/tutorial/consume-streams.html or in this group.

Regards,
Jerry

Cédric Chantepie

Feb 20, 2016, 1:06:49 PM
Hi, you can have a look at https://github.com/cchantep/RM-AkkaStreams . Best regards.

Jerry Tworek

Feb 25, 2016, 5:38:32 AM
Hi Cedric,

Thank you for your example. I've played around with it and it works, but I have a few questions about the mechanism, as I don't fully understand how the ReactiveMongo and Akka Streams internals work when streaming MongoDB cursors:

1. In your code, you consume the stream using Cursor.foldWhile, and while folding you concatenate the single-element sources. That returns a Future[Source], since the cursor is folded asynchronously, and then you use Source.fromFuture and flatMapMerge to get a source of the requested elements. My interpretation may not be correct, but the way I see it, the future is resolved only after all documents have been retrieved from the cursor: the future yields a source with all elements already concatenated and held in memory. That may exhaust memory when the collection is big, and it does not cooperate with the backpressure signals of Akka Streams.

2. Do ReactiveMongo cursors support some kind of backpressure mechanism? The way I see foldWhile, it needs an answer on whether to fetch the next item or close the cursor as soon as the previous document arrives. That differs from the Akka Streams approach, where you are expected to push an element only after you have received a pull signal, i.e. a request for it. Do you know if implementing such a mechanism is possible in ReactiveMongo?
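
The concern in point 1 can be illustrated without ReactiveMongo or Akka. The following is a minimal sketch using only the Scala standard library; `batches`, `foldAll` and `run` are hypothetical stand-ins and not part of either library. It shows that when a fold delivers its result as a single Future, nothing can be handed downstream until every document has been received.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FoldThenEmit {
  // Hypothetical stand-in for a cursor: documents arrive in batches.
  val batches: List[List[Int]] = List(List(1, 2), List(3, 4), List(5))

  // Event log, so the order of "received" and "processed" can be inspected.
  val log = scala.collection.mutable.ArrayBuffer.empty[String]

  // Folds every batch into one in-memory collection. The Future completes
  // only after the last batch has been folded.
  def foldAll(): Future[Vector[Int]] = Future {
    batches.foldLeft(Vector.empty[Int]) { (acc, batch) =>
      batch.foreach(d => log += s"received $d")
      acc ++ batch
    }
  }

  // Downstream processing can start only once the whole result is available.
  def run(): Vector[String] = {
    log.clear()
    val all = Await.result(foldAll(), 5.seconds)
    all.foreach(d => log += s"processed $d")
    log.toVector
  }
}
```

Calling `FoldThenEmit.run()` yields all five "received" entries before the first "processed" entry, and the whole result set is held in memory at that point.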

Cédric Chantepie

Feb 25, 2016, 8:33:08 AM
foldWhile lets you consume documents as soon as a MongoDB response is available, even if not all the responses (all the result batches) have been received yet. That's how the new Iteratees module and the upcoming Akka Stream and RxScala modules work. Best regards.

Cédric Chantepie

Feb 25, 2016, 10:16:56 AM
To be more complete, Cursor.foldWhile "waits" until each response (a partition of the documents) has been consumed (in the async consumption thread) before sending the next GetMore request to MongoDB, so unconsumed data should not accumulate in a "buffer".
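
The behaviour described here (request a batch, let the consumer finish, then request the next one) can be sketched as follows. This is a simplified, single-threaded model using only the standard library; `FakeServer`, `getMore` and `drain` are hypothetical names, and the real driver does this asynchronously.

```scala
object BatchAtATime {
  // Hypothetical stand-in for the server side: each call models one GetMore.
  final class FakeServer(batches: List[List[Int]]) {
    private var remaining = batches
    var requests = 0

    def getMore(): Option[List[Int]] = remaining match {
      case head :: tail =>
        requests += 1
        remaining = tail
        Some(head)
      case Nil => None
    }
  }

  // Hands each batch to `consume` and requests the next batch only after
  // the consumer has returned. Returns the largest number of documents
  // that were held at any one time.
  def drain(server: FakeServer)(consume: Int => Unit): Int = {
    var maxHeld = 0
    var next = server.getMore()
    while (next.nonEmpty) {
      val batch = next.get
      maxHeld = math.max(maxHeld, batch.size)
      batch.foreach(consume)  // the consumer runs to completion first
      next = server.getMore() // only then is the next batch requested
    }
    maxHeld
  }
}
```

In this model at most one batch is held at a time, which is the bounded-buffer property being claimed. Whether it carries over to a stream built on top of the fold depends on what the consumer function does with each document.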

Jerry Tworek

Feb 26, 2016, 10:26:41 AM
Hi Cedric,

Thank you for your answers. Maybe I was a bit unclear with my questions, so I'll try to rephrase:

1. The foldWhile parameter suc is a function used to process documents coming from MongoDB. This function is synchronous. Because of that, the processing of entries cannot pause to wait for an asynchronous operation without blocking the thread, e.g. with Await.ready, which is doable but not nice. Waiting for the Akka Streams sink to finish processing the current batch is an example of such an asynchronous operation. And backpressure (making sure that the source doesn't generate elements faster than the sink can consume them) is an important mechanism in Akka Streams:


"The purpose is to offer an intuitive and safe way to formulate stream processing setups such that we can then execute them efficiently and with bounded resource usage—no more OutOfMemoryErrors. In order to achieve this our streams need to be able to limit the buffering that they employ, they need to be able to slow down producers if the consumers cannot keep up. This feature is called back-pressure and is at the core of the Reactive Streams initiative of which Akka is a founding member. "

2. foldWhile processes documents one by one as they arrive from MongoDB, but I think your implementation of cursor.source() doesn't have that property: it first waits for cursor.foldWhile to finish fetching all the documents, stores them in memory, and only then starts sending the documents downstream. That means the resource usage is not bounded. I've made a test for that:

I've added a print statement to this line:

(src, res) => { println(s"Received ${res.id}"); Cont(src.concat(Source single res).mapMaterializedValue(_ => akka.NotUsed)) },

I also passed the documents through a very simple stream:

source.map { x => println(s"Processing ${x.id}"); x}.to(Sink.ignore)

That also supports my theory: first all the "Received" lines are printed, and only afterwards do the "Processing" lines show up.
I've also managed to get a java.lang.StackOverflowError when trying to run this source on a large collection of documents.

Let me know if I've missed something.

Jerry

Cédric Chantepie

Feb 26, 2016, 11:44:45 AM

> 1. foldWhile parameter suc - is a function used to process documents coming from mongodb. This function is synchronous.

What do you mean?
(To me, a function is not synchronous or asynchronous in itself; what matters is how it is applied and what it does.)

Here, foldWhile returns immediately, as soon as the first request can be sent, possibly before suc is called for the documents of the first Response.

> Because of that, the processing of entries cannot stop to wait for some asynchronous operation without blocking the thread by using e.g. Await.ready - doable but not nice.
> Waiting for the akka-streams sink to finish processing current batch is an example of such asynchronous operation.

I don't see your point. Which asynchronous operation exactly are you talking about?

Either you want to completely process the entry passed to suc, in which case you do it synchronously and do not return the next Cursor.State until you are done,
or you "push" the entry to some asynchronous process (Future, Actor message, ...) and immediately return the next Cursor.State, even though the entry may not have been processed yet.
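
The two options can be sketched side by side. This is a minimal illustration using only the standard library; `Step`, `Cont`, `Done` and this `foldWhile` are hypothetical simplifications that loosely mirror the shape of the cursor API and are not the driver's actual code.

```scala
import scala.concurrent.{ExecutionContext, Future}

object ConsumeStyles {
  // Hypothetical mirror of the states a fold step can return.
  sealed trait Step[A]
  final case class Cont[A](value: A) extends Step[A]
  final case class Done[A](value: A) extends Step[A]

  // Minimal synchronous fold that stops as soon as a step returns Done.
  def foldWhile[A, T](docs: Iterable[T], zero: A)(suc: (A, T) => Step[A]): A = {
    var acc = zero
    var running = true
    val it = docs.iterator
    while (running && it.hasNext) {
      suc(acc, it.next()) match {
        case Cont(v) => acc = v
        case Done(v) => acc = v; running = false
      }
    }
    acc
  }

  // Option 1: the step does the work itself, so the fold cannot advance
  // to the next document until the step has returned.
  def processInline(docs: Seq[Int]): Int =
    foldWhile(docs, 0) { (sum, d) => Cont(sum + d) }

  // Option 2: the step hands the work to a Future and returns at once.
  // The returned list holds work that may still be pending.
  def handOff(docs: Seq[Int], gate: Future[Unit])(
      implicit ec: ExecutionContext): List[Future[Int]] =
    foldWhile(docs, List.empty[Future[Int]]) { (pending, d) =>
      Cont(gate.map(_ => d * 2) :: pending)
    }
}
```

With option 2 the fold can finish while none of the handed-off work has completed, so the amount of pending work is bounded only by the number of documents. That is the gap the backpressure question in this thread is about.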
 
> And backpressuring mechanism (making sure that source doesn't generate elements faster than sink can consume) is an important mechanism in akka streams.

The proposed Akka Stream extension does this by using foldWhile, which "waits" for the suc or err function (the "consumers") to return: either immediately, if they hand the work off asynchronously (in which case it doesn't really wait), or once they have had the time and resources to do the work.

The FailoverStrategy is used to manage retries and timeout.



> 2. foldWhile processes documents one by one as they're coming from mongodb, but I think your implementation of cursor.source() doesn't have that property - first, it waits for the cursor.foldWhile to finish getting all the documents and stores them in the memory and only then starts sending the documents downstream

That's not the case with foldWhile (see the previous explanation). If you see such behaviour with the Akka Stream extension (still experimental), a PR is welcome.

Best regards

Cédric Chantepie

Feb 26, 2016, 12:07:33 PM
If you mean there is an issue with Source.concat, the plan is for it to become a Source backed by an ActorPublisher.
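
A publisher-based source is expected to emit elements only in response to demand signalled by the subscriber. The sketch below models that pull-based idea with the standard library only; `PullSource`, `request` and `fetchBatch` are hypothetical names and do not correspond to the Akka or ReactiveMongo API.

```scala
object DemandDriven {
  // Hypothetical pull-based source: nothing is fetched until the consumer
  // signals demand by calling `request`.
  final class PullSource[T](fetchBatch: () => Option[List[T]]) {
    private var buffer: List[T] = Nil
    private var exhausted = false
    var fetches = 0

    // Returns at most `n` elements, fetching a new batch only when the
    // local buffer is empty and there is still unmet demand.
    def request(n: Int): List[T] = {
      val out = List.newBuilder[T]
      var needed = n
      while (needed > 0 && !(buffer.isEmpty && exhausted)) {
        if (buffer.isEmpty) {
          fetches += 1
          fetchBatch() match {
            case Some(batch) => buffer = batch
            case None        => exhausted = true
          }
        } else {
          out += buffer.head
          buffer = buffer.tail
          needed -= 1
        }
      }
      out.result()
    }
  }
}
```

Here the consumer's demand drives fetching: a request for one element triggers at most one batch fetch, and the source never holds more than one batch.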

Jerry Tworek

Feb 26, 2016, 12:10:38 PM


On Friday, February 26, 2016 at 5:44:45 PM UTC+1, Cédric Chantepie wrote:

> > 1. foldWhile parameter suc - is a function used to process documents coming from mongodb. This function is synchronous.

> What do you mean?
> (To me, a function is not synchronous or asynchronous in itself; what matters is how it is applied and what it does.)

What I mean is that a synchronous function has type A -> B and blocks the caller until it has finished, while an asynchronous one has type A -> Future[B] and doesn't block the caller.
 

> Here, foldWhile returns immediately, as soon as the first request can be sent, possibly before suc is called for the documents of the first Response.

> > Because of that, the processing of entries cannot stop to wait for some asynchronous operation without blocking the thread by using e.g. Await.ready - doable but not nice.
> > Waiting for the akka-streams sink to finish processing current batch is an example of such asynchronous operation.

> I don't see your point. Which asynchronous operation exactly are you talking about?
>
> Either you want to completely process the entry passed to suc, in which case you do it synchronously and do not return the next Cursor.State until you are done,
> or you "push" the entry to some asynchronous process (Future, Actor message, ...) and immediately return the next Cursor.State, even though the entry may not have been processed yet.

Yes, I think I'm struggling a bit with the nomenclature, but here is what I have in mind: I'd like to push the entry to e.g. an actor, just like you described above, then wait for the actor to acknowledge that it has finished processing (by sending some message back), and only afterwards return the Cursor.State from the suc function. I believe that is possible only by using a blocking operation like Await.ready.
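
Waiting for an acknowledgement without blocking a thread requires the fold step itself to return a Future. The following is a minimal sketch of that idea using only the standard library; `foldWhileAsync`, `Step`, `Cont`, `Done` and `consumer` are hypothetical names introduced for illustration, not ReactiveMongo's API.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AsyncFold {
  // Hypothetical mirror of the states a fold step can return.
  sealed trait Step[A]
  final case class Cont[A](value: A) extends Step[A]
  final case class Done[A](value: A) extends Step[A]

  // The next document is offered only after the Future returned for the
  // previous one has completed. No thread is blocked while waiting.
  def foldWhileAsync[A, T](docs: List[T], zero: A)(suc: (A, T) => Future[Step[A]])(
      implicit ec: ExecutionContext): Future[A] =
    docs match {
      case Nil => Future.successful(zero)
      case head :: tail =>
        suc(zero, head).flatMap {
          case Cont(v) => foldWhileAsync(tail, v)(suc)
          case Done(v) => Future.successful(v)
        }
    }

  def demo(): List[String] = {
    import ExecutionContext.Implicits.global
    val log = new ConcurrentLinkedQueue[String]()

    // Hypothetical consumer: the completion of the returned Future plays
    // the role of the actor's acknowledgement message.
    def consumer(d: Int): Future[Unit] = Future { log.add(s"processed $d"); () }

    val done = foldWhileAsync(List(1, 2, 3), 0) { (count, d) =>
      log.add(s"received $d")
      consumer(d).map(_ => Cont(count + 1): Step[Int])
    }

    // Await is used only at the edge of the demo, to collect the log.
    Await.result(done, 5.seconds)
    log.toArray().toList.map(_.toString)
  }
}
```

With this shape, "received" and "processed" entries alternate strictly, one document at a time, because the fold does not move on until the consumer's Future has completed.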
 
> > 2. foldWhile processes documents one by one as they're coming from mongodb, but I think your implementation of cursor.source() doesn't have that property - first, it waits for the cursor.foldWhile to finish getting all the documents and stores them in the memory and only then starts sending the documents downstream

> That's not the case with foldWhile (see the previous explanation). If you see such behaviour with the Akka Stream extension (still experimental), a PR is welcome.

Yes, that's exactly what I'm thinking. My questions above are just me trying to understand whether that's possible to accomplish given the interface offered by foldWhile. I'll now check whether play-iteratees offers a slightly more flexible interface.

Krzysiek Nowakowski

Jul 11, 2017, 5:27:11 AM

Hi Jerzy Tworek,
I am struggling with creating a source using foldWhile. I have exactly the same doubts as you: a source built from foldWhile won't emit elements before the whole MongoDB collection has been processed, so a large collection will cause problems.

Did you find a way to deal with this issue? I am currently writing a program that streams from aggregations and I am stuck. I have searched everywhere and have no idea what to do.

Kind regards,
Krzysiek Nowakowski

Cédric Chantepie

Jul 11, 2017, 6:58:45 AM
MongoDB returns results as batches. As soon as a batch is received, it's available in ReactiveMongo.