I'm having trouble implementing custom stream logic that would do the following:
val calculationFlows: List[Flow[Int, Int, Unit]] = ... // n partial flows constructed here
val bcast = builder.add(Broadcast[Int](calculationFlows.length + 1))
val customMerge = builder.add(new CustomMerge[Int, List[Int]])
// pair each original element with the list of its calculation results
val zip = builder.add(ZipWith((original: Int, calculations: List[Int]) => (original, calculations)))
source ~> bcast ~> zip.in0
calculationFlows.foreach { calculationFlow =>
  bcast ~> calculationFlow ~> customMerge.input.inlet
}
customMerge.out ~> zip.in1
zip.out ~> sink
My problem is writing the custom merge (CustomMerge above) using FlexiMerge.
The goal is to wait until all calculations for an element have completed, collect their results into a List[Int], and then emit that list to zip.in1.
For example, given these calculations:
val calculation1 = Flow[Int].map(_+1)
val calculation2 = Flow[Int].map(_+2)
val calculationFlows = calculation1 :: calculation2 :: List.empty
the code above, run with Source(1 to 5), should produce:
(1, List(2, 3))
(2, List(3, 4))
(3, List(4, 5))
(4, List(5, 6))
(5, List(6, 7))
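To pin down the expected result independently of the stream wiring, here is a minimal plain-Scala sketch of the same computation on ordinary collections. It does not use Akka or FlexiMerge at all. The name `expectedOutput` is a hypothetical helper introduced only for illustration, and the calculations are written as plain functions rather than Flows.

```scala
// Reference semantics only: plain collections, no Akka Streams involved.
// `expectedOutput` is a hypothetical helper name, not part of any library.

// The two example calculations, expressed as plain functions instead of Flows.
val calculations: List[Int => Int] = List(_ + 1, _ + 2)

// For every input element, apply each calculation in order and pair the
// original element with the list of results.
def expectedOutput(input: Seq[Int], calcs: List[Int => Int]): List[(Int, List[Int])] =
  input.toList.map(n => (n, calcs.map(calc => calc(n))))
```

Calling `expectedOutput(1 to 5, calculations)` gives the pairs listed above, so the stream version should emit the same elements in the same order.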
It seems like this shouldn't be difficult. I have tried several different FlexiMerge approaches, but with no luck.
Can someone help, please?