Feedback on Akka Streams 1.0-M2 Documentation


Sam Halliday

Jan 20, 2015, 2:04:45 PM
to akka...@googlegroups.com
Dear Akka Team,

In response to Bjorn's prompting[1], I have read all the Akka
Streams 1.0-M2 documentation[2].

I am very impressed, and excited! There has clearly been a lot of
thought put into this framework and I am looking forward to using
it in anger over the coming years.

Bjorn asked if I felt any examples were missing, and sadly my
original request (that I've been going on about for years,
sorry!) is indeed missing. It is the case of a fast producer and
a slow consumer that is ideal for parallelisation.

I believe it may be possible to use the current 1.0-M2 to address
my bugbear by using the Actor integration to write an actor that
has N instances behind a router, but it feels hacky to have to
use an Actor at all. What is really missing is a Junction that
multiplies the next Flow into N parallel parts that run on
separate threads.


In general, I felt that the documentation was missing any
commentary on concurrency and parallelisation. I was left
wondering what thread everything was happening on. Some initial
questions I have in this area:

1. Is everything actually executed in the same thread? What about
   when you have a Junction?

2. Is it possible to be doing work to populate the Source's cache
   while work is being executed in a Flow or Sink?

It would be good to have a section in the documentation that
discusses this in more detail.


And, very importantly, it would be good to have the feature of
being able to split a Flow into N parallel parts! I recently
learnt how to do this in ScalazStreams but I'd much rather be
able to do it in Akka Streams as I find everything else about the
architecture to be so much easier to understand (plus integration
with Akka Actors is just tremendous).

PS: I'm also very excited by Slick 3.0 which appears to be aiming toward Reactive Streams and, I assume, integration with Akka Streams. e.g. produce a Source[Entity] from a SELECT with pushback on extremely large result sets.



Best regards,
Sam (fommil)

Sam Halliday

Jan 20, 2015, 3:56:30 PM
to akka...@googlegroups.com
One more comment on the streams API. It is really cool that you've thought about using mapConcat instead of flatMap to enable optimised merge operations. I just wanted to draw your attention to a Clojure project that does super-fast merging of immutable Tree/Trie structures: https://github.com/JulesGosnell/seqspert

The work is definitely portable to the Scala collection types, as they are based on the Clojure implementations.

Endre Varga

Jan 21, 2015, 3:03:36 AM
to akka...@googlegroups.com
Hi Sam,

On Tue, Jan 20, 2015 at 9:56 PM, Sam Halliday <sam.ha...@gmail.com> wrote:
> One more comment on the streams API. It is really cool that you've thought about using mapConcat instead of flatMap to enable optimised merge operations. I just wanted to draw your attention to a Clojure project that does super-fast merging of immutable Tree/Trie structures: https://github.com/JulesGosnell/seqspert

I sense a misunderstanding here. Akka Streams is not about data structures, it is about streams. I mean real streams, like live TCP incoming bytes. Unfortunately Java also chose the name Stream for another concept, but that is more targeted at collections. In Akka Streams, on the other hand, streams backed by collections are the rare case, not the common one (although many examples use collections as sources since they are easy to show).

 -Endre

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Endre Varga

Jan 21, 2015, 3:21:15 AM
to akka...@googlegroups.com
Hi Sam,



> Bjorn asked if I felt any examples were missing, and sadly my
> original request (that I've been going on about for years,
> sorry!) is indeed missing. It is the case of a fast producer and
> a slow consumer that is ideal for parallelisation.

There are many examples of slow consumers, for example here is a global rate limiter pattern: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html#Globally_limiting_the_rate_of_a_set_of_streams

Obviously that pattern is all about making the consumer slow (and therefore limiting the producer in turn). Also, any fast producer / slow consumer pair is automatically handled by the internal backpressure mechanism, so there is nothing special to show about it in examples: it always looks like source.via(flow).to(sink), independently of the rate of the components.
 

> I believe it may be possible to use the current 1.0-M2 to address
> my bugbear by using the Actor integration to write an actor that
> has N instances behind a router, but it feels hacky to have to
> use an Actor at all. What is really missing is a Junction that
> multiplies the next Flow into N parallel parts that run on
> separate threads.



Also, you can try mapAsync or mapAsyncUnordered for similar tasks.
 

> In general, I felt that the documentation was missing any
> commentary on concurrency and parallelisation. I was left
> wondering what thread everything was happening on. Some initial
> questions I have in this area:
>
> 1. Is everything actually executed in the same thread? What about
>    when you have a Junction?

This definitely needs careful documentation, but we will also need some extra API that allows the user to define boundaries where processing stages are cut into concurrent pieces.
For now, by default every stream processing element is backed by an actor. So if you have:

  mySource.map().map().to(mySink)

then in the most common case there will be 4 actors, each backing one of the parts (1 for the source, 2 for the 2 maps and 1 for the sink). There is experimental fusing support, not yet documented and off by default, that would be able to meld the two maps into one actor. In the future we will allow users to add explicit markers where the materializer needs to cut up chains/graphs into concurrent entities. It will be a best-effort transformation in the sense that:
 - elements explicitly marked by the user as concurrent to each other are guaranteed to be concurrent
 - the materializer attempts to fuse the elements inside those segments so that there is no concurrency between them, but this is not always guaranteed.
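
For readers on later releases, here is a minimal sketch of such an explicit marker. It assumes the Akka 2.6 scaladsl rather than 1.0-M2: there, fusing is on by default and `.async` marks a concurrent boundary. The names `AsyncBoundarySketch` and `run` are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object AsyncBoundarySketch {
  def run(): Seq[Int] = {
    // Assumption: akka-stream 2.6.x is on the classpath; the implicit
    // ActorSystem also supplies the materializer.
    implicit val system: ActorSystem = ActorSystem("async-boundary-sketch")
    try {
      val result = Source(1 to 5)
        .map(_ + 1) // fused with the source: both run in one actor
        .async      // explicit boundary: what follows runs in another actor
        .map(_ * 2) // pipelined with the first map across the boundary
        .runWith(Sink.seq)
      Await.result(result, 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Without the `.async` line the whole chain would be fused into a single actor in that API, which is the opposite default from the one-actor-per-element behaviour described above.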
 

> 2. Is it possible to be doing work to populate the Source's cache
>    while work is being executed in a Flow or Sink?

What do you mean by the Source's cache? Maybe there is a misunderstanding here. I recommend looking at this section in the docs: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-rate.html

If you meant buffering you can just use 

  mySource.buffer(100, OverflowStrategy.Backpressure).via(myFlow).to(mySink)

which will introduce a buffer element between your source and flow. It will slurp at most 100 elements from the source upfront if the flow next to it is not fast enough to consume elements.
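
A runnable version of the buffering idea, as a sketch only. It assumes the later Akka 2.6 scaladsl (where the strategy is spelled `OverflowStrategy.backpressure`) and uses a sleeping `map` as a hypothetical stand-in for a slow flow; `BufferSketch` is an illustrative name.

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BufferSketch {
  def run(): Seq[Int] = {
    // Assumption: akka-stream 2.6.x is on the classpath.
    implicit val system: ActorSystem = ActorSystem("buffer-sketch")
    try {
      // Hypothetical slow stage; Thread.sleep is for demonstration only.
      val slowFlow = Flow[Int].map { n => Thread.sleep(2); n }
      val result = Source(1 to 50)
        .buffer(100, OverflowStrategy.backpressure) // holds at most 100 elements
        .async // lets source and buffer run concurrently with the slow flow
        .via(slowFlow)
        .runWith(Sink.seq)
      Await.result(result, 30.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

The buffer changes how far the source may run ahead, not what comes out: all elements still arrive in order.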
 

> It would be good to have a section in the documentation that
> discusses this in more detail.


> And, very importantly, it would be good to have the feature of
> being able to split a Flow into N parallel parts!

Please be more explicit when you talk about parallelism here. Just to demonstrate, there are two ways to have parallel/concurrent processing in streams:
 1. Pipelining: if you have processing stages A, B, C then they can run concurrently, so C works on an older element while A already processes the next. See http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-rate.html
 2. Parallelizing a stage A into a pool of A1, A2, A3, ... . I pasted a cookbook recipe for that, but mapAsync and mapAsyncUnordered can be used that way too. In fact, since those work on Futures, you can combine them with the ask pattern and a routed pool of worker actors.
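
The second kind (a pool of workers between a Balance and a Merge) can be sketched roughly as follows. This is not the 1.0-M2 recipe text: it assumes the later Akka 2.6 GraphDSL, and `WorkerPoolSketch`, `balancer` and `run` are illustrative names.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object WorkerPoolSketch {
  // Wraps `worker` into a flow that runs `workerCount` copies of it in parallel.
  def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
    import GraphDSL.Implicits._
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      val balance = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
      val merge = b.add(Merge[Out](workerCount))
      for (_ <- 1 to workerCount) {
        // .async gives every copy of the worker its own actor
        balance ~> worker.async ~> merge
      }
      FlowShape(balance.in, merge.out)
    })
  }

  def run(): Set[Int] = {
    // Assumption: akka-stream 2.6.x is on the classpath.
    implicit val system: ActorSystem = ActorSystem("worker-pool-sketch")
    try {
      val result = Source(1 to 20)
        .via(balancer(Flow[Int].map(_ * 10), workerCount = 4))
        .runWith(Sink.seq)
      // Merge does not preserve input order, so compare as a set.
      Await.result(result, 10.seconds).toSet
    } finally {
      system.terminate()
    }
  }
}
```

Balance only hands an element to a worker that has signalled demand, so a slow worker does not accumulate a queue the way a mailbox behind a router would.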
 
> I recently
> learnt how to do this in ScalazStreams but I'd much rather be
> able to do it in Akka Streams as I find everything else about the
> architecture to be so much easier to understand (plus integration
> with Akka Actors is just tremendous).

> PS: I'm also very excited by Slick 3.0 which appears to be aiming toward Reactive Streams and, I assume, integration with Akka Streams. e.g. produce a Source[Entity] from a SELECT with pushback on extremely large result sets.



> Best regards,
> Sam (fommil)

Sam Halliday

Jan 21, 2015, 3:45:47 AM
to akka...@googlegroups.com

Err, Endre, yes. And when merging Seq[T] in a mapConcat we would all benefit greatly from faster merging, Seq[Seq[T]] => Seq[T]


Endre Varga

Jan 21, 2015, 3:50:29 AM
to akka...@googlegroups.com
On Wed, Jan 21, 2015 at 9:45 AM, Sam Halliday <sam.ha...@gmail.com> wrote:

> Err, Endre, yes. And when merging Seq[T] in a mapConcat we would all benefit greatly from faster merging, Seq[Seq[T]] => Seq[T]

What do you mean? I don't understand. What mapConcat does is that it takes a Source[T] and a function T => Seq[U] and gives a Source[U] (similarly with Flow). Source is not a Seq, it is a completely different beast.
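
To make the signature concrete, a small sketch, assuming the later Akka 2.6 scaladsl rather than 1.0-M2 (`MapConcatSketch` is an illustrative name). The function returns a collection per element, and the stream emits the members of each collection in turn:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MapConcatSketch {
  def run(): Seq[String] = {
    // Assumption: akka-stream 2.6.x is on the classpath.
    implicit val system: ActorSystem = ActorSystem("map-concat-sketch")
    try {
      val words = Source(List("a b", "c d e")) // a Source[String, _]
        .mapConcat(line => line.split(" ").toList) // String => List[String]
        .runWith(Sink.seq) // still a stream of String, not of List[String]
      Await.result(words, 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

No Seq[Seq[U]] is ever built and flattened; each returned collection is emitted element by element as downstream demand allows.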

Sam Halliday

Jan 21, 2015, 4:07:01 AM
to akka...@googlegroups.com

There was some discussion about merging Seq in the docs. Anyway, it is a side point and I don't want to get derailed from my original questions so I will read your other responses later today.

Endre Varga

Jan 21, 2015, 4:09:10 AM
to akka...@googlegroups.com
On Wed, Jan 21, 2015 at 10:06 AM, Sam Halliday <sam.ha...@gmail.com> wrote:

> There was some discussion about merging Seq in the docs.

If that is so, can you point to that part? We might need to rewrite it to clarify what it is about and what it is not. If you find it can you file a ticket?

Thanks,
-Endre

Sam Halliday

Jan 21, 2015, 4:31:14 AM
to akka...@googlegroups.com

It's the wording "with merge being the preferred strategy". From your email it is clear that merge is *not* the strategy used in Akka Streams, so it is perhaps best to drop this sentence as it confuses more than it clarifies. Instead, it would be instructive to note that a Source is returned and perhaps talk about the strategy that is used.

Sam Halliday

Jan 22, 2015, 6:02:54 PM
to akka...@googlegroups.com
On Wednesday, 21 January 2015 08:21:15 UTC, drewhk wrote:
>> I believe it may be possible to use the current 1.0-M2 to address
>> my bugbear by using the Actor integration to write an actor that
>> has N instances behind a router, but it feels hacky to have to
>> use an Actor at all. What is really missing is a Junction that
>> multiplies the next Flow into N parallel parts that run on
>> separate threads.
>

I actually missed this when reading the docs... it's a gem buried
in a sea of examples! :-) Is there anything like this in the "top
down" style documentation?

A convenience method to set up this sort of thing is exactly what
I mean. I should imagine that fanning out a Flow for
embarrassingly parallel processing is a common enough pattern that
one would want to be able to do this in a one liner. You note
something about work in this area later on (quoted out of order):

> In the future we will allow users to add explicit markers where
> the materializer needs to cut up chains/graphs into concurrent
> entities.

This sounds very good. Is there a ticket I can subscribe to for
updates? Is there a working name for the materializer so that I
know what to look out for?


> Also, you can try mapAsync or mapAsyncUnordered for similar
> tasks.

It would be good to read some discussion about these that goes
further than the API docs. Do they just create a Future and
therefore have no way to tell a fast producer to slow down? How
does one achieve pushback from these? Pushback on evaluation of
the result is essential, not on the ability to create/schedule
the futures. I would very much like to see some documentation that
explains where this has an advantage over plain ol' Scala Stream
with a map{Future{...}}.


>> In general, I felt that the documentation was missing any
>> commentary on concurrency and parallelisation. I was left
>> wondering what thread everything was happening on.
>
> ... by default every stream processing element is backed by
>  an actor ...

The very fact that each component is backed by an Actor is news
to me. This wasn't at all obvious from the docs and actually
the "integration with actors" section made me think that streams
must be implemented completely differently if it needs an
integration layer! Actually, the "actor integration" really
means "low level streaming actors", doesn't it? I would strongly
recommend making this a lot clearer as it helps to de-mystify the
implementation.

Now knowing that each vertex in the graph is backed by an actor,
I wonder if in "balancing work to fixed pools" the Balance is
just adding a router actor with a balance strategy? The
convenience method I suggested above could simply create a router
to multiple instances of the same actor with a parallelism bound.
I'm not entirely sure why one would need a Merge strategy for
that, although the option to preserve input order at the output
would be good for some circumstances (which could apply pushback
in preference to growing the queue of out-of-order results).

In addition, this revelation helps me to understand the
performance considerations of using akka-streams. Knowing this,
it would only be appropriate for something I would've considered
suitable (from a performance point of view) for hand-crafted flow
control in akka before streams was released. The main advantage
of akka-streams is therefore that it has dramatically lowered the
barrier of entry for writing Actor code with flow control.



Thanks for this explanation Endre, I hope to see even more
innovation in the coming releases and improved documentation.

Best regards,
Sam 

Endre Varga

Jan 23, 2015, 12:24:17 PM
to akka...@googlegroups.com
Hi Sam,

On Fri, Jan 23, 2015 at 12:02 AM, Sam Halliday <sam.ha...@gmail.com> wrote:
> On Wednesday, 21 January 2015 08:21:15 UTC, drewhk wrote:
>>> I believe it may be possible to use the current 1.0-M2 to address
>>> my bugbear by using the Actor integration to write an actor that
>>> has N instances behind a router, but it feels hacky to have to
>>> use an Actor at all. What is really missing is a Junction that
>>> multiplies the next Flow into N parallel parts that run on
>>> separate threads.
>>

> I actually missed this when reading the docs... it's a gem buried
> in a sea of examples! :-) Is there anything like this in the "top
> down" style documentation?

Currently no. The documentation explains the elements, while the cookbook is a list of examples/patterns that can be used directly, modified, or taken as a source of inspiration. But yes, we should link from the main pages into the relevant cookbook sections.
 

> A convenience method to set up this sort of thing is exactly what
> I mean. I should imagine that fanning out a Flow for
> embarrassingly parallel processing is a common enough pattern that
> one would want to be able to do this in a one liner. You note
> something about work in this area later on (quoted out of order):

If you take that recipe you have a one-liner :) Our main philosophy is not to prepackage too many combinators, but instead to encourage flexible use of them. It is about giving a fish versus teaching how to fish :) The idea is that if certain patterns seem to be widely used, we promote them to library-provided combinators.
 

>> In the future we will allow users to add explicit markers where
>> the materializer needs to cut up chains/graphs into concurrent
>> entities.

> This sounds very good. Is there a ticket I can subscribe to for
> updates? Is there a working name for the materializer so that I
> know what to look out for?

Not really, there are multiple tickets. The ActorBasedFlowMaterializer has an Optimizations parameter which is currently not documented. It will eagerly collapse entities into synchronous ones as much as possible, but currently there is no API to add boundaries to this collapse procedure (e.g. when you have two map stages that you *do* want to keep concurrent and pipelined). Also, it currently cannot collapse graph elements, stream-of-stream elements, mapAsync and the timed elements.

Also, remember that this is about pipelining, which is different from the parallelization demonstrated in the cookbook.
 


>> Also, you can try mapAsync or mapAsyncUnordered for similar
>> tasks.

> It would be good to read some discussion about these that goes
> further than the API docs. Do they just create a Future and
> therefore have no way to tell a fast producer to slow down?

A mapAsync/mapAsyncUnordered keeps only a bounded number of these Futures open at any time and emits their results one by one, once they have completed and the downstream is able to consume them. Once a result has been emitted, a new Future is created. So at any given time there is a bounded number of uncompleted Futures. In other words, Future completion is the backpressure signal to the upstream.
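
The bound can be observed with a small sketch, assuming the later Akka 2.6 scaladsl (where the bound is the explicit `parallelism` argument). `BoundedFuturesSketch`, `maxInFlight` and `slowCall` are illustrative names, and the sleep is a stand-in for real asynchronous work.

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BoundedFuturesSketch {
  // Returns the largest number of Future bodies seen running at the same time.
  def maxInFlight(parallelism: Int, elements: Int): Int = {
    // Assumption: akka-stream 2.6.x is on the classpath.
    implicit val system: ActorSystem = ActorSystem("bounded-futures-sketch")
    implicit val ec: ExecutionContext = ExecutionContext.global
    val inFlight = new AtomicInteger(0)
    val maxSeen = new AtomicInteger(0)

    // Hypothetical slow asynchronous call that records its own concurrency.
    def slowCall(n: Int): Future[Int] = Future {
      val now = inFlight.incrementAndGet()
      maxSeen.updateAndGet(m => math.max(m, now))
      Thread.sleep(20)
      inFlight.decrementAndGet()
      n
    }

    try {
      val done = Source(1 to elements)
        .mapAsync(parallelism)(slowCall) // at most `parallelism` open Futures
        .runWith(Sink.ignore)
      Await.result(done, 30.seconds)
      maxSeen.get()
    } finally {
      system.terminate()
    }
  }
}
```

However fast the source could produce, `slowCall` is not invoked for the next element until one of the open Futures has completed and its result has been emitted downstream.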
 
> How
> does one achieve pushback from these? Pushback on evaluation of
> the result is essential, not on the ability to create/schedule
> the futures. I would very much like to see some documentation that
> explains where this has an advantage over plain ol' Scala Stream
> with a map{Future{...}}.

Doing the above on a Scala Stream will create arbitrarily many Futures, and it will not wait for the result of those Futures. Our mapAsync on the other hand waits for the result of the Futures and emits those (i.e. it is a flattening operation), and only keeps a limited number of open Futures at any given time.
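
For contrast, the collection-based version can be sketched with the standard library alone. This assumes Scala 2.13, where LazyList is the successor to Stream; `UnboundedFuturesSketch` and `createdWithoutWaiting` are illustrative names.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}

object UnboundedFuturesSketch {
  // Returns how many Futures had been created by the time the traversal ended.
  def createdWithoutWaiting(elements: Int): Int = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val created = new AtomicInteger(0)
    val futures = LazyList.from(1).take(elements).map { n =>
      created.incrementAndGet()
      Future { Thread.sleep(10); n } // scheduled right away, never awaited here
    }
    // Walking the list creates every Future; nothing waits for any result.
    futures.foreach(_ => ())
    created.get()
  }
}
```

All the Futures exist as soon as the list has been walked, regardless of how many have finished, so nothing tells the producer to slow down.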
 


>>> In general, I felt that the documentation was missing any
>>> commentary on concurrency and parallelisation. I was left
>>> wondering what thread everything was happening on.
>>
>> ... by default every stream processing element is backed by
>>  an actor ...

> The very fact that each component is backed by an Actor is news
> to me. This wasn't at all obvious from the docs and actually
> the "integration with actors" section made me think that streams
> must be implemented completely differently if it needs an
> integration layer!

Internals might or might not be actors. This all depends on which materializer is used (currently there is only one kind), and even now we have elements not backed by an Actor (CompletedSource and friends, for example). This is completely internal stuff, and we don't want to document all aspects of it. But yes, we can add more high-level info about it.
 
> Actually, the "actor integration" really
> means "low level streaming actors", doesn't it?

I am not sure what this means.
 
> I would strongly
> recommend making this a lot clearer as it helps to de-mystify the
> implementation.
>
> Now knowing that each vertex in the graph is backed by an actor,
> I wonder if in "balancing work to fixed pools" the Balance is
> just adding a router actor with a balance strategy?

No, it is completely different: routers are not backpressure aware, so they cannot be used here. We implemented these elements as actors, but it is not necessary to do so.
 
> The
> convenience method I suggested above could simply create a router
> to multiple instances of the same actor with a parallelism bound.

Well, that is what the recipe does with the Balance and Merge elements.
 
> I'm not entirely sure why one would need a Merge strategy for
> that, although the option to preserve input order at the output
> would be good for some circumstances (which could apply pushback
> in preference to growing the queue of out-of-order results).

This is why I kept it as a recipe. If you create a strict round robin Balance and a strict round robin Merge on the other side, the parallelization step will keep the sequence. But you might want to add dropping elements, or batching elements, whatnot. The point of the recipe is that you can tailor it completely to your needs because you see how it works instead of just calling a doMagicForMe() method :)
 

> In addition, this revelation helps me to understand the
> performance considerations of using akka-streams. Knowing this,
> it would only be appropriate for something I would've considered
> suitable (from a performance point of view) for hand-crafted flow
> control in akka before streams was released. The main advantage
> of akka-streams is therefore that it has dramatically lowered the
> barrier of entry for writing Actor code with flow control.

Exactly!
 



> Thanks for this explanation Endre, I hope to see even more
> innovation in the coming releases and improved documentation.

Cheers,
-Endre
 

> Best regards,
> Sam