Akka stream - Flow in Pause

regis leray

Oct 13, 2016, 12:52:58 PM
to Akka User List
Hi, 

I'm trying to implement a way to control a flow (start/stop); nothing like this has been implemented yet in the core of akka-stream.

My current implementation looks like this.

trait ValveSwitch {
  def open: Unit
  def close: Unit
}

class Valve[A](mode: ValveMode = ValveMode.Open) extends GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {

  override val shape = FlowShape(Inlet[A]("valve.in"), Outlet[A]("valve.out"))

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ValveSwitch) = {
    val logic = new ValveGraphStageLogic(shape, mode)
    (logic, logic.switch)
  }

  private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) extends GraphStageLogic(shape){
    import shape._

    var bufferedElement = List.empty[A]

    val switch = new ValveSwitch {
      override def open: Unit = {
        mode = ValveMode.Open
        println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")
        bufferedElement.foreach(push(out, _))
        bufferedElement = List.empty
      }

      override def close: Unit = {
        mode = ValveMode.Closed
      }
    }

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        val element = grab(in) //acquires the element that has been received during an onPush
        println(s"${mode} on push called with $element")
        if (mode == ValveMode.Open) {
          push(out, element) //push directly the element on the out port
        } else {
          bufferedElement = bufferedElement :+ element
        }
      }
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        println("on pull called")
        pull(in) //request the next element on in port
      }
    })
  }
}

trait ValveMode

object ValveMode {
  case object Open extends ValveMode
  case object Closed extends ValveMode
}

====

My current unit test is failing because, when I open the valve, I never receive the previously buffered message.
It seems that even if I push the element through (valve.open), the sink never receives it.

class ValveSpec extends FlatSpec {

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = materializer.executionContext


  "A closed valve" should "emit only 3 elements after it has been open" in {
    val (valve, probe) = Source(1 to 3)
      .viaMat(new Valve(ValveMode.Closed))(Keep.right)
      .toMat(TestSink.probe[Int])(Keep.both)
      .run()

    probe.request(1)
    probe.expectNoMsg()

    valve.open
    probe.expectNext(1)

    probe.request(2)
    probe.expectNext(2, 3)

    probe.expectComplete()
  }
}


Here is the gist

regis leray

Oct 14, 2016, 3:02:59 AM
to Akka User List
I'm trying to implement pause/resume in Akka Streams.
The current implementation buffers all elements into a List while the valve is in "closed" mode; otherwise it pushes elements through as in the default behavior.

When the flow is resumed by calling switch.open on the materialized value, the mode changes to open and all buffered elements are pushed.
I currently suspect this code to be the problem:

val switch = new ValveSwitch {
  override def open: Unit = {
    mode = ValveMode.Open
    println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")
    bufferedElement.foreach(push(out, _))
    bufferedElement = Option.empty
  }
  ...
}

In my unit test, when the switch changes from closed to open, the Sink never receives the elements:

"A closed valve" should "emit only 3 elements after it has been open" in {
  val (valve, probe) = Source(1 to 5)
    .viaMat(new Valve(ValveMode.Closed))(Keep.right)
    .toMat(TestSink.probe[Int])(Keep.both)
    .run()

  probe.request(2)
  probe.expectNoMsg()

  valve.open // we are pushing the buffered elements
  probe.expectNext shouldEqual 1 // this assert is failing !
  probe.expectNext shouldEqual 2
}
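
One possible direction, as a rough sketch rather than a confirmed fix: the switch above calls push(out, _) from the caller's thread and once per buffered element, whereas a GraphStageLogic is normally only touched from inside the stage and only pushes when downstream has pulled. The variant below routes the switch through getAsyncCallback and, instead of buffering, simply stops pulling upstream while closed. The names GatedValve, GateSwitch and startOpen are hypothetical and not part of Akka. How invocations made before the stage has started are handled may depend on the Akka version, so that part is an assumption to verify.

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}

// Hypothetical names for illustration only; these are not Akka APIs.
trait GateSwitch {
  def open(): Unit
  def close(): Unit
}

final class GatedValve[A](startOpen: Boolean)
    extends GraphStageWithMaterializedValue[FlowShape[A, A], GateSwitch] {

  val in: Inlet[A] = Inlet[A]("gatedValve.in")
  val out: Outlet[A] = Outlet[A]("gatedValve.out")
  override val shape: FlowShape[A, A] = FlowShape(in, out)

  private final class Logic extends GraphStageLogic(shape) with InHandler with OutHandler {
    private var isOpen = startOpen

    // The handler body runs inside the stage, so it may touch state and ports.
    val setOpen = getAsyncCallback[Boolean] { nowOpen =>
      isOpen = nowOpen
      // Downstream asked while the valve was closed: forward that demand now.
      if (isOpen && isAvailable(out) && !hasBeenPulled(in) && !isClosed(in)) pull(in)
    }

    // An element only arrives after a pull, and we only pull when downstream
    // has demand, so pushing here is always allowed.
    override def onPush(): Unit = push(out, grab(in))

    // While closed, do not pull: upstream is backpressured and nothing is buffered.
    override def onPull(): Unit = if (isOpen) pull(in)

    setHandler(in, this)
    setHandler(out, this)
  }

  override def createLogicAndMaterializedValue(
      inheritedAttributes: Attributes): (GraphStageLogic, GateSwitch) = {
    val logic = new Logic
    val switch = new GateSwitch {
      // invoke only hands the message to the stage; it does not touch ports directly.
      override def open(): Unit = logic.setOpen.invoke(true)
      override def close(): Unit = logic.setOpen.invoke(false)
    }
    (logic, switch)
  }
}
```

With this shape, an element that was already requested when close() is called still passes through, because downstream demand for it exists; whether that is acceptable depends on how strict the pause needs to be.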

Any help would be really appreciated

Konrad 'ktoso' Malawski

Oct 14, 2016, 4:10:16 AM
to Akka User List
Seems like you double posted the same question?
Please don't double post the same question :-)


-- 
Konrad
Akka Team