Akka Streams Feedback Merge never completes


Jan Liße

May 18, 2015, 6:29:06 AM
to akka...@googlegroups.com
Hello,

I'm currently building a scraper system on top of Akka Streams. I have written a Flow that follows paginated sites and scrapes them in a loop.
For this I use a feedback merge.



scrapePaginated takes a function that decides whether there are further pages to scrape. If there are, it returns a Some() with the next URL as part of the response tuple, and a None for the last page.
The iteration and the feedback loop work, and all pages are scraped properly. But even when all URLs have been processed, the stream never completes: onComplete is never invoked.
Is this expected behaviour, or is there an error in my scrapePaginated method? I read the docs' chapter on graph deadlocks and liveness issues and finally added a buffer step with OverflowStrategy.fail to the feedback loop, but to no avail.
If it helps to clarify the problem, I can provide a simple Spec that reproduces the issue.
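To make the contract concrete, here is a minimal stdlib-only sketch (no Akka) of the pagination step described above. `scrapePage` and the `pages` map are hypothetical stand-ins for fetching and parsing; each call returns the scraped data plus `Some(nextUrl)`, or `None` on the last page, and `LazyList.unfold` follows the chain. Run sequentially like this, the iteration ends by itself at the first `None`; the rest of the thread is about why the stream version does not complete.

```scala
object PaginationSketch {
  // Hypothetical in-memory "site": url -> (page data, optional next-page url)
  val pages: Map[String, (String, Option[String])] = Map(
    "/page1" -> ("data-1", Some("/page2")),
    "/page2" -> ("data-2", Some("/page3")),
    "/page3" -> ("data-3", None) // last page: no next url
  )

  // Stand-in for the per-page step: scraped data plus Some(nextUrl) or None
  def scrapePage(url: String): (String, Option[String]) = pages(url)

  // Follow the pagination chain until a page reports no next url
  def scrapeAll(start: String): List[String] =
    LazyList.unfold(Option(start)) {
      case Some(url) =>
        val (data, next) = scrapePage(url)
        Some((data, next)) // emit this page's data, continue with the next url (or None)
      case None => None    // no more pages: the iteration ends
    }.toList
}
```

For example, `PaginationSketch.scrapeAll("/page1")` yields the data of all three pages in order.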

Thanks in advance for any help!

Jan  


Endre Varga

May 18, 2015, 6:42:05 AM
to akka...@googlegroups.com
I might be wrong here, but it seems like:

 - merge does not stop, because the feedback loop does not stop
 - the feedback loop does not stop, because unzip does not stop
 - unzip does not stop, because merge does not stop.

If I am correct, then this is an interesting twist on the deadlock scenarios. This does not deadlock on elements/backpressure but it deadlocks on completion signal. 
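Endre's circular argument can be checked with a toy model in plain Scala (no Akka involved): treat each stage as completing only after all of its upstream stages have completed, then compute which stages ever complete. The stage names mirror the discussion; the model is a simplification that ignores elements in flight and assumes a non-eager merge.

```scala
object CompletionCycleSketch {
  // A stage completes only once every one of its upstream stages has completed.
  // Iterate until no new stage completes (a fixed point).
  def completed(upstreams: Map[String, Set[String]]): Set[String] = {
    @annotation.tailrec
    def loop(done: Set[String]): Set[String] = {
      val next = upstreams.filter { case (_, ups) => ups.subsetOf(done) }.keySet
      if (next == done) done else loop(next)
    }
    loop(Set.empty[String])
  }

  // source -> merge -> unzip -> feedback -> back into merge
  val withFeedback: Map[String, Set[String]] = Map(
    "source"   -> Set.empty[String],
    "merge"    -> Set("source", "feedback"),
    "unzip"    -> Set("merge"),
    "feedback" -> Set("unzip")
  )

  // The same graph without the feedback edge, for comparison
  val withoutFeedback: Map[String, Set[String]] =
    withFeedback.updated("merge", Set("source")) - "feedback"
}
```

With the feedback edge, only `source` ever completes; drop that edge and `merge` and `unzip` complete as well.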

Please open a ticket for discussion. I am not sure how to solve this in a generic fashion, but the collection of deadlock scenarios is growing, we need to provide an answer eventually, and I want this one documented in a ticket, too.

-Endre
 


Jan Liße

May 18, 2015, 10:46:55 AM
to akka...@googlegroups.com
Hello Endre,

thanks for your quick response. I have filed an issue here: https://github.com/akka/akka/issues/17507

Best regards,
Jan

Jean-Pierre Thomasset

Nov 5, 2015, 7:48:31 AM
to Akka User List
Hi Jan,

Sorry for the necrobump, but I ended up in a similar situation and was wondering what your final implementation to circumvent this completion deadlock was. Did you go for the "Poison Pill" approach suggested on GitHub?

Best regards,
Jean-Pierre.

Jean-Pierre Thomasset

Nov 27, 2015, 11:31:34 AM
to Akka User List
Hi again,

I tried the "Poison Pill" solution but it looks kind of ugly so maybe I understood it wrong. 

First, some background on what I am doing, which seems close to Jan's project: I am crawling some web pages, extracting data and following URLs when I encounter pagination.

Here is a simplified version of the graph I made:

                               broadcast ~> sink
source ~> merge ~> fetchUrl ~> broadcast ~> extractNextUrl
          merge.preferred <~               extractNextUrl

 - `source` is a source of initial URLs.
 - `fetchUrl` is responsible for fetching the received URL and emitting its content.
 - `extractNextUrl` is responsible for extracting a possible 'next-page' URL when one is found in the downloaded content.

The graph works, but the stream never completes, which is expected since the merge stage is pulling from itself: its feedback input can only complete after the merge itself has completed.
Using the eagerClose option on the Merge operation is not applicable here, as the merge would then complete as soon as the source completes and miss every URL still in the feedback loop.

I tried the Poison Pill approach using Either[T, PoisonPill] as the data type flowing through the graph, but it's messy and forces me to handle the PoisonPill case at every stage. Moreover, one of the stages I am using is Http.superPool[T](), which only accepts a tuple of (HttpRequest, T), so I'm stuck.

Now I'm thinking of creating a custom async stage that wraps this logic using a Future-based HTTP request, but I would prefer to stick to a stream-based approach. If someone has ideas on how to achieve that, I would be very happy to hear them.

Best Regards,
JP.

David Knapp

Nov 30, 2015, 8:02:42 PM
to Akka User List
I'm in pretty much the exact same position, except that instead of crawling for new URLs, I'm routing unfinished responses back into my stream so I can wait until they're done. This is my graph:


source ~> sync ~> merge.preferred
                  merge ~> broadcast
                           broadcast.out(0) ~> finishedFilter
                           broadcast.out(1) ~> unfinishedFilter
                  merge <~ syncStatusFlow <~ Limiter.limit[Long](Client.limiter, 10 seconds) <~ Flow[SyncStatus].map(_.account) <~ unfinishedFilter

(finishedFilter.outlet)

Endre Varga

Dec 1, 2015, 3:29:01 AM
to akka...@googlegroups.com
Hi,

In both cases, you have to know what is your terminating condition at the merge site. Then you can use a custom GraphStage to implement the merge and have the right termination logic: http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M2/scala/stream-customize.html#custom-processing-with-graphstage

I don't yet see how to provide such a built-in stage because the termination condition can be very different in different use-cases. Will think about it though, but it would be nice if you could contribute a GraphStage that at least solved your problems.

-Endre

Jean-Pierre Thomasset

Dec 1, 2015, 6:14:16 AM
to Akka User List
Hi Endre,

Thank you for your feedback. The problem is that the termination condition depends on the feedback loop being drained. Basically, if this stage had access to all stages between its output port and the feedback input port, I could check that:
  • The 'external' input port of the merge operation is CLOSED_EMPTY
  • The 'feedback' input port of the merge operation is PULLED
  • The output port of the merge operation is PULLED
  • All input & output ports between the output port and the 'feedback' input port of the merge operation are PULLED
However, there are at least two issues with this process:
  • The merge stage is not aware of the state of the other stages' ports (at least, I did not find how ;)), and even if it had access, it would be risky to read the state of another stage running in another actor.
  • A stage pulling from its input port before pushing its current payload (like a buffer or an asynchronous stage able to perform concurrent operations) would defeat the detection.
If we ignore the second issue, a solution would be to have a probe at each edge of the graph, maybe using a custom connector instead of '~>'.
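JP's checklist can be written down as a predicate over a snapshot of port states. The sketch below is plain Scala with hypothetical types (`PortState`, `LoopSnapshot`), not anything Akka exposes. As the two issues above point out, the hard part in a real graph is that no stage can take such a consistent snapshot of the others, and a stage that pulls before pushing (such as a buffer) would look `Pulled` while still holding elements.

```scala
object DrainedLoopSketch {
  // Simplified, hypothetical port states (illustrative only)
  sealed trait PortState
  case object ClosedEmpty extends PortState // closed, nothing left to read
  case object Pulled extends PortState      // waiting for an element
  case object Pushed extends PortState      // an element is sitting on the port

  final case class LoopSnapshot(
      externalIn: PortState,     // the merge's 'external' input
      feedbackIn: PortState,     // the merge's 'feedback' input
      mergeOut: PortState,       // the merge's output
      loopPorts: List[PortState] // every port between merge.out and the feedback inlet
  )

  // The merge may complete only if the external input is finished and every
  // port in the loop is waiting for an element, i.e. nothing is visibly in flight.
  def canComplete(s: LoopSnapshot): Boolean =
    s.externalIn == ClosedEmpty &&
      s.feedbackIn == Pulled &&
      s.mergeOut == Pulled &&
      s.loopPorts.forall(_ == Pulled)
}
```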

Jean-Pierre.

Endre Varga

Dec 1, 2015, 6:23:53 AM
to akka...@googlegroups.com
Hi,

In this case I don't see the reason for modeling the loop as a stream. By making it a stream, you disperse all this state among concurrent entities, but in reality this is state that needs to be in one place. If that is the case, then this is clearly not a good fit for streams. You can make this complex processing part of a custom stage, or you can create an actor for it and tie it to your other streaming parts.
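A plain-Scala sketch of keeping all loop state in one place, the alternative suggested above: a sequential worklist crawler in which the pending queue is the only loop state, so completion is simply "the queue is empty". `links` is a hypothetical in-memory stand-in for fetched pages; real fetches would be asynchronous (inside a custom stage or an actor), and the sketch assumes the link structure has no cycles, as with pagination chains.

```scala
import scala.collection.immutable.Queue

object CentralCrawlerSketch {
  // Hypothetical stand-in for fetched pages: url -> urls found on that page
  type Links = Map[String, List[String]]

  // All loop state (the pending work) lives in one place,
  // so the termination condition is just "nothing pending".
  def crawl(seeds: List[String], links: Links): List[String] = {
    @annotation.tailrec
    def loop(pending: Queue[String], processed: List[String]): List[String] =
      pending.dequeueOption match {
        case None => processed.reverse // queue drained: nothing in flight, so stop
        case Some((url, rest)) =>
          val discovered = links.getOrElse(url, Nil) // "fetch" the page, collect follow-up urls
          loop(rest.enqueueAll(discovered), url :: processed)
      }
    loop(Queue.empty[String].enqueueAll(seeds), Nil)
  }
}
```

For example, seeding with `/a1` and `/b1`, where `/a1` links to `/a2` and `/a2` to `/a3`, processes `/a1`, `/b1`, `/a2`, `/a3` and then returns.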

-Endre

Jean-Pierre Thomasset

Dec 1, 2015, 7:08:29 AM
to Akka User List
Thank you for the message,

I thought of that, but since the stream approach worked really well as long as the flow is uninterrupted, I tried to find a solution to the completion problem. I think there are multiple cases of feedback-loop completion deadlock, and I was hoping there was a nice solution.

One solution would be some kind of out-of-band message passing through the graph, or some kind of tryClose/tryFinish operation that would fail at the first stage still processing data. That would allow sending the operation or message into the feedback loop and waiting until it reaches the merge operation again. This is something I tried to implement using an Either[T, PoisonPill] flowing through the graph, but it requires each stage to handle this type, which can be a pain.
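The "pain" can be seen in a small stdlib-only sketch: with an `Either[T, PoisonPill]` message type, every stage in the loop has to be wrapped so that it transforms the `Left` data and passes the pill through untouched. `PoisonPill`, `Msg` and `lift` are hypothetical names for illustration. A stage whose input type is fixed, such as `Http.superPool` with its `(HttpRequest, T)` tuples, cannot simply be wrapped this way, which is where JP got stuck.

```scala
object PoisonPillSketch {
  // Hypothetical marker that travels through the loop alongside the data
  case object PoisonPill

  // Left = regular data, Right = the marker (mirroring Either[T, PoisonPill])
  type Msg[A] = Either[A, PoisonPill.type]

  // Every stage in the loop has to be lifted like this so the marker passes through unchanged
  def lift[A, B](stage: A => B): Msg[A] => Msg[B] = {
    case Left(a)  => Left(stage(a))
    case Right(p) => Right(p)
  }
}
```

Composing lifted stages (for example `fetch andThen extract`) keeps the pill intact end to end, so the merge could complete once it sees the pill come back; the cost is that no stage can stay unaware of the wrapper type.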

Regards,
Jean-Pierre.

David Knapp

Dec 2, 2015, 2:58:38 PM
to Akka User List
I agree with your proposed solution. Maybe a Source that emits a "this is the last element" marker, but one that stages can handle implicitly.

Jean-Pierre Thomasset

Dec 3, 2015, 6:10:27 AM
to Akka User List
For my problem, I finally found a solution, though it may not work in all cases. It is possible because I know that an element flowing in the feedback loop cannot be multiplied (in my case there can be at most one 'next URL' in the page currently fetched). So by changing extractNextUrl to output Option[T] instead of T, I can track at the merge site whether there is another page to crawl, and thus decide whether to complete this stage.

                               broadcast ~> sink
source ~> merge ~> fetchUrl ~> broadcast ~> extractNextUrl
          merge.preferred <~               extractNextUrl

Here is what I want to do:
- extractNextUrl will send Some(t) when a next URL is found, and None when there is none.
- The 'merge' stage will count the outstanding elements in the feedback loop, i.e. the difference between its output count and the merge.preferred input count (including None).
- Once the first (main) input port of the merge is closed, when the last None arrives from extractNextUrl and there are no outstanding requests, the stage completes.
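These three rules can be checked in isolation with a pure-Scala model of the merge's bookkeeping (no Akka; the `Input` events are hypothetical). Each element emitted into the loop is assumed to produce exactly one feedback reply: a `Some` is re-emitted and leaves the count unchanged, a `None` decrements it, and the stage may complete once the main input is closed and the count is zero.

```scala
object OutstandingCounterSketch {
  sealed trait Input
  final case class External(url: String) extends Input          // element from the main source
  case object ExternalDone extends Input                         // main source completed
  final case class Feedback(next: Option[String]) extends Input // one reply per emitted element

  final case class State(outstanding: Int, externalClosed: Boolean, emitted: List[String]) {
    def isComplete: Boolean = externalClosed && outstanding == 0
  }

  def step(s: State, in: Input): State = in match {
    case External(url) => // emitted into the loop: one more outstanding element
      s.copy(outstanding = s.outstanding + 1, emitted = url :: s.emitted)
    case ExternalDone =>
      s.copy(externalClosed = true)
    case Feedback(Some(url)) => // one element came back and one goes out again: net change 0
      s.copy(emitted = url :: s.emitted)
    case Feedback(None) => // the loop ended for this element
      s.copy(outstanding = s.outstanding - 1)
  }

  def run(inputs: List[Input]): State =
    inputs.foldLeft(State(outstanding = 0, externalClosed = false, emitted = Nil))(step)
}
```

Note that the main source may complete while elements are still in the loop; completion only happens when the final `None` brings the count back to zero.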

I'll try to implement this solution in a custom merge stage this weekend and I'll let you know if it works.

Regards,
Jean-Pierre.

Jean-Pierre Thomasset

Dec 7, 2015, 9:06:19 AM
to Akka User List
Hello,

Here is the implementation of a custom feedback stage. It has three ports:
- out [T]: The output of the merge operation made by this stage
- in [T] : The main input port (outside the feedback loop)
- feedback [Option[T]] : the feedback loop input port

Keep in mind that it only works if the number of elements flowing out is exactly the number flowing back in through the feedback port. The stage completes once the main input port (in) is completed and there are no outstanding elements in the feedback loop (the last element in the loop should be a None to complete the stage).

  import akka.stream.{Attributes, FanInShape, Inlet, Outlet}
  import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

  class FeedbackShape[T](val _init: FanInShape.Init[T]) extends FanInShape[T](_init) {
    def this() = this(FanInShape.Name[T]("FeedbackShape"))

    override protected def construct(init: FanInShape.Init[T]): FanInShape[T] = new FeedbackShape(init)
    override def deepCopy(): FeedbackShape[T] = super.deepCopy().asInstanceOf[FeedbackShape[T]]


    val in = newInlet[T]("in")
    val feedback = newInlet[Option[T]]("feedback")
  }

  class Feedback[T] extends GraphStage[FeedbackShape[T]] {
    // Define the shape of this stage: a FeedbackShape (a FanInShape) with the ports defined above
    override val shape: FeedbackShape[T] = new FeedbackShape


    def in: Inlet[T] = shape.in
    def out: Outlet[T] = shape.out
    def feedback: Inlet[Option[T]] = shape.feedback


    // This is where the actual (possibly stateful) logic will live
    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
      var flowingCount = 0

      def checkComplete(): Unit = {
        // Complete this stage if there are no data in the feedback loop
        if(flowingCount == 0 && isClosed(in)) {
          completeStage()
        }
      }

      def pullSecondary(): Unit = tryPull(in)

      /**
       * Output port handler
       */
      setHandler(out, new OutHandler {
        private var first = true
        override def onPull(): Unit = {
          if (first) {
            first = false
            tryPull(feedback)
            tryPull(in)
          }
        }
      })

      var feedbackEmitting = 0

      /**
       * Preferred (feedback) input port handler
       */
      setHandler(feedback, new InHandler {
        override def onUpstreamFinish(): Unit = checkComplete()
        override def onPush(): Unit = {
          /* Received back the element in the feedback loop */
          flowingCount -= 1
          emitFeedback()
        }

        def emitFeedback(): Unit = {
          feedbackEmitting += 1

          grab(feedback) match {
            case Some(e) => {
              flowingCount += 1
              emit(out, e, emitted)
            }
            case None => checkComplete()
          }
          tryPull(feedback)
        }

        val emitted = () ⇒ {
          feedbackEmitting -= 1
          if (isAvailable(feedback)) emitFeedback()
          else if (feedbackEmitting == 0) emitSecondary()
        }

        def emitSecondary(): Unit = {
          if (isAvailable(in)) {
            flowingCount += 1
            emit(out, grab(in), pullSecondary)
          }
        }

      })

      /**
       * Secondary input port handler
       */
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          if (feedbackEmitting == 0) {
            flowingCount += 1
            emit(out, grab(in), pullSecondary)
          }
        }
        override def onUpstreamFinish(): Unit = checkComplete()
      })


    }
  }

It can be used in a graph of this shape:

source ~> feedback.in
          feedback.out      ~>                  broadcast ~> output/sink
          feedback.feedback <~ loopOperation <~ broadcast

where loopOperation must decide whether to send the data back into the loop by emitting either
- Some(t): if the loop shall continue
- None: if the loop must end for the given input

The feedback stage is responsible for unboxing the Option[T] sent to its feedback port and emitting it as a T on the out port.

I hope this will help you.

Best Regards,
JP.

David Knapp

Dec 18, 2015, 5:29:26 PM
to Akka User List
I think your emitFeedback() method should look like this:

            def emitFeedback(): Unit = {
              grab(feedback) match {
                case Some(e) => {
                  feedbackEmitting += 1
                  flowingCount += 1
                  emit(out, e, emitted)
                }
                case None => checkComplete()
              }
              tryPull(feedback)
            }

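Otherwise feedbackEmitting is incremented for every feedback element, including None, but the emitted callback that decrements it only runs after a Some has been emitted. The counter therefore keeps accumulating, and as soon as the first None arrives on the feedback port, elements from the main input are never emitted again.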
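The leak can be reproduced with a stripped-down model of the `feedbackEmitting` counter in plain Scala (an illustration, not the stage itself): the original `emitFeedback` increments the counter for every feedback element, but the `emitted` callback that decrements it only runs when something was actually emitted, i.e. for `Some`.

```scala
object FeedbackEmittingSketch {
  // Simplified model of feedbackEmitting after a batch of feedback elements,
  // assuming every scheduled emit(...) has finished and its callback has run.
  // countNone = true mimics the original emitFeedback, which also increments for None.
  def pendingAfter(feedback: List[Option[String]], countNone: Boolean): Int =
    feedback.foldLeft(0) { (pending, elem) =>
      elem match {
        case Some(_) =>
          val incremented = pending + 1 // feedbackEmitting += 1 before emit(out, e, emitted)
          incremented - 1               // the `emitted` callback: feedbackEmitting -= 1
        case None =>
          // nothing is emitted, so no callback ever decrements the counter
          if (countNone) pending + 1 else pending
      }
    }

  // In the stage, elements from the main input are only emitted while the counter is 0
  def mainInputServed(feedback: List[Option[String]], countNone: Boolean): Boolean =
    pendingAfter(feedback, countNone) == 0
}
```

With the original behaviour, a single `None` leaves the counter at 1 for good; with the fix it returns to 0.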

Theodore Widom

Dec 6, 2017, 9:59:46 AM
to Akka User List
I had the same type of use case recently and I think there might be some other potential strategies for completing feedback loops.

Here are some rough pseudocode sketches:

CASE # 1: An Option[T] going back into the loop:
   - takeWhile Option[T].nonEmpty
   - mapConcat to transform Option[T] into T because the merge takes T, not Option[T]

val takeWhile = Flow[Option[T]].takeWhile(_.nonEmpty)
val mapConcat = Flow[Option[T]].mapConcat(_.toList)

                               broadcast ~> sink
source ~> merge ~> fetchUrl ~> broadcast ~> extractNextUrlOpt
          merge <~ mapConcat <~ takeWhile <~ extractNextUrlOpt
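For a single starting URL, Case #1 behaves like the following sequential pipeline over a stdlib `Iterator` (an analogy in plain Scala, not the stream itself; `nextUrl` is a hypothetical stand-in for `fetchUrl ~> extractNextUrlOpt`). The first `None` ends the `takeWhile`, which is what lets the feedback branch, and with it the merge, complete.

```scala
object TakeWhileLoopSketch {
  // nextUrl: hypothetical stand-in for fetching a page and extracting its next-page url
  def crawl(start: String, nextUrl: String => Option[String]): List[String] =
    Iterator
      .iterate(Option(start))(_.flatMap(nextUrl)) // feed each url back through the "loop"
      .takeWhile(_.nonEmpty)                      // like takeWhile(_.nonEmpty) on the feedback branch
      .flatMap(_.toList)                          // like mapConcat(_.toList): Option[T] => T
      .toList
}
```

One caveat worth checking in the stream version: `takeWhile` completes on the first `None` it sees, whichever chain it comes from, so with several source URLs the remaining chains would appear to be cut off; the counter in Case #2 only stops when nothing is outstanding.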


CASE # 2: A varying number of T's going back into the loop (instead of an Option[T] we are dealing with a List[T]):
   - a statefulMapConcat from List[T] to a tuple (Long, List[T]) where `Long` is a stateful counter tracking the number of outstanding tasks
   - takeWhile counter != 0
   - mapConcat to turn List[T] to individual Ts, because the merge takes T, not List[T]

val counter = Flow[List[T]].statefulMapConcat { () =>
  var counter: Long = sourceSize // Initialize the stateful counter to the # of tasks we start out with in our Source
  newTasks => {
    counter -= 1              // Each time we get a list of new tasks, we know that exactly one old task is done
    counter += newTasks.size  // And exactly the # of tasks in the list has been added
    List((counter, newTasks)) // statefulMapConcat expects an Iterable of outputs, so wrap the tuple
  }
}
val takeWhile = Flow[(Long, List[T])].takeWhile { case (counter, _) => counter != 0 }
val mapConcat = Flow[(Long, List[T])].mapConcat { case (_, newTasks) => newTasks }

                               broadcast ~> sink
source ~> merge ~> fetchUrl ~> broadcast ~> extractNextUrlList
          merge <~ mapConcat <~ takeWhile <~ counter <~ extractNextUrlList

(In practice you might want something other than a List for better time complexity of the `size` method, etc.; I'm using a List here just to keep things simple).
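The counter arithmetic in Case #2 can be sanity-checked without Akka by replaying it with `scanLeft` (standing in for `statefulMapConcat`) and `takeWhile` over a hypothetical, fixed sequence of per-task results; in the real loop each result would be produced by a task emitted earlier.

```scala
object OutstandingTasksSketch {
  // Each element of `results` is the list of new tasks produced by finishing one task
  // (a hypothetical stand-in for extractNextUrlList's output).
  def counted(sourceSize: Long, results: List[List[String]]): List[(Long, List[String])] =
    results
      .scanLeft((sourceSize, List.empty[String])) { case ((outstanding, _), newTasks) =>
        (outstanding - 1 + newTasks.size, newTasks) // one task done, newTasks.size tasks added
      }
      .tail                                         // drop the seed value; only per-result states remain
      .takeWhile { case (outstanding, _) => outstanding != 0 }
}
```

Starting from one task whose results are `List("/a", "/b")`, `List("/c")`, `Nil`, `Nil`, the counter goes 2, 2, 1, 0; the element that brings it to 0 carries no new tasks, so nothing is lost when `takeWhile` drops it and completes.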


CASE # 3: Sometimes a feedback loop arises because a node in the graph holds state that must handle messages from multiple other places in the graph. In this scenario, the loop can sometimes be "straightened out" by holding the state in a separate actor and querying it with the mapAsync flow + ask pattern from multiple places in a "straight-line" graph that no longer contains a feedback loop.