Flow supervision decider


dpratt

May 15, 2015, 1:50:52 PM
to akka...@googlegroups.com
I've been using the Streams API to build a few things for the past couple of months, and I have a humble suggestion for an API enhancement. I'm not sure this is even possible given the contract of how a Flow operates, but adding a method to FlowOps with the following signature would be quite useful:

def recover(f: PartialFunction[(In, Throwable), Out]): Repr[Out, Mat]

It's likely because I have yet to fully internalize the Flow API, but I've found that the supervision functionality isn't exactly what I need. At the top level it makes complete sense, but there is no way to deal with an error in a stream without at least one message being silently dropped. It would be nice to be able to set up more fine-grained error handling.

As an example, imagine a stream that was processing incoming deltas to a set of records held either in memory or some persistent data store. A failure of a given delta should not necessarily shut down the whole pipeline, but the associated record should be marked as inconsistent and dealt with appropriately. Using the current supervision API, there's no way to determine the actual element that caused the failure, and thus there's no real way to handle it or signal an external system with the details of the error.

Of course, you can work around this by making the stream operate on a Try[T] instead of T, but that just seems unwieldy. 
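
For illustration, the Try-based workaround mentioned here might look roughly like the sketch below. It uses a plain Scala List in place of a stream so that it runs without Akka; the same `process` function could presumably be passed to a `map` stage. `Delta`, `applyDelta` and `process` are hypothetical names invented for this example, not part of any library.

```scala
import scala.util.{Failure, Success, Try}

object TryWorkaround {
  // Hypothetical domain type: a change to apply to a record.
  final case class Delta(recordId: String, amount: Int)

  // Hypothetical per-element logic that may throw.
  def applyDelta(d: Delta): Int =
    if (d.amount < 0) throw new IllegalArgumentException(s"negative amount for ${d.recordId}")
    else d.amount * 2

  // Pair every input with the Try of its result, so the failing element
  // stays available downstream instead of being silently dropped.
  def process(d: Delta): (Delta, Try[Int]) = (d, Try(applyDelta(d)))

  val deltas: List[Delta] = List(Delta("a", 1), Delta("b", -1), Delta("c", 3))
  val outcomes: List[(Delta, Try[Int])] = deltas.map(process)

  // Records whose delta failed, e.g. to be marked as inconsistent.
  val inconsistent: List[String] =
    outcomes.collect { case (d, Failure(_)) => d.recordId }

  // Results of the deltas that were applied successfully.
  val applied: List[Int] =
    outcomes.collect { case (_, Success(v)) => v }
}
```

The cost is the one described above: every downstream step has to deal with `(Delta, Try[Int])` rather than a bare `Int`.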

Am I looking at this the wrong way?

Patrik Nordwall

May 20, 2015, 5:37:37 AM
to akka...@googlegroups.com
I think we considered adding this to the stream supervision mechanism, but since it is not possible to express the types of the elements there in any sane way we decided to not do it. Instead we said that this specific recover scenario should be handled with try-catch within the function/stage of yours. For mapAsync you can use recover on the Future.
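
As a rough sketch of the two suggestions in this reply, using only the Scala standard library: a try-catch inside the element function keeps the failing input in scope, and `recover` on a Future turns a failed asynchronous call into an ordinary value. The functions `parse`, `lookup` and `safeLookup` are hypothetical; in a stream they would presumably be passed to `map` and `mapAsync` respectively.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object RecoverPerElement {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical synchronous step: the catch block still has access to
  // the input `s`, which a supervision decider would not.
  def parse(s: String): Either[(String, Throwable), Int] =
    try Right(s.toInt)
    catch { case NonFatal(e) => Left((s, e)) }

  // Hypothetical asynchronous step, of the kind one might pass to mapAsync.
  def lookup(id: Int): Future[String] =
    if (id == 0) Future.failed(new NoSuchElementException("no record 0"))
    else Future.successful(s"record-$id")

  // recover on the Future replaces a failed lookup with a fallback element.
  def safeLookup(id: Int): Future[String] =
    lookup(id).recover { case NonFatal(_) => s"unknown-$id" }

  // Helper for trying the sketch out synchronously.
  def await[A](f: Future[A]): A = Await.result(f, 5.seconds)
}
```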

By the way, you can define the supervision strategy for individual stages by using withAttributes.
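
A minimal sketch of per-stage supervision follows. It assumes an Akka 2.6-style API, where `ActorAttributes.supervisionStrategy` builds the attribute and an implicit ActorSystem supplies the materializer; names may differ in older releases, including the ones current at the time of this thread. The `divide` flow and the `run` helper are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PerStageSupervision {
  // Resume (drop the failing element) on division by zero, stop otherwise.
  // Note that the decider only sees the Throwable, not the element.
  val decider: Supervision.Decider = {
    case _: ArithmeticException => Supervision.Resume
    case _                      => Supervision.Stop
  }

  // The strategy is attached to this flow only, not to the whole stream.
  val divide: Flow[Int, Int, akka.NotUsed] =
    Flow[Int]
      .map(n => 100 / n)
      .withAttributes(ActorAttributes.supervisionStrategy(decider))

  // Runs the flow over three elements; the middle one fails and is dropped.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("supervision-sketch")
    try Await.result(Source(List(1, 0, 5)).via(divide).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

With `Resume`, the element 0 is dropped and the stream continues, which is exactly the silent drop the original question is concerned about.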

Regards,
Patrik

--

Patrik Nordwall
Typesafe Reactive apps on the JVM
Twitter: @patriknw

dpratt

May 20, 2015, 12:23:02 PM
to akka...@googlegroups.com
What if I have an existing stage/Flow that I do not have control over, or where it would not make sense to conflate the flow logic with the exception handling?

For example

val foo: Flow[String, String, Unit] = SomeLibrary.somethingThatGeneratesAFlow()

How would I wrap foo with error handling? I can't use map or mapAsync, since those are compositional - namely, by the time they run, the value to map has already been calculated. What I really want is a recover block on the flow itself - something like

foo.recover {

dpratt

May 20, 2015, 12:24:20 PM
to akka...@googlegroups.com
Sorry - hit send too soon.

foo.recover {
  case (failedValue, NonFatal(e)) =>
    log.error(e, "Problem processing stream value of {}", failedValue)
    "UNKNOWN VALUE"
}

Patrik Nordwall

May 20, 2015, 4:07:06 PM
to akka...@googlegroups.com
One thing to remember is that an upstream failure will be propagated downstream immediately without backpressure and thereby overtake previously emitted (buffered) elements, and transforming such an error to an element further downstream may result in unexpected order of elements.

Another thing is that such a failure will cancel upstream and that will be difficult to coordinate with a (later) downstream recovery.

It is certainly possible to implement for a specific stage, but then it is perhaps confusing that it is only "catching" errors from the preceding stage.

This is just my 2c, so if you want a real assessment you are welcome to create a github issue.

/Patrik

Viktor Klang

May 20, 2015, 4:14:02 PM
to Akka User List
From my PoV:

It is vital to distinguish "stream fatal errors" from "transient element processing errors":
the first terminate the stream abruptly, while the second should be modeled within the processing domain,
for instance by transmitting things like Try[T] as elements.
Cheers,

David Pratt

May 20, 2015, 5:08:16 PM
to akka...@googlegroups.com
I agree completely - what I guess I'm struggling with (mainly because I haven't yet completely grokked the stream API) is how to elegantly compose these abstractions on top of a stream. The nice thing about a try is that while 'inside' it, I don't know or care about exception handling.

Basically, what I guess I'm looking for is a way to author a Flow[A, B, _], and have a way to lift it into a Flow[A, Try[B], _]. This, of course, doesn't work perfectly, since there's nothing about a stream that implies a 1:1 correlation between inputs and outputs. Of course, you could always just implement it such that every successful output is lifted into a Success, and any failure drops the current element and emits a single Failure wrapping the cause. Correlation between input elements and Failure on the output could be implemented by the user by just ensuring that any thrown exceptions have a reference to the relevant data.
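
The correlation idea at the end of this paragraph can be sketched for a single per-element function (not for an arbitrary Flow, where the 1:1 caveat above applies). `ElementFailed` and `lift` are hypothetical names invented for this example, and only the Scala standard library is assumed.

```scala
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

object LiftToTry {
  // Hypothetical exception type that carries the element that caused the failure.
  final case class ElementFailed(element: Any, cause: Throwable)
      extends RuntimeException(s"failed on element: $element", cause)

  // Lift a throwing A => B into an A => Try[B]. Non-fatal errors become a
  // Failure that references the input; fatal errors still propagate.
  def lift[A, B](f: A => B): A => Try[B] =
    a =>
      try Success(f(a))
      catch { case NonFatal(e) => Failure(ElementFailed(a, e)) }

  val parse: String => Try[Int] = lift((s: String) => s.toInt)
  val results: List[Try[Int]] = List("1", "x", "3").map(parse)

  // The consumer can recover the offending input from the Failure.
  val failedInputs: List[Any] =
    results.collect { case Failure(ElementFailed(in, _)) => in }
}
```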


Viktor Klang

May 25, 2015, 7:50:47 AM
to Akka User List
On Wed, May 20, 2015 at 11:08 PM, David Pratt <david...@gmail.com> wrote:
> I agree completely - what I guess I'm struggling with (mainly because I haven't yet completely grokked the stream API) is how to elegantly compose these abstractions on top of a stream. The nice thing about a try is that while 'inside' it, I don't know or care about exception handling.

I see that as the major problem: it makes developers focus on "happy-path" programming. I, personally, prefer to have it tracked in the types so that it is impossible to be oblivious to the fact that things can fail :)

This is one of the reasons I like `transform` and `transformWith` for the Future API in Scala 2.12.
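
For readers unfamiliar with these two methods, here is a small sketch of how they surface failure in the types, assuming Scala 2.12 or later. The helper names `materialize` and `withFallback` are made up for this example.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object FutureTransform {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // transform sees both outcomes as a Try, so a failure can be turned into
  // an ordinary value that is visible in the result type.
  def materialize[A](f: Future[A]): Future[Try[A]] =
    f.transform(t => Success(t))

  // transformWith is the flatMap-like variant: each outcome picks the next Future.
  def withFallback(f: Future[Int]): Future[Int] =
    f.transformWith {
      case Success(v) => Future.successful(v)
      case Failure(_) => Future.successful(-1)
    }

  // Helper for trying the sketch out synchronously.
  def await[A](f: Future[A]): A = Await.result(f, 5.seconds)
}
```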
 

> Basically, what I guess I'm looking for is a way to author a Flow[A, B, _], and have a way to lift it into a Flow[A, Try[B], _]. This, of course, doesn't work perfectly, since there's nothing about a stream that implies a 1:1 correlation between inputs and outputs. Of course, you could always just implement it such that every successful output is lifted into a Success, and any failure drops the current element and emits a single Failure wrapping the cause. Correlation between input elements and Failure on the output could be implemented by the user by just ensuring that any thrown exceptions have a reference to the relevant data.

This is rather "impossible", since it means that every stage would need to be hijackable to wrap things from T => Try[T]. There is also the question of which exceptions should be wrapped and which shouldn't (i.e. which are considered fatal for the stream).

Tracking stream element processing failures is not a black-and-white thing for sure, but I tend to err in the direction of explicitness.



--
Cheers,