HELP !! akka streams FlexiMerge problem


Maxim Matvienko

Oct 2, 2015, 3:35:02 AM
to Akka User List
I'm having trouble implementing custom stream logic that would do the following:




val calculationFlows: List[Flow[Int, Int, Unit]] = ... // n partial flows constructed here

val bcast = builder.add(Broadcast[Int](calculationFlows.length + 1))
val customMerge = builder.add(new CustomMerge[Int, List[Int]])

// Pair each original element with the list of its calculation results
val zip = builder.add(ZipWith((original: Int, calculations: List[Int]) => (original, calculations)))

Source ~> bcast ~> zip.in0

calculationFlows.foreach { calculationFlow =>
  bcast ~> calculationFlow ~> customMerge.input.inlet
}
customMerge.out ~> zip.in1

zip.out ~> sink


I'm having a problem writing the CustomMerge using FlexiMerge.
The goal is to have all calculations completed and collected into a List[Int], which is then emitted to zip.in1.

Should look like this:

val calculation1 = Flow[Int].map(_+1)
val calculation2 = Flow[Int].map(_+2)

val calculationFlows = calculation1 :: calculation2 :: List.empty

When run with the code above and Source(1 to 5), it should produce:

(1 , List(2, 3))
(2 , List(3, 4))
(3 , List(4, 5))
(4 , List(5, 6))
(5 , List(6, 7))
 

Seems like it shouldn't be difficult... I tried different FlexiMerge approaches, but no luck.

Can someone help, please?

Akka Team

Oct 2, 2015, 7:49:36 AM
to Akka User List
Hi Maxim,

As far as I can see, this is a variation on the built-in ZipWithN. The difference is that the arity of the inputs is known only at run time, but the types of the inputs are all the same (unlike most zips). AFAIK this is easily doable as a FlexiMerge.

Here is an example that creates a conventional Zip using FlexiMerge: https://github.com/akka/akka/blob/release-2.3-dev/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiMergeSpec.scala#L85

The difference in your case will be that you should use a UniformFanInShape (which can have arbitrarily many input ports, all of the same type), and the number of states will be N:
 - wait for 1st input
 - wait for 2nd input
 ...
 - wait for Nth input

Of course this will only need a properly parametrized state that encodes which port it is waiting on (i.e. one State subclass with an Int parameter).
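
A minimal sketch of that parametrized-state idea, written in plain Scala rather than against the FlexiMerge API, so that it runs without any Akka dependency. Everything here (ZipNSketch, WaitingFor, onInput, run) is a hypothetical name chosen for illustration, not part of Akka. It only models the control flow: one state class carrying the index of the port it is waiting on, plus the elements collected so far.

```scala
// Plain-Scala model of "one State subclass with an Int parameter".
// Nothing here is Akka API; all names are hypothetical.
object ZipNSketch {
  // The merge is always in exactly one state: waiting on `port`,
  // holding the elements already read from ports 0 until `port`.
  final case class WaitingFor(port: Int, collected: Vector[Int])

  // Handle the element read from `state.port`. Returns the next state and,
  // once all n ports have been read, the completed list to emit.
  def onInput(n: Int, state: WaitingFor, element: Int): (WaitingFor, Option[List[Int]]) = {
    val collected = state.collected :+ element
    if (state.port == n - 1) (WaitingFor(0, Vector.empty), Some(collected.toList))
    else (WaitingFor(state.port + 1, collected), None)
  }

  // Drive the state machine over n input sequences, always reading
  // from the port the current state is waiting on.
  def run(inputs: List[List[Int]]): List[List[Int]] = {
    val n = inputs.length
    val iterators = inputs.map(_.iterator).toVector
    var state = WaitingFor(0, Vector.empty)
    val emitted = List.newBuilder[List[Int]]
    while (n > 0 && iterators(state.port).hasNext) {
      val (next, out) = onInput(n, state, iterators(state.port).next())
      out.foreach(emitted += _)
      state = next
    }
    emitted.result()
  }

  def main(args: Array[String]): Unit = {
    val source = (1 to 5).toList
    // Stand-ins for the two calculation flows from the question
    val calculations = List(source.map(_ + 1), source.map(_ + 2))
    source.zip(run(calculations)).foreach(println)
  }
}
```

In an actual FlexiMerge, each WaitingFor(i, ...) would presumably correspond to a state that reads from the i-th input port of the UniformFanInShape, with the emit happening in the state for the last port.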

Please be aware, though, that FlexiMerge and FlexiRoute are being phased out in favor of a simpler yet more powerful construct that will be available in the next 1.1 release.

-Endre



Maxim Matvienko

Oct 2, 2015, 9:09:40 AM
to Akka User List
Thanks Endre - great and timely answer.

Now you've got me curious about the 1.1 release... Any idea on dates for it?