rxjava - How to handle zip exceptions without terminating the whole process


Mysurf Mail

Nov 10, 2014, 7:12:57 AM
to rxj...@googlegroups.com
This is similar to my previous question, but with a zip. It produces a different response and behaves differently.

I have created two observables.
One of them throws an exception.

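// Emits 1..6 and then completes.
Observable<Integer> obs1 = Observable.from(new Integer[]{1, 2, 3, 4, 5, 6});

// Fails on subscription: getObj() throws before anything is emitted.
Observable<Integer> obs2 = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override public void call(Subscriber<? super Integer> subscriber) {
        boolean b = getObj().equals(""); // this throws an exception
        System.out.println("1"); // never reached
    }
});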

I invoke them using

Observable.zip(obs2, obs1, new Func2<Integer, Integer, Object>() {
            @Override
            public ArrayList<Integer> call(Integer integer1, Integer integer2) {
                ArrayList<Integer> integers = new ArrayList<Integer>();
                integers.add(integer1);
                integers.add(integer2);
                return integers;
            }
        }).subscribe(new Observer<Object>() {
            @Override public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override public void onError(Throwable throwable) {
                System.out.println("onError");
            }

            @Override public void onNext(Object o) {
                System.out.println("onNext - " + o);
            }
        });

Now, I don't want my process to halt completely when an exception occurs.
I want to handle it and I want obs1 to continue its work.

I have tried to write it using onErrorResumeNext(), onExceptionResumeNext(), doOnError() but nothing helped - obs1 did not run.

How can I handle the exception without stopping the other observable from being processed?

Samuel Tardieu

Nov 10, 2014, 7:25:52 AM
to Mysurf Mail, rxj...@googlegroups.com

You need to use onErrorResumeNext() or similar methods on obs2, not on the result of zipping obs1 and obs2 because it is then too late. Use something like Observable.zip(obs1, obs2.onErrorResumeNext(…), …).

Mysurf Mail

Nov 11, 2014, 3:04:49 AM
to rxj...@googlegroups.com, stamm...@gmail.com
Did you mean something like:

Observable.zip(obs2.onErrorReturn(new Func1<Throwable, Integer>() {
            @Override public Integer call(Throwable throwable) {
                return 0;
            }
        }), obs1, new Func2<Integer, Integer, Object>() {
            @Override
            public ArrayList<Integer> call(Integer integer1, Integer integer2) {
                ArrayList<Integer> integers = new ArrayList<Integer>();
                integers.add(integer1);
                integers.add(integer2);
                return integers;
            }
        }).subscribe(new Observer<Object>() {
            @Override public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override public void onError(Throwable throwable) {
                System.out.println("onError");
            }

            @Override public void onNext(Object o) {
                System.out.println("onNext - " + o);
            }
        });

Unfortunately, that doesn't work: obs1 still doesn't run.

Samuel Tardieu

Nov 11, 2014, 7:19:49 AM
to Mysurf Mail, rxj...@googlegroups.com
Here you use onErrorReturn, which emits only one replacement item in place of the error and then completes obs2, so zip produces just one more pair after the error.

What values do you want obs2 to produce after an error? If you want all zeroes for example, you need to use onErrorResumeNext(Observable.just(0).repeat()).

Keep in mind that zip() always takes one value from each observable, zips them, then takes the next one from each observable, zips them, and so on. A source that stops emitting cannot keep the zip going: once either source terminates, zip has nothing left to pair the other source's items with.
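
To make the pairing rule concrete, here is a rough sketch in plain Java that models the behaviour with iterators instead of observables. It is not RxJava code and not how RxJava implements zip; the class ZipModel and the helpers failing(), resumeWith() and zip() are made-up names for illustration only. A single-item fallback stands in for onErrorReturn, and an endless stream of zeroes stands in for onErrorResumeNext(Observable.just(0).repeat()).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Stream;

// Hypothetical model of zip's lockstep pairing; not part of RxJava.
class ZipModel {

    // Stands in for obs2: fails as soon as anyone asks it for a value.
    static Iterator<Integer> failing() {
        return new Iterator<Integer>() {
            @Override public boolean hasNext() { throw new IllegalStateException("source failed"); }
            @Override public Integer next() { throw new IllegalStateException("source failed"); }
        };
    }

    // Wraps a source so that a failure switches over to the fallback,
    // applied to the source itself, before it is handed to zip.
    static Iterator<Integer> resumeWith(final Iterator<Integer> source, final Iterator<Integer> fallback) {
        return new Iterator<Integer>() {
            private Iterator<Integer> current = source;

            @Override public boolean hasNext() {
                try {
                    return current.hasNext();
                } catch (RuntimeException e) {
                    current = fallback; // swallow the failure and continue with the fallback
                    return current.hasNext();
                }
            }

            @Override public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return current.next();
            }
        };
    }

    // Takes one value from each side per step and stops when either side runs out.
    static List<List<Integer>> zip(Iterator<Integer> a, Iterator<Integer> b) {
        List<List<Integer>> pairs = new ArrayList<>();
        while (a.hasNext() && b.hasNext()) {
            pairs.add(Arrays.asList(a.next(), b.next()));
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<Integer> obs1 = Arrays.asList(1, 2, 3, 4, 5, 6);

        // One fallback item, like onErrorReturn: only one pair comes out.
        Iterator<Integer> single = Collections.singletonList(0).iterator();
        System.out.println(zip(resumeWith(failing(), single), obs1.iterator()));
        // prints [[0, 1]]

        // Endless fallback, like just(0).repeat(): every item of obs1 gets a partner.
        Iterator<Integer> zeroes = Stream.generate(() -> 0).iterator();
        System.out.println(zip(resumeWith(failing(), zeroes), obs1.iterator()));
        // prints [[0, 1], [0, 2], [0, 3], [0, 4], [0, 5], [0, 6]]
    }
}
```

With no fallback at all, zip(failing(), obs1.iterator()) throws on the first step and produces no pairs, which corresponds to the onError-only output in the original question.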