trait ValveSwitch {
def open: Unit
def close: Unit
}
class Valve[A](mode: ValveMode = ValveMode.Open) extends GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {
override val shape = FlowShape(Inlet[A]("
valve.in"), Outlet[A]("valve.out"))
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ValveSwitch) = {
val logic = new ValveGraphStageLogic(shape, mode)
(logic, logic.switch)
}
private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) extends GraphStageLogic(shape){
import shape._
var bufferedElement = List.empty[A]
val switch = new ValveSwitch {
override def open: Unit = {
mode = ValveMode.Open
println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")
bufferedElement.foreach(push(out, _))
bufferedElement = List.empty
}
override def close: Unit = {
mode = ValveMode.Closed
}
}
setHandler(in, new InHandler {
override def onPush(): Unit = {
val element = grab(in) //acquires the element that has been received during an onPush
println(s"${mode} on push called with $element")
if (mode == ValveMode.Open) {
push(out, element) //push directly the element on the out port
} else {
bufferedElement = bufferedElement :+ element
}
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
println("on pull called")
pull(in) //request the next element on in port
}
})
}
}
trait ValveMode
object ValveMode {
case object Open extends ValveMode
case object Closed extends ValveMode
}