Hi Cedric,
Thank you for your answers. Maybe I was a bit unclear with my questions, so I'll try to rephrase:
1. The foldWhile parameter suc is the function used to process documents coming from MongoDB, and it is synchronous. Because of that, processing cannot pause to wait for an asynchronous operation without blocking the thread, e.g. with Await.ready, which is doable but not nice. Waiting for the akka-streams sink to finish processing the current batch is one example of such an asynchronous operation. And back-pressure (making sure that the source doesn't emit elements faster than the sink can consume them) is a core mechanism in Akka Streams, as its documentation puts it:
"The purpose is to offer an intuitive and safe way to formulate stream processing setups such that we can then execute them efficiently and with bounded resource usage—no more OutOfMemoryErrors. In order to achieve this our streams need to be able to limit the buffering that they employ, they need to be able to slow down producers if the consumers cannot keep up. This feature is called back-pressure and is at the core of the Reactive Streams initiative of which Akka is a founding member."
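To make point 1 concrete, here is a minimal stdlib-only sketch. Doc, consume and syncFold are hypothetical names: Doc stands in for a MongoDB document, consume simulates a slow asynchronous sink, and syncFold only mimics the shape of foldWhile's synchronous suc callback (without Cont/Done); none of them is the ReactiveMongo or Akka API. With a synchronous callback, the only way to respect the consumer's pace is to block (blockingFold), whereas pulling the next document only after the previous one has been consumed (pullFold) gives back-pressure without tying up a thread.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BlockingVsPull {
  // Hypothetical stand-in for a MongoDB document.
  final case class Doc(id: Int)

  implicit val ec: ExecutionContext = ExecutionContext.global

  // Mimics the shape of a synchronous fold callback such as foldWhile's suc
  // (without Cont/Done): the callback must return its result immediately.
  def syncFold[A](docs: Iterator[Doc])(z: A)(suc: (A, Doc) => A): A =
    docs.foldLeft(z)(suc)

  // Simulated slow asynchronous consumer, standing in for an Akka Streams sink.
  def consume(d: Doc): Future[Int] = Future { Thread.sleep(1); d.id }

  // Option 1: block the calling thread until each element has been consumed.
  // This respects the consumer's pace, but ties up a thread while waiting.
  def blockingFold(docs: Iterator[Doc]): List[Int] =
    syncFold(docs)(List.empty[Int]) { (acc, d) =>
      Await.result(consume(d), 5.seconds) :: acc
    }.reverse

  // Option 2: pull the next document only after the previous one has been
  // consumed. No thread blocks, and at most one document is in flight.
  def pullFold(docs: Iterator[Doc]): Future[List[Int]] = {
    def loop(acc: List[Int]): Future[List[Int]] =
      if (!docs.hasNext) Future.successful(acc.reverse)
      else consume(docs.next()).flatMap(id => loop(id :: acc))
    loop(Nil)
  }

  def main(args: Array[String]): Unit = {
    val docs = (1 to 5).map(i => Doc(i))
    println(blockingFold(docs.iterator))
    println(Await.result(pullFold(docs.iterator), 5.seconds))
  }
}
```

Both variants process documents in order; the difference is only in who waits: in the first a thread is parked on every element, in the second the consumer's completion drives the next pull.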
2. foldWhile processes documents one by one as they arrive from MongoDB, but I think your implementation of cursor.source() doesn't have that property: it first waits for cursor.foldWhile to fetch all the documents, keeping them all in memory, and only then starts sending them downstream. That means resource usage is not bounded. I've written a test for that.
I added a print statement to this line:
    (src, res) => {
      println(s"Received ${res.id}")
      Cont(src.concat(Source.single(res)).mapMaterializedValue(_ => akka.NotUsed))
    },
I also ran the documents through a very simple stream:
    source.map { x => println(s"Processing ${x.id}"); x }.to(Sink.ignore).run()
That supports my theory: all the "Received" lines are printed first, and only afterwards do the "Processing" lines show up.
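The ordering I observed can be reproduced with a small stdlib-only model. Doc, eager, lazily, run and peakBuffered are hypothetical names, and plain Iterators stand in for Akka Sources, so this says nothing about Akka internals. Folding the whole cursor before emitting logs every "Received" line before the first "Processing" line and holds all n documents at once, while a pull-based source interleaves the two and never holds more than one.

```scala
import scala.collection.mutable.ListBuffer

object EagerVsLazy {
  // Hypothetical stand-in for a MongoDB document.
  final case class Doc(id: Int)

  // Eager: fold the whole cursor into memory, then emit (the behaviour described above).
  def eager(docs: Iterator[Doc], log: ListBuffer[String]): Iterator[Doc] = {
    val all = docs.foldLeft(Vector.empty[Doc]) { (acc, d) =>
      log += s"Received ${d.id}"
      acc :+ d
    }
    all.iterator
  }

  // Lazy: each document is fetched only when the consumer asks for the next one.
  def lazily(docs: Iterator[Doc], log: ListBuffer[String]): Iterator[Doc] =
    docs.map { d => log += s"Received ${d.id}"; d }

  // Builds a source over n documents, drains it and returns the event log.
  def run(build: (Iterator[Doc], ListBuffer[String]) => Iterator[Doc], n: Int): List[String] = {
    val log = ListBuffer.empty[String]
    build(Iterator.range(0, n).map(i => Doc(i)), log).foreach(d => log += s"Processing ${d.id}")
    log.toList
  }

  // Largest number of documents that were received but not yet processed.
  def peakBuffered(log: List[String]): Int =
    log.scanLeft(0)((held, line) => if (line.startsWith("Received")) held + 1 else held - 1).max

  def main(args: Array[String]): Unit = {
    println(run(eager, 3))
    println(run(lazily, 3))
  }
}
```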
I also got a java.lang.StackOverflowError when running this source over a large collection of documents.
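I haven't dug into Akka's internals, so this is only a guess at where the StackOverflowError comes from: each src.concat(Source.single(res)) nests the previous source one level deeper, so after n documents the source is n levels deep. The stdlib-only model below uses hypothetical Src, Single and Concat types (not Akka classes) to show how a recursive walk over such a left-nested structure overflows the JVM stack, while the same walk with an explicit heap-allocated stack does not.

```scala
import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer

object NestedConcat {
  // Hypothetical model of a source built by repeated concatenation; not Akka types.
  sealed trait Src[+A]
  case object Empty extends Src[Nothing]
  final case class Single[A](a: A) extends Src[A]
  final case class Concat[A](left: Src[A], right: Src[A]) extends Src[A]

  // Same shape as folding with src.concat(Source.single(res)): each document
  // wraps everything received so far one level deeper on the left.
  def build(n: Int): Src[Int] =
    (0 until n).foldLeft(Empty: Src[Int])((src, i) => Concat(src, Single(i)))

  // Depth of the left spine, computed with a tail-recursive loop.
  @tailrec
  def leftDepth(s: Src[Any], acc: Int = 0): Int = s match {
    case Concat(l, _) => leftDepth(l, acc + 1)
    case _            => acc
  }

  // Naive recursive walk: its call depth grows linearly with the number of
  // documents, so a large enough n throws StackOverflowError.
  def toListRecursive[A](s: Src[A]): List[A] = {
    val out = ListBuffer.empty[A]
    def go(cur: Src[A]): Unit = cur match {
      case Empty        => ()
      case Single(a)    => out += a
      case Concat(l, r) => go(l); go(r)
    }
    go(s)
    out.toList
  }

  // Same walk with an explicit heap-allocated stack: no deep JVM call stack.
  def toListIterative[A](s: Src[A]): List[A] = {
    val out = ListBuffer.empty[A]
    var stack: List[Src[A]] = List(s)
    while (stack.nonEmpty) {
      val cur = stack.head
      stack = stack.tail
      cur match {
        case Empty        => ()
        case Single(a)    => out += a
        case Concat(l, r) => stack = l :: r :: stack
      }
    }
    out.toList
  }

  def main(args: Array[String]): Unit = {
    val src = build(500000)
    println(leftDepth(src))
    println(toListIterative(src).size)
  }
}
```

If this guess is right, a source that pulls documents on demand (rather than one assembled by nesting concat once per document) would avoid both the unbounded buffering and the deep nesting.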
Let me know if I've missed something.
Jerry