Loop in flow makes stream never end


Jack Daniels

Jun 25, 2016, 7:38:22 PM
to Akka User List

I have a working graph that contains a flow with a loop. Items pass through as expected and everything works, but with a bounded source the stream never completes. How can I fix it?


Here is the schema of my flow.


[Image: schema of the flow]


Here is a simplified version of the flow with the same topology.


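// Imports the snippet relies on; an implicit Materializer and
// ExecutionContext are also assumed to be in scope.
import akka.stream.{FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, Merge, MergePreferred, Sink, Source}
import scala.concurrent.Future

val badFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>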
  import GraphDSL.Implicits._

  val mergeEntrance = builder.add(MergePreferred[Int](1))
  val mergePreExit = builder.add(Merge[Int](2))
  val bcast1 = builder.add(Broadcast[Int](2))
  val bcast2 = builder.add(Broadcast[Int](2))

  mergeEntrance.out ~> bcast1.in

  bcast1.out(0) ~> Flow[Int].filterNot(_ == 1) ~> mergePreExit.in(0)

  bcast2.in <~ Flow[Int].collect {
    case v if v == 1 =>
      -1
  } <~ bcast1.out(1)

  mergeEntrance.preferred <~ Flow[Int].filter(_ == 1) <~ bcast2.out(0)

  bcast2.out(1) ~> Flow[Int].filterNot(_ == 1) ~> mergePreExit.in(1)

  FlowShape.of(mergeEntrance.in(0), mergePreExit.out)
})

val completionFut = Source(List(0, 1, 2, 3))
  .buffer(4, OverflowStrategy.fail)
  .via(badFlow)
  .mapAsync(1) {
    case v => Future.successful(v * 10)
  }
  .toMat(Sink.foreach(v => println(s">>> $v")))(Keep.right).run()

for (result <- completionFut)
  println("Stream is over!")


I tried adding ".buffer()" stages everywhere, as suggested in "Graph cycles, liveness and deadlocks", but it didn't help :(


SO question: http://stackoverflow.com/questions/38033362/loop-in-flow-makes-stream-never-end

Jack Daniels

Jun 25, 2016, 7:55:32 PM
to Akka User List
I didn't know the Partition fan-out stage existed, since it's not in the documentation; I just found it in the Akka sources. Here is a shorter version of the "bad" flow:

val badFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val mergeEntrance = builder.add(MergePreferred[Int](1))
  val mergePreExit = builder.add(Merge[Int](2))


  val part1 = builder.add(Partition[Int](2, (i: Int) => if (i == 1) 1 else 0 ))
  val part2 = builder.add(Partition[Int](2, (i: Int) => if (i == 1) 0 else 1 ))

  mergeEntrance.out ~> part1.in
  part1.out(0) ~> mergePreExit.in(0)
  part2.in <~ Flow[Int].collect {
    case v if v == 1 =>
      -1
  } <~ part1.out(1)

  mergeEntrance.preferred <~ part2.out(0)
  part2.out(1) ~> mergePreExit.in(1)

  FlowShape.of(mergeEntrance.in(0), mergePreExit.out)
})

Akka Team

Jul 13, 2016, 4:44:40 AM
to Akka User List
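MergePreferred will not complete until all of its input ports have completed. In this graph the preferred port is fed by the loop, which can only complete after the merge itself does, so the merge waits forever. You can make it complete "eagerly", as soon as one of its inputs completes, like this: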

MergePreferred[Int](1, eagerComplete = true)

--
Johan
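
For reference, here is a minimal sketch of the suggestion above applied to the Partition-based flow from the thread. It assumes the Akka 2.4-era API (ActorSystem plus ActorMaterializer); the names EagerCompleteSketch, loopFlow and run are made up for illustration, and Sink.seq stands in for the original Sink.foreach so the result can be inspected.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, MergePreferred, Partition, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, not part of the original posts.
object EagerCompleteSketch {

  // Same topology as the Partition-based "bad" flow; the only change is
  // eagerComplete = true on the entrance merge.
  val loopFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val mergeEntrance = builder.add(MergePreferred[Int](1, eagerComplete = true))
    val mergePreExit = builder.add(Merge[Int](2))
    val part1 = builder.add(Partition[Int](2, (i: Int) => if (i == 1) 1 else 0))
    val part2 = builder.add(Partition[Int](2, (i: Int) => if (i == 1) 0 else 1))

    mergeEntrance.out ~> part1.in
    part1.out(0) ~> mergePreExit.in(0)
    // 1 is rewritten to -1, so with this input nothing re-enters the loop.
    part1.out(1) ~> Flow[Int].collect { case v if v == 1 => -1 } ~> part2.in
    part2.out(0) ~> mergeEntrance.preferred
    part2.out(1) ~> mergePreExit.in(1)

    FlowShape.of(mergeEntrance.in(0), mergePreExit.out)
  })

  // Runs the bounded source through the flow and waits for completion.
  // Await.result throws a TimeoutException if the stream never completes.
  def run(): Seq[Int] = {
    implicit val system = ActorSystem("eager-complete-sketch")
    implicit val materializer = ActorMaterializer()
    try {
      val done = Source(List(0, 1, 2, 3)).via(loopFlow).runWith(Sink.seq)
      Await.result(done, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(s"Stream is over! Elements seen: ${run()}")
}
```

One thing to keep in mind with this approach: the merge completes as soon as the outer source does, so if elements can still be circulating in the loop at that moment, they may be dropped. In this simplified graph nothing is fed back, so that case should not arise here.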