expected behaviour of exceptions thrown from doOnNext handlers

1,869 views
Skip to first unread message

Jan Bols

unread,
Mar 1, 2016, 7:32:20 AM3/1/16
to RxJava
Hi,
what would be the expected behaviour when there's a doOnNext handler that throws an exception. Any exception. 
Could I expect rx to catch the exception and call onError? If not, could I assume that this is a bug?

I was looking at the rx guidelines, but I couldn't see anything about catching errors from side effects.


Best regards
Jan

Dávid Karnok

unread,
Mar 1, 2016, 7:36:29 AM3/1/16
to Jan Bols, RxJava

Jan Bols

unread,
Mar 1, 2016, 7:47:05 AM3/1/16
to RxJava, guter...@gmail.com
Hi David, thanks for the reply.

The problem is, when my doOnNext throws an OnErrorNotImplementedException, the exception is swallowed and the onError handler is not called.

I created some spock tests that demonstrate this:

import rx.Observable
import rx.exceptions.OnErrorNotImplementedException
import rx.functions.Actions
import spock.lang.FailsWith
import spock.lang.Specification

class RxTest extends Specification {


    def "when a doOnNext throws an IllegalStateException, it's propagated to the executing thread as an OnErrorNotImplementedException when not specifying an onError"() {
        given:
        boolean onErrorCalledAsSideEffect = false
        when:
        Observable.just(1, 2, 3)
                .doOnNext({ throw new IllegalStateException("I'm behaving badly") })
                .doOnError({ onErrorCalledAsSideEffect = true })
                .subscribe()
        then:
        thrown(OnErrorNotImplementedException)
        onErrorCalledAsSideEffect == true
    }

    def "when a doOnNext throws an IllegalStateException, it's handled by the onError handler"() {
        given:
        boolean onErrorCalledFromSubscribe = false
        when:
        Observable.just(1, 2, 3)
                .doOnNext({ throw new IllegalStateException("I'm behaving badly") })
                .subscribe(Actions.empty(), { onErrorCalledFromSubscribe = true })
        then:
        noExceptionThrown()
        onErrorCalledFromSubscribe == true
    }


    @FailsWith(value = AssertionError, reason = "the doOnNext throws an OnErrorNotImplementedException, but it's interpreted as if the onError is already called resulting in swallowing the onError handlers")
    def "when a doOnNext throws an OnErrorNotImplementedException, it's NOT propagated to the executing thread when not specifying an onError"() {
        given:
        boolean onErrorCalledAsSideEffect = false
        when:
        Observable.just(1, 2, 3)
                .doOnNext({ throw new OnErrorNotImplementedException(new IllegalStateException("I'm behaving badly")) })
                .doOnError({ onErrorCalledAsSideEffect = true })
                .subscribe()
        then:
        thrown(OnErrorNotImplementedException)
        onErrorCalledAsSideEffect == true      //THIS ASSERTION FAILS
    }

    @FailsWith(value = AssertionError, reason = "the doOnNext throws an OnErrorNotImplementedException, but it's interpreted as if the onError is already called resulting in swallowing the onError handlers")
    def "when a doOnNext throws an OnErrorNotImplementedException, the onError handler is not called"() {
        given:
        boolean onErrorCalledFromSubscribe = false
        when:
        Observable.just(1, 2, 3)
                .doOnNext({ throw new OnErrorNotImplementedException(new IllegalStateException("I'm behaving badly")) })
                .subscribe(Actions.empty(), { onErrorCalledFromSubscribe = true })
        then:
        thrown(OnErrorNotImplementedException)
        onErrorCalledFromSubscribe == true     //THIS ASSERTION FAILS
    }
}


The first 2 succeed, but the moment the on next side effect throws an OnErrorNotImplemented or even an OnErrorFailedException, I guess rx thinks it already handled the error.

This doesn't seem correct to me.

Best regards
Jan

Op dinsdag 1 maart 2016 13:36:29 UTC+1 schreef Dávid Karnok:

Dávid Karnok

unread,
Mar 1, 2016, 8:23:20 AM3/1/16
to Jan Bols, RxJava
The problem I see is you use the paremeterless subscribe() method which has that default behavior of throwing OnErrorNotImplementedException, which is considered a fatal exception that has nowhere to go. RxJava chose to throw it upwards which may end up on some thread's call chain but the original IllegalStateException is not considered fatal and can travel downstream - but that requires an onError handler. Therefore, if you throw an OnErrorNotImplementedException manually, RxJava will think its one of his own fatal errors and throws them upwards, not calling any downstream onError at all.

Jan Bols

unread,
Mar 1, 2016, 10:46:26 AM3/1/16
to RxJava, guter...@gmail.com
Hi David, thank you for your reply.

