I have a working graph that contains a flow with a loop. Items pass through as expected and everything works. Unfortunately, with a bounded source the graph never completes. How can I fix this?
Here is the schema of my flow.
Here is a simplified version of the flow with the same topology.
val badFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val mergeEntrance = builder.add(MergePreferred[Int](1))
  val mergePreExit = builder.add(Merge[Int](2))
  val bcast1 = builder.add(Broadcast[Int](2))
  val bcast2 = builder.add(Broadcast[Int](2))

  mergeEntrance.out ~> bcast1.in
  bcast1.out(0) ~> Flow[Int].filterNot(_ == 1) ~> mergePreExit.in(0)
  bcast2.in <~ Flow[Int].collect {
    case v if v == 1 =>
      -1
  } <~ bcast1.out(1)
  mergeEntrance.preferred <~ Flow[Int].filter(_ == 1) <~ bcast2.out(0)
  bcast2.out(1) ~> Flow[Int].filterNot(_ == 1) ~> mergePreExit.in(1)

  FlowShape.of(mergeEntrance.in(0), mergePreExit.out)
})
val completionFut = Source(List(0, 1, 2, 3))
  .buffer(4, OverflowStrategy.fail)
  .via(badFlow)
  .mapAsync(1) {
    case v => Future.successful(v * 10)
  }
  .toMat(Sink.foreach(v => println(s">>> $v")))(Keep.right).run()

for (result <- completionFut)
  println("Stream is over!")
I tried adding ".buffer()" at every point in the graph, as suggested in "Graph cycles, liveness and deadlocks", but it didn't help :(
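A likely reason buffers make no difference: the problem is completion, not backpressure. By default a merge stage completes only after all of its inputs have completed. The preferred input here is fed from the merge's own output, so it can never complete first. The sketch below is one possible workaround, not a definitive fix. It passes eagerComplete = true to MergePreferred so that the merge completes as soon as the source does. It assumes Akka 2.6+ (where an implicit ActorSystem provides the materializer); the object name and the 5-second timeout are illustrative only.

```scala
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, MergePreferred, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical object name, used only for this sketch.
object EagerCompleteLoop {
  def main(args: Array[String]): Unit = {
    // Assumption: Akka 2.6+. Older versions also need an implicit ActorMaterializer.
    implicit val system: ActorSystem = ActorSystem("eager-complete-loop")

    val loopFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      // eagerComplete = true: complete as soon as ANY input completes,
      // instead of waiting for the feedback edge to complete as well.
      val mergeEntrance = builder.add(MergePreferred[Int](1, eagerComplete = true))
      val mergePreExit = builder.add(Merge[Int](2))
      val bcast1 = builder.add(Broadcast[Int](2))
      val bcast2 = builder.add(Broadcast[Int](2))

      // Same topology as badFlow above.
      mergeEntrance.out ~> bcast1.in
      bcast1.out(0) ~> Flow[Int].filterNot(_ == 1) ~> mergePreExit.in(0)
      bcast1.out(1) ~> Flow[Int].collect { case v if v == 1 => -1 } ~> bcast2.in
      bcast2.out(0) ~> Flow[Int].filter(_ == 1) ~> mergeEntrance.preferred
      bcast2.out(1) ~> Flow[Int].filterNot(_ == 1) ~> mergePreExit.in(1)

      FlowShape.of(mergeEntrance.in(0), mergePreExit.out)
    })

    val done = Source(List(0, 1, 2, 3))
      .via(loopFlow)
      .runWith(Sink.foreach(v => println(s">>> $v")))

    // Throws a TimeoutException if the stream still fails to complete.
    try Await.result(done, 5.seconds)
    finally system.terminate()
    println("Stream is over!")
  }
}
```

One caveat on this design choice: with eager completion, any element still travelling around the loop when the source finishes may be dropped. In this simplified graph the feedback edge never carries an element (the collect stage maps 1 to -1, and the following filter only passes 1), so nothing should be lost here, but a real loop may need an explicit termination signal instead.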
SO question: http://stackoverflow.com/questions/38033362/loop-in-flow-makes-stream-never-end
The same flow, rewritten with Partition in place of the Broadcast and filter stages:
val badFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val mergeEntrance = builder.add(MergePreferred[Int](1))
  val mergePreExit = builder.add(Merge[Int](2))
  val part1 = builder.add(Partition[Int](2, (i: Int) => if (i == 1) 1 else 0))
  val part2 = builder.add(Partition[Int](2, (i: Int) => if (i == 1) 0 else 1))

  mergeEntrance.out ~> part1.in
  part1.out(0) ~> mergePreExit.in(0)
  part2.in <~ Flow[Int].collect {
    case v if v == 1 =>
      -1
  } <~ part1.out(1)
  mergeEntrance.preferred <~ part2.out(0)
  part2.out(1) ~> mergePreExit.in(1)

  FlowShape.of(mergeEntrance.in(0), mergePreExit.out)
})