Error handling with Source.queue BroadcastHub


sub...@gmail.com

Oct 21, 2017, 9:25:27 AM
to Akka User List
Hi,

I'm using a Source.queue with BroadcastHub to implement a pattern where a web request can add an item to the queue, attach to the graph and get a result. The problem I'm stuck on is how to handle errors without failing the graph. The example code is at the end of this message.

Without the supervision strategy, I get the following output:

flow handling 1
Result 1 Success(Some(error))
Result 2 Success(None)

The graph stops after the exception and "2" is never processed.

With the supervision strategy, the graph recovers and processes "2", but "error" is never emitted:

flow handling 1
Graph failed java.lang.Exception.. resuming
Result 1 Failure(java.util.concurrent.TimeoutException: No elements passed in the last 1 second.)
flow handling 2
Result 2 Success(Some(2))

What I'd like to see is:

Result 1 Success(Some(error))
Result 2 Success(Some(2))

Is there a way to recover the graph on a failure and still see the value returned from the "recover" function? Also, why does the graph still fail when there is a recover? Thanks
 
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source, SourceQueueWithComplete}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object QueueGraph extends App {

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val sourceQueue: Source[String, SourceQueueWithComplete[String]] =
    Source.queue[String](bufferSize = 2, OverflowStrategy.backpressure)

  val decider: Supervision.Decider = {
    case e: Exception =>
      println(s"Graph failed ${e}.. resuming")
      Supervision.Resume
  }

  val (queue: SourceQueueWithComplete[String], source: Source[String, NotUsed]) =
    sourceQueue
      .via(Flow[String].map { num =>
        println(s"flow handling ${num}")
        if (num == "1") {
          throw new Exception()
        } else {
          num
        }
      })
      .recover { case e: Exception => "error" }
      // If the supervisor is removed, recover catches the exception and emits "error",
      // but the graph fails and no longer processes anything.
      // With the supervisor, "error" is never emitted.
      // .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .toMat(BroadcastHub.sink(bufferSize = 1))(Keep.both).run()

  // find error
  val f = source
    .filter(_ == "error")
    .idleTimeout(1.second)
    .toMat(Sink.headOption)(Keep.right).run()

  queue.offer("1")

  println(s"Result 1 ${Try(Await.result(f, Duration.Inf))}")

  queue.offer("2")

  val f2 = source
    .filter(_ == "2")
    .idleTimeout(1.second)
    .toMat(Sink.headOption)(Keep.right).run()

  println(s"Result 2 ${Try(Await.result(f2, Duration.Inf))}")
}
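
The second output above, where "error" never appears, can be reproduced without the queue or the hub. The following is a minimal sketch, assuming the same Akka 2.5-style ActorMaterializer used in the code above; the object name ResumeDrops and the two sample inputs are made up for illustration. With Supervision.Resume the failing element is dropped inside map and the stage keeps running, so no failure signal ever reaches recover.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original program.
object ResumeDrops {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("resume-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val results = Source(List("1", "2"))
        // "1" throws; under Resume the element is dropped and map continues.
        .map(num => if (num == "1") throw new Exception("boom") else num)
        // recover only reacts to a failure signal, which Resume suppresses.
        .recover { case _: Exception => "error" }
        .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
        .runWith(Sink.seq)
      Await.result(results, 5.seconds)
    } finally system.terminate()
  }
}
```

Calling ResumeDrops.run() should yield only "2": the stream survives, but the failed element leaves no trace downstream.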

Tim Harper

Oct 23, 2017, 11:46:24 AM
to Akka User List
Have you considered wrapping your erroneous request in a Try?


  // requires: import scala.util.{Failure, Success}

  val (queue: SourceQueueWithComplete[String], source: Source[String, NotUsed]) =
    sourceQueue
      .via(Flow[String].map { num =>
        println(s"flow handling ${num}")
        if (num == "1") {
          // Signal the failure as a value instead of throwing
          Failure(new Exception())
        } else {
          Success(num)
        }
      })
      .map {
        case Success(v) => v
        case Failure(ex) => "error"
      }
      .toMat(BroadcastHub.sink(bufferSize = 1))(Keep.both).run()
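
When the failing code throws rather than returning a Failure (for example, code you do not control), the same idea can be sketched by wrapping the call in Try(...) per element. This is only an illustration under assumptions: process is a hypothetical stand-in for the real work, TryPerElement is a made-up name, and the queue and hub are left out to keep it short.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical demo object, not part of the original program.
object TryPerElement {
  // Stand-in for code that may throw.
  def process(num: String): String =
    if (num == "1") throw new Exception("boom") else num

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("try-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val results = Source(List("1", "2"))
        // Try captures the non-fatal exception as a value, so the stage never fails.
        .map(num => Try(process(num)))
        .map {
          case Success(v) => v
          case Failure(_) => "error"
        }
        .runWith(Sink.seq)
      Await.result(results, 5.seconds)
    } finally system.terminate()
  }
}
```

Here TryPerElement.run() should return "error" followed by "2", since the stream itself never sees a failure.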

Andrew

Oct 23, 2017, 2:25:49 PM
to akka...@googlegroups.com
Perhaps, but I think that would be really tedious in a real-life implementation, because it's not just one part of the graph that can fail. I'd have to construct a lot of different failure types to flow downstream and handle failures at each junction of the graph. Where I'm using third-party stream libraries (e.g. Alpakka), I would not be able to wrap the code with error handling. I suppose I could use source.recover{...} in those situations, but again it seems very tedious. If there is no clean solution I'd lean towards creating a graph per request, but I would prefer a shared graph.

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/v3vFDox-jLg/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Tim Harper

Oct 25, 2017, 11:26:12 AM
to akka...@googlegroups.com
Can you do more to reason about your failures?

I'm not a huge fan of the Go language, but they avoid throwing exceptions at all costs. There are benefits to being very conservative about throwing exceptions, only throwing them in truly exceptional cases (i.e. to indicate there's a bug in your code, a thread was interrupted, those kinds of things).

Andrew

Oct 25, 2017, 12:22:14 PM
to akka...@googlegroups.com
I get that position on exceptions. The purely functional approach would not endorse throwing exceptions, but I'm not going to that extreme, and I haven't seen evidence that Akka discourages exceptions.

The exceptions I'm concerned with involve transient errors (e.g. RPC failures after retries) and bugs. So, accepting that these are going to happen and would be tedious to handle in all cases, I'm currently abandoning the Source.queue model unless I can find a solution to the error handling. I guess my big question is: why doesn't recover work? That is, recover catches the first exception, but the graph is unable to process any more elements after that. After an exception, any element added to the queue just replays the exception. This seems wrong.

Tim Harper

Oct 25, 2017, 4:01:33 PM
to akka...@googlegroups.com
Is it an extreme? Go is very much _NOT_ a functional language. I might posit that it is generally good programming practice, regardless of your paradigm (imperative / functional).

Also, with regard to your comment that "recover doesn't work", the behavior you describe of the recover operator is exactly the behavior documented. :)

Recover allows to send last element on failure and gracefully complete the stream. Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. This stage can recover the failure signal, but not the skipped elements, which will be dropped.
Throwing an exception inside recover _will_ be logged on ERROR level automatically.
Emits when element is available from the upstream or upstream is failed and pf returns an element
Backpressures when downstream backpressures
Completes when upstream completes or upstream failed with exception pf can handle
Cancels when downstream cancels

When a stream stage ("component") fails, it effectively sends a "HUP" (cancel) signal upstream and the exception downstream. Perhaps this helps explain why recover behaves the way it does?
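
The documented behavior quoted above can be observed with a small finite source. This is a hedged sketch rather than anything from the original program: RecoverCompletes and the sample inputs are made up, and it assumes the Akka 2.5-style ActorMaterializer used elsewhere in this thread. The failure in map cancels the upstream, recover emits one last element, and the stream then completes, so "2" is never processed.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original program.
object RecoverCompletes {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("recover-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val results = Source(List("0", "1", "2"))
        // Fails on "1": the source is cancelled and the error travels downstream.
        .map(num => if (num == "1") throw new Exception("boom") else num)
        // Turns the failure into one final element, then completes the stream.
        .recover { case _: Exception => "error" }
        .runWith(Sink.seq)
      Await.result(results, 5.seconds)
    } finally system.terminate()
  }
}
```

The collected result should end with "error" and never contain "2", which matches the "graph stops after the exception" output in the original question.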

If you abandon using streams, are you missing an opportunity to improve the overall design of your code? Is the pain because "streams are awkward", or, because the existing design is awkward?

Not knowing your code base, I can't say. Good luck with what you decide!

Tim


Andrew

Oct 25, 2017, 7:59:39 PM
to akka...@googlegroups.com
I was referring to purely functional Scala (no exceptions) as extreme. That's just my opinion, but I see Akka use exceptions all over, so it seems reasonable. As I've said before, propagating failure types throughout the graph would be a nightmare, but anyway, I figured out a solution to avoid that.

Good point on recover. What would be helpful is something like recover, except that the graph pulls the next value from the source and keeps going.

I did not mean to say I'm abandoning streams, only abandoning Source.queue. I'm 100% invested in streams, although they can sometimes be tricky to work with. But I came up with a solution, so all good there. Cheers!
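
The solution mentioned above is not shown in the thread. One possible way to get "recover, but keep going" is to run each element through its own short-lived sub-stream with flatMapConcat, so that recover completes only that sub-stream and the outer stream pulls the next element. This is a sketch under assumptions, not the poster's actual fix: risky, isolated and PerElementRecover are hypothetical names, and the ActorMaterializer setup follows the Akka 2.5 style used earlier in the thread.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original program.
object PerElementRecover {
  // Stand-in for a processing step that may throw.
  val risky: Flow[String, String, NotUsed] =
    Flow[String].map(num => if (num == "1") throw new Exception("boom") else num)

  // Each element gets its own one-element sub-stream; a failure ends only
  // that sub-stream, and recover supplies its replacement value.
  val isolated: Flow[String, String, NotUsed] =
    Flow[String].flatMapConcat { num =>
      Source.single(num).via(risky).recover { case _: Exception => "error" }
    }

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("isolate-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(List("1", "2")).via(isolated).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

PerElementRecover.run() should return "error" followed by "2". In the queue-and-hub setup, a flow like isolated would presumably take the place of the plain map stage in front of BroadcastHub.sink. The trade-off is that a sub-stream is materialized for every element, which adds per-element overhead.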

