[Akka Streams] Cyclical Flows or Passing Result of Sink to Source

瀏覽次數:196 次
跳到第一則未讀訊息

Oliver Winks

未讀,
2015年4月16日 上午9:55:232015/4/16
收件者:akka...@googlegroups.com
Hi,

I would like to create a simple flow that consists of a Source (ActorPublisher), several Flows and a Sink (ActorSubscriber) like this:

ActorPublisher ~> Flow1 ~> Flow2 ~> .... ~> FlowN ~> ActorSubscriber

However, I would like to connect the Sink (ActorSubscriber) and Source (ActorPublisher) so that the result of the Sink is passed back to the Source and passed through the flow again. Basically I want a recursive or cyclical FlowGraph.

I've tried to do this by sending a message from the Sink to the Source which contains the result from the Sink, this causes the Flow to hang, it looks like deadlock or something. I've also tried to request the previous result from the Sink when a Request is acted upon by the Source: Pseudo code below e.g.

// Source receive block
def receive: Receive = {
 
case Request(_) =>
   
implicit val timeout = Timeout(5 seconds)
    val future
= sink ? "GetPrevResult"

    val prevRes
= Await.result(future, timeout.duration)

    onNext
(prevRes)
}


// Sink receive blocks
def receive: Receive = {
 
case OnNext(val) =>
    val res
= //... do stuff
    context
.become(receiveWithPrev(res))  
}

def receiveWithPrev(prev): Receive = {
 
case OnNext(val) =>
    val res
= //... do stuff
    context
.become(receiveWithPrev(res))

 
case "GetPrevResult" =>
    prev
}


Neither of these approaches work. The code above times out waiting for the previous result. Is there a standard mechanism in Akka Streams for doing this?

Cheers,

Konrad Malawski

未讀,
2015年4月27日 清晨5:30:402015/4/27
收件者:Akka User List
Hi Oliver,
Have you read the section of our docs about cyclic graphs and deadlocks?

Because akka streams (all reactive streams implementations) are purely "demand driven", you have created a cycle which can not "start",
because in order to get moving you need a result from the Sink - however the Sink will never get the first element because the Source is waiting for 
the Sink to emit something (which is waiting for the Source to emit something, which is waiting... – you see where this is headed I hope).

When working with cycles such you'll need to introduce some element that gets the cycle running, like an initial element or a processing stage like expand which can help break this deadlock.

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Cheers,
Konrad 'ktoso' Malawski
回覆所有人
回覆作者
轉寄
0 則新訊息