import rx.Observable
import rx.exceptions.OnErrorNotImplementedException
import rx.functions.Actions
import spock.lang.FailsWith
import spock.lang.Specification
class RxTest extends Specification {
    def "when a doOnNext throws an IllegalStateException, it's propagated to the executing thread as an OnErrorNotImplementedException when not specifying an onError"() {
        given:
        boolean onErrorCalledAsSideEffect = false

        when:
        Observable.just(1, 2, 3)
                .doOnNext({ throw new IllegalStateException("I'm behaving badly") })
                .doOnError({ onErrorCalledAsSideEffect = true })
                .subscribe()

        then:
        thrown(OnErrorNotImplementedException)
        onErrorCalledAsSideEffect == true
    }
    def "when a doOnNext throws an IllegalStateException, it's handled by the onError handler"() {
        given:
        boolean onErrorCalledFromSubscribe = false

        when:
        Observable.just(1, 2, 3)
                .doOnNext({ throw new IllegalStateException("I'm behaving badly") })
                .subscribe(Actions.empty(), { onErrorCalledFromSubscribe = true })

        then:
        noExceptionThrown()
        onErrorCalledFromSubscribe == true
    }
    @FailsWith(value = AssertionError, reason = "the doOnNext throws an OnErrorNotImplementedException, but it's interpreted as if the onError is already called resulting in swallowing the onError handlers")
    def "when a doOnNext throws an OnErrorNotImplementedException, it's NOT propagated to the executing thread when not specifying an onError"() {
        given:
        boolean onErrorCalledAsSideEffect = false

        when:
        Observable.just(1, 2, 3)
                .doOnNext({ throw new OnErrorNotImplementedException(new IllegalStateException("I'm behaving badly")) })
                .doOnError({ onErrorCalledAsSideEffect = true })
                .subscribe()

        then:
        thrown(OnErrorNotImplementedException)
        onErrorCalledAsSideEffect == true // THIS ASSERTION FAILS
    }
    @FailsWith(value = AssertionError, reason = "the doOnNext throws an OnErrorNotImplementedException, but it's interpreted as if the onError is already called resulting in swallowing the onError handlers")
    def "when a doOnNext throws an OnErrorNotImplementedException, the onError handler is not called"() {
        given:
        boolean onErrorCalledFromSubscribe = false

        when:
        Observable.just(1, 2, 3)
                .doOnNext({ throw new OnErrorNotImplementedException(new IllegalStateException("I'm behaving badly")) })
                .subscribe(Actions.empty(), { onErrorCalledFromSubscribe = true })

        then:
        thrown(OnErrorNotImplementedException)
        onErrorCalledFromSubscribe == true // THIS ASSERTION FAILS
    }
}
It is supposed to call onError: https://github.com/ReactiveX/RxJava/blob/v1.1.1/src/main/java/rx/internal/operators/OperatorDoOnEach.java#L82
@FailsWith(value = AssertionError, reason = "the doOnNext throws an OnErrorNotImplementedException, but it's interpreted as if the onError is already called resulting in swallowing the onError handlers")
def "when a doOnNext calls a failing observable subscription, the onError handler is not called"() {
    given:
    boolean onErrorCalledFromSubscribe = false

    when:
    Observable.just(1, 2, 3)
            .doOnNext({ Observable.error(new IllegalStateException("I'm behaving badly")).subscribe() })
            .subscribe(Actions.empty(), { onErrorCalledFromSubscribe = true })

    then:
    thrown(OnErrorNotImplementedException)
    onErrorCalledFromSubscribe == true // THIS ASSERTION FAILS
}
@FailsWith(value = AssertionError, reason = "the doOnNext throws an OnErrorFailedException, but it's interpreted as if the onError is already called resulting in swallowing the onError handlers")
def "when a doOnNext calls a failing observable subscription with an onError handler, the outer onError handler is not called"() {
    given:
    boolean onErrorCalledFromSubscribe = false

    when:
    Observable.just(1, 2, 3).
            doOnNext({
                Observable.
                        error(new IllegalStateException("I'm behaving badly")).
                        subscribe(
                                Actions.empty(),
                                { t -> throw new IllegalStateException("inside inner onError", t) }
                        )
            }).
            subscribe(
                    Actions.empty(),
                    { onErrorCalledFromSubscribe = true }
            )

    then:
    thrown(OnErrorFailedException)
    onErrorCalledFromSubscribe == true // THIS ASSERTION FAILS
}
The problem I see is that you use the parameterless subscribe() method, whose default behavior is to throw OnErrorNotImplementedException, which is considered a fatal exception that has nowhere to go. RxJava chose to throw it upwards, where it may end up on some thread's call chain. The original IllegalStateException is not considered fatal and can travel downstream, but that requires an onError handler. Therefore, if you throw an OnErrorNotImplementedException manually, RxJava will think it's one of its own fatal errors and throw it upwards, without calling any downstream onError at all.
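To make the fatal versus non-fatal distinction concrete, here is a simplified model in plain Java. It is only a sketch and not RxJava's actual code: `FatalSignal`, `emit` and `outcome` are hypothetical names made up for illustration, and this `throwIfFatal` merely imitates the idea of a fatal-exception check that runs before any onError handler is consulted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Simplified model only: every type here is a hypothetical stand-in, not an RxJava class.
class FatalErrorModel {

    /** Stand-in for an exception the library treats as fatal (like OnErrorNotImplementedException). */
    static class FatalSignal extends RuntimeException {
        FatalSignal(Throwable cause) {
            super(cause);
        }
    }

    /** Rethrows fatal exceptions immediately; returns normally for everything else. */
    static void throwIfFatal(Throwable t) {
        if (t instanceof FatalSignal) {
            throw (FatalSignal) t;
        }
    }

    /** Feeds each item to onNext; non-fatal failures go to onError, fatal ones escape to the caller. */
    static <T> void emit(List<T> items, Consumer<T> onNext, Consumer<Throwable> onError) {
        for (T item : items) {
            try {
                onNext.accept(item);
            } catch (Throwable t) {
                throwIfFatal(t);   // fatal: propagates up the call stack, onError is skipped
                onError.accept(t); // non-fatal: delivered to the downstream handler
                return;            // an error terminates the sequence
            }
        }
    }

    /** Runs emit with an onNext that always throws the given failure and reports what happened. */
    static String outcome(RuntimeException failure) {
        List<String> events = new ArrayList<>();
        try {
            emit(Arrays.asList(1, 2, 3),
                 item -> { throw failure; },
                 error -> events.add("onError"));
        } catch (FatalSignal e) {
            events.add("thrown to caller");
        }
        return String.join(",", events);
    }

    public static void main(String[] args) {
        // prints "onError"
        System.out.println(outcome(new IllegalStateException("I'm behaving badly")));
        // prints "thrown to caller"
        System.out.println(outcome(new FatalSignal(new IllegalStateException("I'm behaving badly"))));
    }
}
```

In this model the handler passed to `emit` only ever sees the plain IllegalStateException. Wrapping the same exception in the fatal type makes it skip the handler entirely, which mirrors the failing assertions in the tests above.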
> Is there something I'm doing wrong here?

Throwing from an onError handler yields a fatal error, which won't propagate downwards, so your doOnError is never called. Try not to throw from the onError handler, and don't throw OnErrorNotImplementedException manually either, because it is considered fatal and the normal error paths are bypassed.
> do doOnNext calls that eventually subscribe to other Observables and when those fail

If you find yourself in this situation, that means you need flatMap() instead of manual subscribe().