Hi,
I would like to create a simple flow that consists of a Source (ActorPublisher), several Flows and a Sink (ActorSubscriber) like this:
ActorPublisher ~> Flow1 ~> Flow2 ~> .... ~> FlowN ~> ActorSubscriber
However, I would like to connect the Sink (ActorSubscriber) and Source (ActorPublisher) so that the result of the Sink is passed back to the Source and passed through the flow again. Basically I want a recursive or cyclical FlowGraph.
I've tried to do this by sending a message from the Sink to the Source which contains the result from the Sink, this causes the Flow to hang, it looks like deadlock or something. I've also tried to request the previous result from the Sink when a Request is acted upon by the Source: Pseudo code below e.g.
// Source receive block
def receive: Receive = {
case Request(_) =>
implicit val timeout = Timeout(5 seconds)
val future = sink ? "GetPrevResult"
val prevRes = Await.result(future, timeout.duration)
onNext(prevRes)
}
// Sink receive blocks
def receive: Receive = {
case OnNext(val) =>
val res = //... do stuff
context.become(receiveWithPrev(res))
}
def receiveWithPrev(prev): Receive = {
case OnNext(val) =>
val res = //... do stuff
context.become(receiveWithPrev(res))
case "GetPrevResult" =>
prev
}
Neither of these approaches work. The code above times out waiting for the previous result. Is there a standard mechanism in Akka Streams for doing this?
Cheers,