Error handling in Streams

305 views
Skip to first unread message

Gesly George

unread,
Jul 13, 2015, 1:01:04 AM7/13/15
to reactor-...@googlegroups.com
I'm a bit confused about error handling in streams. How can I handle errors in a stream such that the publisher is not impacted and I can continue to get the next event in the stream. I've tried a few combinations with retry() & onError() but did not get the expected results.
        RingBufferProcessor<Integer> processor = RingBufferProcessor.create();
       
Stream<Integer> intStream = Streams.wrap(processor);
        intStream
.map(i -> {
                   
if (i == 5) throw new RuntimeException("We don't like this guy!!");
                   
return i;})
               
.retry()
               
.consume(i -> LOG.info("Val => " + i), e -> LOG.info("Error => \n", e));
       
for (int i = 0; i < 10; i++) {
            processor
.onNext(i);
       
}

The above code produces an infinite loop of (0, 1, 2, 4, 0, 1, 2 ...)

Without the retry you get, (0, 1, 2, 3, 4) and the exception propagates to consume's error handler.

How do I process the exception (lets say simply LOG the exception) and continue receiving events 5-10? What does onErrorResumeNext() do - is that to switch to a fallback publisher in my example above? Are these some usage examples for this in the test code?

Thanks,
Gesly


Stephane Maldini

unread,
Jul 15, 2015, 5:01:50 PM7/15/15
to Gesly George, reactor-framework
Hey Gesly,

Sorry for the delay, I have some engagements, and I should clear up my time for Reactor hopefully in a couple weeks.

retry() means infinite retry, until complete where a retry is a simply an unsubscribe+subscribe dance when onError is called. In your case you never complete.

Without the retry, exception correctly propagates as expected and will be the last event of the sequence.

Now retry comes in multiple favors, retryWhen(Function<Stream<Throwable>, Publisher<>) being the most interesting. You can find an example (and a few more) in https://github.com/reactor/reactor/blob/master/reactor-stream/src/test/groovy/reactor/rx/StreamsSpec.groovy#L2750.

The passed stream is the sequence of errors observed during the entire life over multiple unsubscribe/subscribe dances. Returning a publisher allows to interact with this dance, if next or complete is detected, we retry, if error is detected we don't and we propagate the error to the downstream chain.

onErrorResumeNext is another way to say onError-Fallback-To. You can find an example here https://github.com/reactor/reactor/blob/master/reactor-stream/src/test/groovy/reactor/rx/StreamsSpec.groovy#L2257.
The passed Publisher argument will be subscribed to when an error is produced, instead of just letting the error propagate.


--
You received this message because you are subscribed to the Google Groups "reactor-framework" group.
To unsubscribe from this group and stop receiving emails from it, send an email to reactor-framew...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Stephane Maldini | Solutions Architect, CSO EMEA | London | Pivotal

Gesly George

unread,
Jul 17, 2015, 9:42:33 PM7/17/15
to reactor-...@googlegroups.com, gesly....@gmail.com
Stephane,
I don't think any of these approaches will let me just record the error and continue to receive the next event from the publisher. In my example, I would like to capture the exception thrown when mapping event 5 (lets say simple log it) and then continue to receive events 6, 7, 8, 9. Is there a way to do that?

Gesly

Gesly George

unread,
Aug 19, 2015, 1:53:39 AM8/19/15
to reactor-framework, gesly....@gmail.com
Stephane,

Ideas??

Stephane Maldini

unread,
Aug 19, 2015, 7:14:05 AM8/19/15
to Gesly George, reactor-framework
My bet is still on retryWhen(), checklist :
  • You can choose to continue receiving events (by resubscribing to processor internally)
  • You can let the error propagate (if deemed fatal)
  • You can count the number of errors if wanted and expose that or choose to propagate after a threshold
Typically because the error happens inside the map (after processor), the processor is still considered healthy, so retryWhen will resubscribe map() to it. If you were signaling the processor while it was re-subscribing, the signals would sit in the processor until the new subscriber arrive and you should see 6..7..8 then. If not let me know.

An example of retryWhen :

 RingBufferProcessor<Integer> processor = RingBufferProcessor.create();
        
Stream<Integer> intStream = Streams.wrap(processor);
        intStream
.map(-> {
                    
if (== 5) throw new RuntimeException("We don't like this guy!!");
                    
return i;})

                
.retryWhen( errors -> errors.observe(error-> { if(error instanceof SomeFatalException) throw error })
Stephane Maldini | Reactor Project Lead, Spring Engineering | London | Pivotal

Laurent Quérel

unread,
Jan 26, 2016, 8:01:51 PM1/26/16
to reactor-framework, gesly....@gmail.com
Hi Stephane,

I tried our solution but that doesn't work with reactor 2.0.7 or I missed a step. 

The sequence displayed is still 0,1,2,3,4. 

To continue the processing of the sequence I can manually catch all exceptions in every step of the pipeline (map, filter, ...) but could be nice to provide a more general/nice mechanism to handle recoverable exceptions gracefully.

Laurent Querel

Victor Ferrer

unread,
Mar 31, 2016, 10:59:57 AM3/31/16
to reactor-framework, gesly....@gmail.com
I had the same problem (on 2.5 though) and posted this SO question:

It works (handles discrete errors) but the code is ugly as hell.
Reply all
Reply to author
Forward
0 new messages