akka streams infinite data source example


Sam Halliday

Apr 21, 2014, 6:28:11 AM
to akka...@googlegroups.com
Hi all,

I am very excited by akka streams -- it aims to solve a problem that I see time and time again. Every time I post to this list it feels like the solution is always "wait until Akka Streams is released...". Finally, it is here!

I intend to read the documentation fully, but I was a little disappointed that the activator examples did not have a simple example with an (effectively) infinite data source that can only be polled in serial, with parallel (but controllably finite) consumers.

Is there any chance of an example along these lines?

A month or so ago, I asked the same of the RxJava community and it turned out that it was a work-in-progress... so I created this little example comparing various approaches (I didn't write an Akka Actor implementation because it is quite obvious that it would just OOM):


The `ProducerObservableParser` reads in a CSV file one line at a time (the file is far too big to hold in memory), and then processes N rows in parallel, only reading more lines as the consumers finish each row. There is never more than a bounded number of rows in memory at any given point in time.

The RxJava POC Observable is here



But what is the equivalent Akka Streams code? The BasicTransformation example reads in the whole text before "flowing" it, and I couldn't see anything where the consuming was happening in parallel.

Best regards,
Sam

Patrik Nordwall

Apr 21, 2014, 7:32:57 AM
to akka...@googlegroups.com
Hi Sam,

21 apr 2014 kl. 12:28 skrev Sam Halliday <sam.ha...@gmail.com>:

Hi all,

I am very excited by akka streams -- it aims to solve a problem that I see time and time again. Every time I post to this list it feels like the solution is always "wait until Akka Streams is released...". Finally, it is here!

Yeah, exciting times.


I intend to read the documentation fully, but I was a little disappointed that the activator examples did not have a simple example with an (effectively) infinite data source that can only be polled in serial, with parallel (but controllably finite) consumers.

Isn't that demonstrated with the random number generator source, and its slow consumers?


Is there any chance of an example along these lines?

A month or so ago, I asked the same of the RxJava community and it turned out that it was a work-in-progress... so I created this little example comparing various approaches (I didn't write an Akka Actor implementation because it is quite obvious that it would just OOM):


The `ProducerObservableParser` reads in a CSV file one line at a time (the file is far too big to hold in memory), and then processes N rows in parallel, only reading more lines as the consumers finish each row. There is never more than a bounded number of rows in memory at any given point in time.

That sounds very doable with akka streams. You can control the buffer sizes with the settings of the materializer. A consumer always signals upstream how many more elements it can handle, and the producer is not allowed to send more elements downstream than what was requested.
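For concreteness, a bounded-memory pipeline along these lines can be sketched against a much later version of the API (the operators below, `Source.fromIterator` and `mapAsync`, post-date the preview under discussion; `parseRow` and the file name are placeholders):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

object BoundedCsv extends App {
  implicit val system: ActorSystem = ActorSystem("csv")
  import system.dispatcher

  def parseRow(line: String): Int = line.split(',').length // placeholder work

  // Lines are pulled from the iterator only as downstream demand allows;
  // mapAsync keeps at most 4 rows in flight at any point in time.
  Source
    .fromIterator(() => scala.io.Source.fromFile("huge.csv").getLines())
    .mapAsync(parallelism = 4)(line => Future(parseRow(line)))
    .runWith(Sink.ignore)
    .onComplete(_ => system.terminate())
}
```

The memory bound comes from demand signalling: the iterator's `next()` is only invoked when the downstream stages have capacity for more elements.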


The RxJava POC Observable is here



But what is the equivalent Akka Streams code? The BasicTransformation example reads in the whole text before "flowing" it, and I couldn't see anything where the consuming was happening in parallel.
BasicTransformation defines the input text in code (to make it simple), but the iterator next() is not called more than what can be consumed downstream.

Isn't the log file sample more similar to your text file input? It does not read the whole file (if it was large) into memory.

/Patrik


Best regards,
Sam

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Sam Halliday

Apr 21, 2014, 7:55:17 AM
to akka...@googlegroups.com

On Monday, 21 April 2014 12:32:57 UTC+1, Patrik Nordwall wrote:
I intend to read the documentation fully, but I was a little disappointed that the activator examples did not have a simple example with an (effectively) infinite data source that can only be polled in serial, with parallel (but controllably finite) consumers.

Isn't that demonstrated with the random number generator source, and its slow consumers?

I missed that one. How many consumers are there at any given moment?


My example is trying to simulate real world examples of:

* parsing loads of data coming from a single data source (e.g. indexing a multi-TB database with Lucene, running in under 1GB)
* parallel finite element calculations, where there are a lot more elements than bytes of RAM so they have to be batched (and with minimal object churn)

 
BasicTransformation defines the input text in code (to make it simple), but the iterator next() is not called more than what can be consumed downstream.

Isn't the log file sample more similar to your text file input? It does not read the whole file (if it was large) into memory.

Right, so you pass an Iterator[String] to the flow. Yes, that looks good, sorry I missed it.

But Iterator[T] is a little too ill-defined near the end of the stream (that's why I created my own Producer in the RxJava playground). For example, does it block on hasNext or on next if it knows there are more elements that are not yet available, or does it close up and go home? Traditional Java APIs (such as Queues) would actually return early if a queue was exhausted, instead of waiting for a poison pill. In any case, if Flow can handle an Iterator that blocks (e.g. for I/O), it's probably good enough for most situations.
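To make the hasNext/next question concrete, here's a rough sketch (untested, names made up) of the kind of bridge I mean: an `Iterator[String]` backed by a blocking queue, where hasNext blocks until either an element or a poison pill arrives:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Sketch: an Iterator[String] over a blocking queue. hasNext blocks until
// an element or the end-of-stream sentinel ("poison pill") is available,
// rather than returning early the way a drained java.util.Queue would.
object QueueIterator {
  private val Poison = new AnyRef // end-of-stream sentinel

  def apply(queue: LinkedBlockingQueue[AnyRef]): Iterator[String] =
    new Iterator[String] {
      private var buffered: AnyRef = null
      def hasNext: Boolean = {
        if (buffered == null) buffered = queue.take() // blocks for more data
        buffered ne Poison
      }
      def next(): String = {
        if (!hasNext) throw new NoSuchElementException("end of stream")
        val elem = buffered
        buffered = null
        elem.asInstanceOf[String]
      }
    }

  def endOfStream(queue: LinkedBlockingQueue[AnyRef]): Unit = queue.put(Poison)
}
```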

It would be even better if it knew how often to poll the data source... for example I have an EEG (brain scanner) library which has to poll the device at 57Hz. If it does it too quickly, there are inadequacies in the underlying hardware which result in busy spinning (yes, it's insane, and it really does eat the whole CPU)... but if I don't poll quickly enough then data can be lost. Relevant code (and my non-stream hack) here: https://github.com/fommil/emokit-java/blob/master/src/main/java/com/github/fommil/emokit/EmotivHid.java#L84

Best regards,
Sam

Patrik Nordwall

Apr 21, 2014, 8:20:23 AM
to akka...@googlegroups.com


21 apr 2014 kl. 13:55 skrev Sam Halliday <sam.ha...@gmail.com>:


On Monday, 21 April 2014 12:32:57 UTC+1, Patrik Nordwall wrote:
I intend to read the documentation fully, but I was a little disappointed that the activator examples did not have a simple example with an (effectively) infinite data source that can only be polled in serial, with parallel (but controllably finite) consumers.

Isn't that demonstrated with the random number generator source, and its slow consumers?

I missed that one. How many consumers are there at any given moment?

It has one consumer but two filter steps that can execute pipelined. You can attach several consumers with toProducer, and then start several flows from that. Backpressure works with multiple consumers also.
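In later releases of the API this fan-out became operators like `alsoTo` (the sketch below uses that later API, not the preview discussed here): every attached sink sees every element, and the slowest consumer sets the pace for upstream.

```scala
import akka.stream.scaladsl.{Sink, Source}

// Later-API sketch of "several consumers" on one source. Both sinks
// receive every element; backpressure from the slower sink throttles
// the source for both.
val graph =
  Source(1 to 100)
    .alsoTo(Sink.foreach(n => println(s"consumer A: $n")))
    .to(Sink.foreach(n => println(s"consumer B: $n")))
// graph.run() starts the single source feeding both consumers
```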


My example is trying to simulate real world examples of:

* parsing loads of data coming from a single data source (e.g. indexing a multi-TB database with Lucene, running in under 1GB)
* parallel finite element calculations, where there are a lot more elements than bytes of RAM so they have to be batched (and with minimal object churn)

 
BasicTransformation defines the input text in code (to make it simple), but the iterator next() is not called more than what can be consumed downstream.

Isn't the log file sample more similar to your text file input? It does not read the whole file (if it was large) into memory.

Right, so you pass an Iterator[String] to the flow. Yes, that looks good, sorry I missed it.

But Iterator[T] is a little too ill-defined near the end of the stream (that's why I created my own Producer in the RxJava playground). For example, does it block on hasNext or on next if it knows there are more elements that are not yet available, or does it close up and go home? Traditional Java APIs (such as Queues) would actually return early if a queue was exhausted, instead of waiting for a poison pill. In any case, if Flow can handle an Iterator that blocks (e.g. for I/O), it's probably good enough for most situations.

Ah, I see what you mean. Blocking hasNext/next doesn't sound attractive to me. That should probably be another Producer that can do the polling.

/Patrik
It would be even better if it knew how often to poll the data source... for example I have an EEG (brain scanner) library which has to poll the device at 57Hz. If it does it too quickly, there are inadequacies in the underlying hardware which result in busy spinning (yes, it's insane, and it really does eat the whole CPU)... but if I don't poll quickly enough then data can be lost. Relevant code (and my non-stream hack) here: https://github.com/fommil/emokit-java/blob/master/src/main/java/com/github/fommil/emokit/EmotivHid.java#L84

Best regards,
Sam


Sam Halliday

Apr 21, 2014, 9:59:40 AM
to akka...@googlegroups.com
On Monday, 21 April 2014 12:32:57 UTC+1, Patrik Nordwall wrote:
I intend to read the documentation fully, but I was a little disappointed that the activator examples did not have a simple example with an (effectively) infinite data source that can only be polled in serial, with parallel (but controllably finite) consumers.
Isn't that demonstrated with the random number generator source, and its slow consumers?
I missed that one. How many consumers are there at any given moment?
It has one consumer but two filter steps that can execute pipelined. You can attach several consumers with toProducer, and then start several flows from that. Backpressure works with multiple consumers also.


OK great. I did actually see this example and that's not what I mean. I'd really like to be able to specify (e.g. in runtime config files) the maximum number of threads that can be running in the `filter(rnd => isPrime(rnd))` block.

Say we want to do the filtering in parallel, using 2 cores. Imagine the first random number that we get is really big and takes a few seconds to check if it is prime. The second number is "3" and we instantly accept it.... it would be preferable if this result were held back until the first answer became available, but the free core still goes on to check the third number.

Alternatively, I can imagine situations where order does not matter at all. This is all considered in the Observable pattern, so I should imagine you have also included it :-)

Does that make sense? Would it be tricky to update the primes example in this way?
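To illustrate the two behaviours I mean, here is a sketch against a much later version of the API (the `mapAsync` / `mapAsyncUnordered` operators did not exist at the time of writing; `randoms` is made up):

```scala
import akka.stream.scaladsl.Source
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

def isPrime(n: BigInt): Boolean = n.isProbablePrime(20)

val randoms: List[BigInt] = List.fill(100)(BigInt(128, scala.util.Random))

// Emits in upstream order: a quick "3" is held back until the slow
// element ahead of it completes, but both workers stay busy meanwhile.
val orderedPrimes =
  Source(randoms)
    .mapAsync(parallelism = 2)(n => Future(n -> isPrime(n)))
    .collect { case (n, true) => n }

// Emits results as each check finishes, for when order doesn't matter.
val unorderedPrimes =
  Source(randoms)
    .mapAsyncUnordered(parallelism = 2)(n => Future(n -> isPrime(n)))
    .collect { case (n, true) => n }
```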

Adding a second flow is a very different thing. I think I'd need to read the docs (and source code) in a lot more detail before understanding the consequences for a particular Producer (e.g. does it replay from the start, is it sending the same results to all flows, are all flows getting the same order of events, etc). This is of less interest to me at the moment, but I can see it being very important.


 
But Iterator[T] is a little too ill-defined near the end of the stream (that's why I created my own Producer in the RxJava playground). For example, does it block on hasNext or on next if it knows there are more elements that are not yet available, or does it close up and go home? Traditional Java APIs (such as Queues) would actually return early if a queue was exhausted, instead of waiting for a poison pill. In any case, if Flow can handle an Iterator that blocks (e.g. for I/O), it's probably good enough for most situations.

Ah, I see what you mean. Blocking hasNext/next doesn't sound attractive to me. That should probably be another Producer that can do the polling.



Now I'm confused whether Producer is a pull or push based source... in the examples, I was getting the impression that it was very much a pull based API (and would therefore have to block on some level, if data is not available yet). Is it also a pusher?

The brain scanner project is an example of a pusher source... throttling it doesn't make any sense unless it is acceptable to throw results away (i.e. not collect them in time). So, yes, you are absolutely correct that a blocking Iterator is not good here.

However, for datasources (e.g. reading from a really big query result over a SQL connection), the "next" or "hasNext" in an equivalent Iterator may very well block and there is no way to get around this. Indeed, you will have the same problem with Source.fromFile(...).readLines, exaggerated if the file is on a really slow hard drive (or a network drive).

Best regards,
Sam

Patrik Nordwall

Apr 22, 2014, 7:08:47 AM
to akka...@googlegroups.com
On Mon, Apr 21, 2014 at 3:59 PM, Sam Halliday <sam.ha...@gmail.com> wrote:
On Monday, 21 April 2014 12:32:57 UTC+1, Patrik Nordwall wrote:
I intend to read the documentation fully, but I was a little disappointed that the activator examples did not have a simple example with an (effectively) infinite data source that can only be polled in serial, with parallel (but controllably finite) consumers.
Isn't that demonstrated with the random number generator source, and its slow consumers?
I missed that one. How many consumers are there at any given moment?
It has one consumer but two filter steps that can execute pipelined. You can attach several consumers with toProducer, and then start several flows from that. Backpressure works with multiple consumers also.


OK great. I did actually see this example and that's not what I mean. I'd really like to be able to specify (e.g. in runtime config files) the maximum number of threads that can be running in the `filter(rnd => isPrime(rnd))` block.

Say we want to do the filtering in parallel, using 2 cores. Imagine the first random number that we get is really big and takes a few seconds to check if it is prime. The second number is "3" and we instantly accept it.... it would be preferable if this result were held back until the first answer became available, but the free core still goes on to check the third number.

Alternatively, I can imagine situations where order does not matter at all. This is all considered in the Observable pattern, so I should imagine you have also included it :-)

Does that make sense? Would it be tricky to update the primes example in this way?

This should be possible when we have all operators in place. Please create a ticket, so that it's not forgotten.
 

Adding a second flow is a very different thing. I think I'd need to read the docs (and source code) in a lot more detail before understanding the consequences for a particular Producer (e.g. does it replay from the start, is it sending the same results to all flows, are all flows getting the same order of events, etc). This is of less interest to me at the moment, but I can see it being very important.


 
But Iterator[T] is a little too ill-defined near the end of the stream (that's why I created my own Producer in the RxJava playground). For example, does it block on hasNext or on next if it knows there are more elements that are not yet available, or does it close up and go home? Traditional Java APIs (such as Queues) would actually return early if a queue was exhausted, instead of waiting for a poison pill. In any case, if Flow can handle an Iterator that blocks (e.g. for I/O), it's probably good enough for most situations.

Ah, I see what you mean. Blocking hasNext/next doesn't sound attractive to me. That should probably be another Producer that can do the polling.



Now I'm confused whether Producer is a pull or push based source... in the examples, I was getting the impression that it was very much a pull based API (and would therefore have to block on some level, if data is not available yet). Is it also a pusher?

Yes. It gets a request from downstream consumer of X more elements. That doesn't mean that it replies immediately with X elements, but it asynchronously pushes up to X elements whenever it has them. Exactly how it "waits" for the elements itself is up to the implementation of the Producer. 
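The handshake has the same shape as the interfaces later standardized (and eventually shipped in JDK 9 as `java.util.concurrent.Flow`, which this sketch borrows for illustration): the subscriber signals demand with request(n), and the publisher pushes up to n elements whenever it has them.

```scala
import java.util.concurrent.Flow.{Subscriber, Subscription}

// Sketch of the demand protocol: the subscriber requests n elements; the
// publisher may push up to n, asynchronously, as data becomes available,
// and must not push more until further demand is signalled.
class PrintingSubscriber[T] extends Subscriber[T] {
  private var subscription: Subscription = _
  def onSubscribe(s: Subscription): Unit = {
    subscription = s
    subscription.request(1) // initial demand: one element
  }
  def onNext(elem: T): Unit = {
    println(elem)
    subscription.request(1) // ask for the next only after handling this one
  }
  def onError(t: Throwable): Unit = t.printStackTrace()
  def onComplete(): Unit = println("done")
}
```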
 

The brain scanner project is an example of a pusher source... throttling it doesn't make any sense unless it is acceptable to throw results away (i.e. not collect them in time). So, yes, you are absolutely correct that a blocking Iterator is not good here.

Sure, it could be implemented as an actor that periodically polls the device, but it must still not send more elements downstream than what was requested. If it can't slow down the polling, it must buffer and eventually throw away elements.
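With operators from much later releases (`Source.tick`, `buffer` with an `OverflowStrategy`, none of which exist in the preview discussed here), such a fixed-rate poller could be sketched roughly as follows (`pollDevice` is a placeholder for the real device read):

```scala
import scala.concurrent.duration._
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source

final case class Sample(bytes: Array[Byte])
def pollDevice(): Sample = Sample(Array.emptyByteArray) // placeholder read

// Poll at roughly 57 Hz regardless of downstream speed; keep a bounded
// buffer and drop the oldest samples when the consumer can't keep up.
val samples =
  Source
    .tick(Duration.Zero, 17.millis /* ~57 Hz */, ())
    .map(_ => pollDevice())
    .buffer(256, OverflowStrategy.dropHead)
```

The `dropHead` strategy is what makes "buffer and finally throw away" explicit: the buffer always accepts new samples, discarding the oldest, so the polling rate is never throttled by a slow consumer.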


However, for datasources (e.g. reading from a really big query result over a SQL connection), the "next" or "hasNext" in an equivalent Iterator may very well block and there is no way to get around this.

Async database driver? Yeah, I know there are not many of those, and then you have to use blocking somewhere. Nothing wrong with that, just keep it managed. Akka streams are implemented with actors, so you can use a dedicated dispatcher for such things.
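A dedicated dispatcher for blocking calls is plain configuration, something along these lines (pool size is just an example):

```
# application.conf: a dedicated pool so that blocking JDBC/file calls
# cannot starve the default dispatcher
blocking-io-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 16
  }
}
```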
 
Indeed, you will have the same problem with Source.fromFile(...).readLines, exaggerated if the file is on a really slow hard drive (or a network drive).

Yes, there will most likely be special producers for working with files.

/Patrik
 

Best regards,
Sam




--

Patrik Nordwall
Typesafe Reactive apps on the JVM
Twitter: @patriknw

Sam Halliday

Apr 22, 2014, 3:32:07 PM
to akka...@googlegroups.com
Thanks Patrik,

Well it sounds like everything is heading in the right direction, if
it is not already there yet. I'm very excited to see what will be in
the next milestone :-D

Patrik Nordwall

Apr 23, 2014, 5:01:30 AM
to akka...@googlegroups.com
On Tue, Apr 22, 2014 at 9:32 PM, Sam Halliday <sam.ha...@gmail.com> wrote:
Thanks Patrik,

Well it sounds like everything is heading in the right direction, if
it is not already there yet. I'm very excited to see what will be in
the next milestone :-D

Thanks for trying it out and for your feedback.
/Patrik

Sam Halliday

Dec 10, 2014, 11:31:08 AM
to akka...@googlegroups.com
Hi Patrik,

I'm guessing things have moved on since April when I first started this thread.

Does the latest release of akka streams handle my example?

Best regards,
Sam

Björn Antonsson

Dec 15, 2014, 4:49:46 AM
to akka...@googlegroups.com
Hi Sam,

You are right that things have moved on since April. A lot has changed, and we are actively building docs and cookbook examples for how to use the streams.

There is a pull request with a preview of some cookbook examples. It would be great if you could see whether we are missing anything you need for your use case, and open tickets for the things you think are missing.

B/

-- 
Björn Antonsson
Typesafe – Reactive Apps on the JVM
twitter: @bantonsson
