Schedulers for doOnSubscribe(), finallyDo(), etc.


Daniel Lew

Nov 3, 2014, 9:53:35 AM
to rxj...@googlegroups.com
Hello,
Currently, it seems somewhat like undefined behavior which thread methods like doOnSubscribe() and finallyDo() run on. As far as I can tell, doOnSubscribe() uses the Scheduler defined in subscribeOn(), whereas finallyDo() uses the one from observeOn(). But what if the desired behavior is to have both run on a separate Scheduler?

A simple example of this causing issues for me: increment a value when an Observable is subscribed to, and decrement it afterwards. If the value is ever > 0, we show a progress bar. I need these to run on particular threads on Android or it'll blow up (manipulating the UI from a non-main thread).
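The counter described above can be sketched in plain Java, independent of RxJava. This is only a minimal illustration of the bookkeeping; the class name `ProgressCounter` and its methods are hypothetical, and the sketch says nothing about which thread ends up calling them.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: counts in-flight subscriptions so the UI can decide
// whether a progress bar should be visible.
final class ProgressCounter {
    private final AtomicInteger active = new AtomicInteger();

    /** Call when a stream is subscribed to; returns whether the bar should be shown. */
    boolean begin() {
        return active.incrementAndGet() > 0;
    }

    /** Call when a stream terminates; returns whether the bar should still be shown. */
    boolean end() {
        return active.decrementAndGet() > 0;
    }

    /** True while at least one stream is in flight. */
    boolean isVisible() {
        return active.get() > 0;
    }
}
```

In the setup from the post, `begin()` would be called from doOnSubscribe() and `end()` from finallyDo(). The counter itself is thread-safe, but the view update that reads it still has to happen on the main thread.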

I'm wondering if I'm just not using these methods properly - it does seem a bit like introducing side effects - but then, what is the point of these methods otherwise?

-Dan

Ben Christensen

Nov 3, 2014, 11:17:23 AM
to Daniel Lew, rxj...@googlegroups.com
Those operators don’t use any schedulers. They behave just like `map`: they execute synchronously on whatever thread calls them. So `doOnSubscribe` runs on the thread doing the subscribing, and `doOnNext`/`doOnCompleted`/`doOnError`/`finallyDo` execute on whatever thread emits those signals.
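The point above can be illustrated without RxJava. The following is a plain-Java sketch, not the library's implementation: `tap` is a hypothetical stand-in for a side-effect operator such as `doOnNext`, and the emitting thread is created by hand. It only shows that a callback wrapped this way runs on whichever thread invokes it.

```java
import java.util.function.Consumer;

final class TapDemo {
    /** Wraps a consumer so a side effect runs first, synchronously, on the caller's thread. */
    static <T> Consumer<T> tap(Consumer<T> sideEffect, Consumer<T> downstream) {
        return value -> {
            sideEffect.accept(value);   // no scheduler involved
            downstream.accept(value);
        };
    }

    /** Emits one value from a thread with the given name and reports where the side effect ran. */
    static String sideEffectThread(String emitterName) {
        String[] seen = new String[1];
        Consumer<Integer> observer =
                tap(v -> seen[0] = Thread.currentThread().getName(), v -> { });
        Thread emitter = new Thread(() -> observer.accept(1), emitterName);
        emitter.start();
        try {
            emitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }
}
```

Calling `TapDemo.sideEffectThread("emitter-1")` returns the emitter's name, because nothing in `tap` moves the work elsewhere.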

Yes, the point of those methods is side effects (such as logging, debugging, or the kind of thing you are using them for).



-- 
Ben Christensen
+1.310.782.5511  @benjchristensen

Daniel Lew

Nov 3, 2014, 11:24:17 AM
to Ben Christensen, rxj...@googlegroups.com
I guess it just won't work in this case, then? It's a fairly common pattern to want to subscribe on something like Schedulers.io() but execute on AndroidSchedulers.mainThread(), since you can't modify the UI from anything but the main thread. That works fine for finallyDo() (and the rest) but not for doOnSubscribe().

-Dan

Ben Christensen

Nov 3, 2014, 11:32:27 AM
to Daniel Lew, rxj...@googlegroups.com
doOnSubscribe by definition is happening “onSubscribe” and thus on that thread. If it is already side-effecting (which it is) then you could emit to another Observable that uses observeOn to put itself on the UI thread. I don’t like Subjects at all, but this may be an example where it applies since you are purposefully doing a side-effect that is not composed inside the stream (you’re reaching outside anyways, so using a Subject wouldn’t be much different).

Or you could just use AndroidSchedulers.mainThread() directly and schedule work onto it, since all you’re doing is scheduling an update to the UI and not actually using the Observable behaviors.
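The second option, handing the UI update straight to the main thread, might look roughly like the plain-Java sketch below. It deliberately avoids RxJava and Android APIs: a single-thread executor named "ui" stands in for the main thread, and `onExecutor`, `demo`, and the thread names are made up for illustration.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class MainThreadHop {
    /** Wraps a UI action so that, wherever it is invoked, it is handed to the given executor. */
    static Runnable onExecutor(Executor ui, Runnable uiAction) {
        return () -> ui.execute(uiAction);
    }

    /** Invokes the wrapped action from an "io" thread and reports where it actually ran. */
    static String demo() {
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
        AtomicReference<String> seen = new AtomicReference<>();
        Runnable showProgress =
                onExecutor(ui, () -> seen.set(Thread.currentThread().getName()));
        Thread io = new Thread(showProgress, "io"); // plays the subscribing thread
        io.start();
        try {
            io.join();
            ui.shutdown();                          // already-queued work still runs
            ui.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }
}
```

Here `MainThreadHop.demo()` reports the "ui" thread even though the callback was invoked from "io". A callback wrapped like this could be passed to doOnSubscribe() regardless of which thread does the subscribing.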

Would be interesting to ask Erik Meijer’s opinion on this … I’ll see what his thoughts are.

-- 
Ben Christensen
+1.310.782.5511  @benjchristensen

Dmitry Suzdalev

Nov 6, 2014, 6:42:48 PM
to rxj...@googlegroups.com, danl...@gmail.com
Yep, I was going to post about the same thing.
On Android it is quite useful to build up a stream of observable items in the usual chained style and, at some point, do some side-effecting work such as updating a UI title or label, which needs to happen on the main thread.

I also thought that the current way of doing this might be to use AndroidSchedulers.mainThread() directly.
Your example with a subject is good too, thanks.

Ben Christensen

Nov 6, 2014, 7:11:50 PM
to Dmitry Suzdalev, rxj...@googlegroups.com, danl...@gmail.com
I got confirmation from Erik Meijer that this is one of those times that a Subject is indeed a good option. 


moluae...@gmail.com

Jun 4, 2018, 2:23:48 PM
to RxJava
I have found out that the position of doOnSubscribe in the chain matters. It hurts my mind to think of a reason. This doesn't work:

matchRepository
        .commenceMatch(new NewMatch(matchId))
        .doOnSubscribe(disposable -> homeView.showProgress(true, true))
        .observeOn(SchedulerProvider.getInstance().ui())
        .subscribeOn(SchedulerProvider.getInstance().computation())
        .doFinally(() -> homeView.showProgress(false, false))
        .subscribe(() -> homeView.showWaitingScene(), this::handleError);

I get a toast saying that you cannot manipulate views from another thread. However

matchRepository
        .commenceMatch(new NewMatch(matchId))
        .observeOn(SchedulerProvider.getInstance().ui())
        .subscribeOn(SchedulerProvider.getInstance().computation())
        .doOnSubscribe(disposable -> homeView.showProgress(true, true))
        .doFinally(() -> homeView.showProgress(false, false))
        .subscribe(() -> homeView.showWaitingScene(), this::handleError);

This works. It is as if the execution of the chain starts even before subscribe() is called.
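A possible explanation, assuming RxJava 2 semantics (which the `disposable` parameter and `doFinally` suggest): the subscribe() call travels up the chain, while the onSubscribe signal that triggers doOnSubscribe travels back down. As far as I understand it, subscribeOn sends onSubscribe to everything below it right away on the calling thread, and only then subscribes to everything above it on its scheduler. The toy model below is plain Java, not RxJava; `Source`, `hookThread`, and the "worker" thread are made up for illustration, and observeOn is left out on the assumption that it does not move the onSubscribe signal.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class SubscribeOrderModel {
    /** A stage in the chain; subscribing hands it the downstream's onSubscribe callback. */
    interface Source {
        void subscribe(Runnable downstreamOnSubscribe);
    }

    /** The root source signals onSubscribe on whichever thread subscribes to it. */
    static Source source() {
        return downstreamOnSubscribe -> downstreamOnSubscribe.run();
    }

    /** Runs the hook when the upstream signals onSubscribe, then forwards the signal. */
    static Source doOnSubscribe(Source upstream, Runnable hook) {
        return downstreamOnSubscribe -> upstream.subscribe(() -> {
            hook.run();
            downstreamOnSubscribe.run();
        });
    }

    /** Signals downstream immediately, then subscribes upstream on the scheduler. */
    static Source subscribeOn(Source upstream, Executor scheduler) {
        return downstreamOnSubscribe -> {
            downstreamOnSubscribe.run();
            scheduler.execute(() -> upstream.subscribe(() -> { }));
        };
    }

    /** Builds one of the two orderings and reports the thread the hook ran on. */
    static String hookThread(boolean hookBelowSubscribeOn) {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        AtomicReference<String> seen = new AtomicReference<>();
        Runnable hook = () -> seen.set(Thread.currentThread().getName());
        Source chain = hookBelowSubscribeOn
                ? doOnSubscribe(subscribeOn(source(), worker), hook)   // second snippet's order
                : subscribeOn(doOnSubscribe(source(), hook), worker);  // first snippet's order
        chain.subscribe(() -> { });
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }
}
```

In this model, `hookThread(false)` reports the worker thread, matching the failing snippet where doOnSubscribe sits above subscribeOn, and `hookThread(true)` reports the thread that called subscribe(), matching the working one.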