The problem is, my doOnNext handler calls another observable that might fail. And when it fails, it will throw an OnErrorNotImplementedException when I use a parameterless subscribe, or an OnErrorFailedException when I rethrow the error in my inner error handler.

Below you can find 2 more tests. 
1. The first one creates an observable that calls another observable as a side effect. That other observable fails and the outer error handler is not called.
2. The second one does the same thing but uses an inner observable with an error handler that wraps the error in another IllegalStateException.

Both fail to call the outer error handler.

    @FailsWith(value = AssertionError, reason = "the doOnNext throws an OnErrorNotImplementedException, but it's interpreted as if the onError is already called resulting in swallowing the onError handlers")
    def "when a doOnNext calls a failing observable subscription, the onError handler is not called"() {
        given:
        boolean onErrorCalledFromSubscribe = false
        when:
        Observable.just(1, 2, 3)
                .doOnNext({ Observable.error(new IllegalStateException("I'm behaving badly")).subscribe() })
                .subscribe(Actions.empty(), { onErrorCalledFromSubscribe = true })
        then:
        thrown(OnErrorNotImplementedException)
        onErrorCalledFromSubscribe == true     //THIS ASSERTION FAILS
    }

    @FailsWith(value = AssertionError, reason = "the doOnNext throws an OnErrorFailedException, but it's interpreted as if the onError is already called resulting in swallowing the onError handlers")
    def "when a doOnNext calls a failing observable subscription with an onError handler, the outer onError handler is not called"() {
        given:
        boolean onErrorCalledFromSubscribe = false
        when:
        Observable.just(1, 2, 3).
                doOnNext({
                    Observable.
                            error(new IllegalStateException("I'm behaving badly")).
                            subscribe(
                                    Actions.empty(),
                                    { t -> throw new IllegalStateException("inside inner onError", t) }
                            )
                }).
                subscribe(
                        Actions.empty(),
                        { onErrorCalledFromSubscribe = true }
                )
        then:
        thrown(OnErrorFailedException)
        onErrorCalledFromSubscribe == true     //THIS ASSERTION FAILS
    }


Is there something I'm doing wrong here? I understand why the behaviour occurs, but I would expect to be able to do doOnNext calls that eventually subscribe to other Observables and when those fail - even with an explicit error handler - , I would expect those errors to be propagated to my outer Observable.

The only solution I would see is to catch all calls inside my doOnNext call and convert any caught OnErrorXXXException to something else. But that seems to contradict the rx guidelines. Or not.

Best regards
Jan


Op dinsdag 1 maart 2016 14:23:20 UTC+1 schreef Dávid Karnok:
The problem I see is you use the paremeterless subscribe() method which has that default behavior of throwing OnErrorNotImplementedException, which is considered a fatal exception that has nowhere to go. RxJava chose to throw it upwards which may end up on some thread's call chain but the original IllegalStateException is not considered fatal and can travel downstream - but that requires an onError handler. Therefore, if you throw an OnErrorNotImplementedException manually, RxJava will think its one of his own fatal errors and throws them upwards, not calling any downstream onError at all.

Dávid Karnok

unread,
Mar 1, 2016, 11:00:15 AM3/1/16
to Jan Bols, RxJava
Is there something I'm doing wrong here?

Throwing from an onError handler yields you a fatal error which won't propagate downwards so your doOnError is never called. Try not to throw from the onError handler and don't throw OnErrorNotImplementedException manually either, because it is considered fatal and the normal error paths are bypassed.

> do doOnNext calls that eventually subscribe to other Observables and when those fail

If you find yourself in this situation, that means you need flatMap() instead of manual subscribe().

Jan Bols

unread,
Mar 1, 2016, 3:32:12 PM3/1/16
to RxJava, guter...@gmail.com
Hi David, 
I hear you. Throwing exceptions from onError is only for fatal cases you don't want to recover from. 

However, I still get the feeling it should be fatal for the Observable-Observer process where the exception was thrown. Not to any other Observables further down the stack. It would be useful for the subscriber of the inner Observable to treat the error as fatal and rethrow it, but for the subscriber of the outer Observable to be able to handle the error.

observable
   .doOnNext( call someone somewhere else that does lots of things among others subscribing to an observable )
   .doOnError( let's do something about it )



Anyway, thanks for taking the time to explain it to me.
Jan

Op dinsdag 1 maart 2016 17:00:15 UTC+1 schreef Dávid Karnok:
Is there something I'm doing wrong here?

Throwing from an onError handler yields you a fatal error which won't propagate downwards so your doOnError is never called. Try not to throw from the onError handler and don't throw OnErrorNotImplementedException manually either, because it is considered fatal and the normal error paths are bypassed.

> do doOnNext calls that eventually subscribe to other Observables and when those fail

If you find yourself in this situation, that means you need flatMap() instead of manual subscribe().

Reply all
Reply to author
Forward
0 new messages