[Akka-stream] Will Broadcast + merge keep the message order?


Leon Ma

Nov 25, 2015, 4:24:27 AM
to Akka User List
Hi, 

I have the following stream:


broadcast ~> filter1  ~> flowA ~> flowB ~> merge
broadcast ~> filter2 ~> flowC ~> merge

Assume filter1 and filter2 are mutually exclusive, i.e. each input element goes either through the upper flow or through the lower flow.

How can I guarantee the order?

Say I have a source of X, Y, Z.

X goes through the upper flow, while Y and Z go through the lower flow. If flowC runs much faster than flowA + flowB, should I expect to see an order like Y', Z', X'?


Thanks
Leon





Akka Team

Nov 25, 2015, 5:50:02 AM
to Akka User List
Hi Leon,

No, once you broadcast to two streams, they are concurrent and therefore have no ordering between them. Concurrent processing is the reason to use streams. If you have ordering requirements, then maybe you don't want concurrent processing for that step at all?

I.e. why not just do

src.mapConcat { elem =>
  if (..) someStuff(elem)  // each branch returns the (possibly empty) results for elem
  else otherStuff(elem)
}

The question you need to ask yourself is exactly the same as with actors: "Do I need a separate actor for this (to exploit parallelism, or for isolation), or can I execute it as part of this actor?"
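To make the reordering Leon asked about concrete, here is a small, deterministic plain-Scala model (not Akka code). It assumes made-up latencies (upper branch 10 ticks, lower branch 1 tick) and simplifies "no ordering between concurrent branches" to a merge that emits results in completion order.

```scala
// Deterministic model of broadcast + exclusive filters + merge.
// Hypothetical latencies; this is not Akka code.
final case class Elem(name: String, arrival: Int)

object BroadcastMergeModel {
  // Assumed processing latency in ticks: upper = flowA + flowB, lower = flowC.
  val upperLatency = 10
  val lowerLatency = 1

  // Mutually exclusive filters: X takes the upper branch, everything else the lower one.
  def goesUpper(e: Elem): Boolean = e.name == "X"

  // The merge emits whatever completes first, not what arrived first.
  def mergedOutput(input: Seq[Elem]): Seq[String] =
    input
      .map { e =>
        val latency = if (goesUpper(e)) upperLatency else lowerLatency
        (e.arrival + latency, e.name + "'") // (completion tick, output)
      }
      .sortBy(_._1) // stable sort: ties keep arrival order
      .map(_._2)
}

val out = BroadcastMergeModel.mergedOutput(Seq(Elem("X", 0), Elem("Y", 1), Elem("Z", 2)))
println(out.mkString(", ")) // Y', Z', X'
```

Swapping the two latencies yields X', Y', Z' instead: the output order follows timing, not source order.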

-Endre

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam

Leon Ma

Nov 26, 2015, 2:51:25 AM
to Akka User List
Let me make it a bit more complex.

Assume I have a source. For each element A of the source:

if (A.property1 == 1) goes to flow1 + flow2 + flow3
if (A.property1 == 2) goes to flow4 + flow5
if (A.property1 == 3) goes to flow6 + flow7 + flow8 + flow9
...
...
a lot of other branches



It seems what I want is a black-box composite flow that encapsulates all of the above sub-flows and emits their outputs in order.

How should I do the branching for the cases above?

As you suggested, maybe I should model them as a single flow:

flow1 ~> flow2 ~> ... ~> flow9

Each flow would check A.property1 to decide whether the element is meant for it: if yes, it does its business logic; if no, it does nothing and passes the element on to the next flow.

It looks OK, but it introduces some "low-level" logical dependencies. For example, I may want to reuse a complex composite flow created by someone else, and I can't add checking logic for A.property1 to it.

What I actually want is "high-level" branching: a "fan-out" (like a conditional Balance) plus an ordered "fan-in".
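Leon's single-chain idea can be sketched in plain Scala, assuming every step has the same input and output type (the hypothetical `Msg` below) so that a non-matching element can pass through unchanged. The property1 check lives in a wrapper (`guarded`, also hypothetical) rather than inside each step; in Akka, each wrapped step could become one `Flow[Msg].map(...)` stage.

```scala
// Hypothetical message: property1 selects the branch, log records the steps applied.
final case class Msg(property1: Int, log: List[String] = Nil)

object GuardedChain {
  // Run `step` only for elements of the given branch; pass all others through untouched.
  def guarded(branch: Int)(step: Msg => Msg): Msg => Msg =
    m => if (m.property1 == branch) step(m) else m

  // Stand-in for real business logic: just records that the step ran.
  def tag(label: String): Msg => Msg = m => m.copy(log = m.log :+ label)

  // flow1..flow3 serve branch 1, flow4..flow5 serve branch 2 (illustrative only).
  val chain: Msg => Msg =
    List(
      guarded(1)(tag("flow1")),
      guarded(1)(tag("flow2")),
      guarded(1)(tag("flow3")),
      guarded(2)(tag("flow4")),
      guarded(2)(tag("flow5"))
    ).reduce(_ andThen _)
}

// Every element walks through every stage in sequence, so input order is kept.
val results = List(Msg(2), Msg(1), Msg(2)).map(GuardedChain.chain)
results.foreach(println)
```

The trade-off: every element visits every stage, and the trick only works when each reusable piece has the shape `Msg => Msg`. A composite flow that changes the element type cannot simply pass foreign elements through.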



Leon


On Wednesday, November 25, 2015 at 2:50:02 AM UTC-8, Akka Team wrote:

Akka Team

Nov 26, 2015, 4:31:31 AM
to Akka User List
Hi Leon,

On Thu, Nov 26, 2015 at 8:51 AM, Leon Ma <tutu...@gmail.com> wrote:
> Let me make it a little complex:
>
> Assuming I have a source, for each element A of the source:
>
> if(A.property1 == 1) goes to flow1 + flow2 + flow3
> if(A.property1 ==2) goes to  flow4 + flow5
> if(A.property1 == 3) goes to flow6 + flow7 + flow8 + flow9

My question is: if you *don't* want these 3 branches to be concurrent with each other (i.e. you require ordering), then why do you insist on them being different streams? The whole point of streams is concurrent execution, so I think you might be trying to use the tool for something it is not meant for (using graphs, which are inherently concurrent, and then trying to force them to stop being concurrent).

Ask yourself: Can I model these branch flows as simple functions on either
 A) returning sequences of strict values
 B) returning futures of sequences of strict values
 C) returning sources?

Then you can model these respectively:

A)
  .mapConcat { elem =>
     if (elem.property1 == 1) function1(elem) // returns Seq[T]
     ...
   }

B)
  .mapAsync(parallelism = 4) { elem => // parallelism is a required argument; 4 is an arbitrary choice
     if (elem.property1 == 1) function1(elem) // returns Future[Seq[T]]
     ...
   }.mapConcat(identity)

C)
  .flatMapConcat{ elem =>
     if (elem.property1 == 1) function1(elem) // returns Source[T]
     ...
  }
 
These three variants are all able to do different processing steps on a single element depending on a property of the element, without making the branches concurrent with each other.
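Variant B works because `mapAsync` emits results in upstream order even when later futures complete first (`mapAsyncUnordered` drops that guarantee). Outside Akka, the standard library's `Future.traverse` has the same ordering property, so this sketch uses it as a stand-in. The `In` type, `function1`/`function2`, and the 200 ms delay are hypothetical.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical element type: property1 picks the branch, id identifies the element.
final case class In(property1: Int, id: Int)

object VariantB {
  // Slow branch: completes long after the fast one (simulated with a sleep).
  def function1(e: In): Future[Seq[String]] =
    Future { blocking(Thread.sleep(200)); Seq(s"slow-${e.id}") }

  // Fast, one-to-many branch.
  def function2(e: In): Future[Seq[String]] =
    Future { Seq(s"fast-${e.id}a", s"fast-${e.id}b") }

  def dispatch(e: In): Future[Seq[String]] =
    if (e.property1 == 1) function1(e) else function2(e)

  // Analogous to .mapAsync(n)(dispatch).mapConcat(identity): results keep input order.
  def run(input: Seq[In]): Seq[String] =
    Await.result(Future.traverse(input)(dispatch), 5.seconds).flatten
}

val outB = VariantB.run(Seq(In(1, 0), In(2, 1), In(2, 2)))
println(outB.mkString(", "))
```

Even though element 0 finishes last, its output comes first, which is the property that makes variant B order-preserving.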

Not everything can or should be modeled as a stream; you can put complicated logic inside stream stages as well.

-Endre

Leon Ma

Nov 27, 2015, 2:29:27 AM
to Akka User List
I get your point, but I'm still worried about reusing an existing composite flow.


Let's say:

John has built a composite flow of flow1 + flow2 + flow3
Mike has built a composite flow of flow4 + flow5
Tom has built a composite flow of flow6 + flow7 + flow8 + flow9

All of the above composite flows take element A as input and output B.

Without knowing the implementation details but only the flow contract/spec, I'd like to reuse them in my application:

if (A.property1 == 1) goes to John's flow
if (A.property1 == 2) goes to Mike's flow
if (A.property1 == 3) goes to Tom's flow


As you suggested, maybe I can do:

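  .mapAsync(parallelism = 4) { elem =>
     if (elem.property1 == 2) {
          // materializes Mike's flow for this one element; Sink.seq collects all of its outputs
          Source.single(elem).via(FlowOfMike).runWith(Sink.seq)
      }
     ...
   }.mapConcat(identity)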


It looks like I can reuse the existing flow; however, the flow gets materialized again for every element, right?

I'd like to know whether the existing composite flow can be wired logically into my flow, instead of calling the flow to get a result for each element.


Leon


On Thursday, November 26, 2015 at 1:31:31 AM UTC-8, Akka Team wrote:

Akka Team

Nov 27, 2015, 3:28:06 AM
to Akka User List
On Fri, Nov 27, 2015 at 8:29 AM, Leon Ma <tutu...@gmail.com> wrote:
> It looks like I can reuse the existing flow; however, the flow gets materialized again for every element, right?

Yes. Whether that cost is significant depends on your actual processing logic. If each element already involves a significant amount of work, the materialization cost may be negligible in comparison, so you might not need to worry about it. Just because something has a cost does not mean it is not worth paying.
 

> I'd like to know whether the existing composite flow can be wired logically into my flow, instead of calling the flow to get a result for each element.

The main purpose of modeling something as an Akka Stream is to get concurrent processing. If you don't need that, go with a simple actor, or just wrap your processing logic in a stream stage (like mapConcat or mapAsync, etc.). In other words, if something has been expressed as a flow, that already implies it is meant to be used in a concurrent context.

Btw, it is often possible to expose the same service both as functions and as flows. For example, a map step can be exposed as the function itself and as a flow.map wrapping that function; the same goes for mapConcat and mapAsync. That way the consumer of the library has more freedom to assemble those pieces.
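A minimal sketch of that idea, assuming John's, Mike's, and Tom's services are published as plain functions from the hypothetical `ElemA` to a list of `ElemB` (standing in for A and B). The "high-level" branching Leon asked for then becomes a lookup table that fits in the body of a single `mapConcat` stage, which keeps the original element order.

```scala
// Hypothetical stand-ins for Leon's element types A and B.
final case class ElemA(property1: Int, payload: String)
final case class ElemB(result: String)

// Each team publishes its logic as a function value; a Flow wrapper could be
// offered alongside it, e.g. Flow[ElemA].mapConcat(JohnsService.process).
object JohnsService {
  val process: ElemA => List[ElemB] = a => List(ElemB(s"john:${a.payload}"))
}
object MikesService { // one-to-many, to show that mapConcat handles it
  val process: ElemA => List[ElemB] =
    a => List(ElemB(s"mike:${a.payload}"), ElemB(s"mike2:${a.payload}"))
}
object TomsService {
  val process: ElemA => List[ElemB] = a => List(ElemB(s"tom:${a.payload}"))
}

object Dispatcher {
  // The branching is just a lookup on property1.
  val routes: Map[Int, ElemA => List[ElemB]] = Map(
    1 -> JohnsService.process,
    2 -> MikesService.process,
    3 -> TomsService.process
  )

  // What the body of one .mapConcat stage would do; unknown properties yield nothing.
  def route(a: ElemA): List[ElemB] =
    routes.get(a.property1).map(f => f(a)).getOrElse(Nil)
}

// List.flatMap plays the role of mapConcat here.
val outD = List(ElemA(2, "x"), ElemA(1, "y"), ElemA(9, "z"), ElemA(3, "w")).flatMap(Dispatcher.route)
println(outD)
```

Dropping unknown property1 values is only one option; failing or passing them through are equally valid choices that this sketch does not settle.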

The only way to keep ordering with concurrent branches is to add sequence numbers to your elements. That, of course, requires all the reusable flows to be able to handle such tagged messages; one-to-many stages, for example, need to handle them properly. Then you must add a merge that is able to emit the elements in their original order.
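A minimal sketch of the ordered merge's buffering logic in plain Scala (the `Reorderer` class is hypothetical, not an Akka API). It assumes each element is tagged with a consecutive sequence number before the fan-out, and that each branch emits exactly one message per sequence number, so a one-to-many branch has to bundle its outputs into one `Seq`.

```scala
import scala.collection.mutable

// Buffers out-of-order (seqNr, outputs) pairs and releases them strictly in seqNr order.
final class Reorderer[T] {
  private var nextSeqNr = 0L
  private val pending = mutable.Map.empty[Long, Seq[T]]

  // Accept one tagged result and return everything that can now be emitted in order.
  def offer(seqNr: Long, outputs: Seq[T]): Seq[T] = {
    require(seqNr >= nextSeqNr && !pending.contains(seqNr), s"duplicate or stale seqNr $seqNr")
    pending.update(seqNr, outputs)
    val ready = Seq.newBuilder[T]
    while (pending.contains(nextSeqNr)) {
      ready ++= pending.remove(nextSeqNr).get
      nextSeqNr += 1
    }
    ready.result()
  }

  // Results still waiting for an earlier seqNr (grows while a branch lags behind).
  def buffered: Int = pending.size
}

// Completion order from two branches: X (seq 0) is slow, Y and Z (seq 1, 2) are fast.
val reorderer = new Reorderer[String]
val emitted = Seq(
  reorderer.offer(1, Seq("Y'")),
  reorderer.offer(2, Seq("Z1'", "Z2'")), // a one-to-many branch bundles its outputs
  reorderer.offer(0, Seq("X'"))
)
emitted.foreach(println)
```

The buffer is unbounded: if one branch lags far behind, or an element is lost, results pile up in `pending`. A real stage would need to bound it, for example by limiting how many elements are in flight before the fan-out.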

Otherwise I don't see any solution, so if you have one, please share it. Also note that this is equivalent to the case of actors: if two actors handle two different message types coming from a third actor, they must coordinate to emit sequentially to a fourth actor, because Akka does not guarantee message ordering between unrelated senders.

-Endre