BehaviorSubject Observer removed without calling doOnUnsubscribe


Austyn Mahoney

Jan 8, 2015, 3:06:26 PM
to rxj...@googlegroups.com
I am using a BehaviorSubject as an event bus. I have a progress indicator that turns on and off based on how many requests are running (shown when the count is greater than 0, hidden otherwise). Each time a request starts or finishes, an event carrying the current count of running requests is posted to the subject's onNext(). The BehaviorSubject is necessary because when a new Activity subscribes, it needs to know the current request counter.
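To make the setup above concrete, here is a minimal plain-Java sketch of the semantics being relied on: a counter that publishes on every start/finish and replays the latest value to each new subscriber. It is only a model and deliberately does not use RxJava; `ProgressEvent`, `LatestValueBus`, `requestStarted()`, `requestFinished()` and `observerCount()` are hypothetical names invented for illustration, not part of the original code or of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the ProgressBarEvent in the post. */
final class ProgressEvent {
    final int running;
    ProgressEvent(int running) { this.running = running; }
    /** Show the indicator while at least one request is running. */
    boolean shouldShow() { return running > 0; }
}

/** Plain-Java model of "replay the latest value to new subscribers". Not RxJava. */
final class LatestValueBus {
    private final List<Consumer<ProgressEvent>> observers = new ArrayList<>();
    private ProgressEvent latest = new ProgressEvent(0);
    private int running = 0;

    /** Registers an observer, hands it the current value, and returns an unsubscribe handle. */
    synchronized Runnable subscribe(Consumer<ProgressEvent> observer) {
        observers.add(observer);
        observer.accept(latest);
        return () -> {
            synchronized (LatestValueBus.this) {
                observers.remove(observer);
            }
        };
    }

    synchronized void requestStarted() { publish(++running); }

    synchronized void requestFinished() { publish(--running); }

    synchronized int observerCount() { return observers.size(); }

    private void publish(int count) {
        latest = new ProgressEvent(count);
        // Iterate over a copy so an observer may unsubscribe from inside its callback.
        for (Consumer<ProgressEvent> o : new ArrayList<>(observers)) {
            o.accept(latest);
        }
    }
}
```

In this model, an observer that subscribes while one request is running first receives a count of 1, and an event published while no observer is registered is simply not delivered to anyone, which is the symptom described in the rest of this post.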

For a reason unknown to me, my last onNext() event is not delivered to the observer in my Activity.


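        // Subject that backs the progress event bus
        private Subject<ProgressBarEvent, ProgressBarEvent> mProgressSubject = BehaviorSubject.create();
        // Lazily created in getObservable() and shared by all subscribers
        private Observable<ProgressBarEvent> mProgressObservable;

        // ProgressIndicator#getObservable()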
        public Observable<ProgressBarEvent> getObservable() {
            if (mProgressObservable == null) {
                mProgressObservable = mProgressSubject.serialize()
                        .distinctUntilChanged()
                        .subscribeOn(Schedulers.computation())
                        .doOnUnsubscribe(new Action0() {
                            @Override
                            public void call() {
                                logger.d("mProgressObservable UNSUBSCRIBE");
                            }
                        });
            }
            return mProgressObservable;
        }



        // Called in MyActivity#onStart() 
        mProgressSubscription = AppObservable.bindActivity(this, mProgressIndicator.getObservable())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<ProgressBarEvent>() {
                    @Override
                    public void call(ProgressBarEvent progressBarEvent) {
                        if (progressBarEvent.shouldShow()) {
                            mProgressBarHelper.show();
                        } else {
                            mProgressBarHelper.hide();
                        }
                    }
                });


I unsubscribe from the subscription in my Activity's onDestroy.


I added a logging breakpoint to BehaviorSubject:165 to see if there was even an observer attached when the last event is sent. The observer count goes from 1 to 0 without doOnUnsubscribe ever being invoked on my observable.

Debugger log from a test run:

// In ChildActivity
Request counter: 1
SubjectObservers: 1
SubjectObservers: 1

Request counter: 2
SubjectObservers: 1
SubjectObservers: 1

// ChildActivity is removed, so observer count goes to 0, doOnUnsubscribe is called here.
Request counter: 3
SubjectObservers: 0

Request counter: 2
SubjectObservers: 0

// MainActivity is showing again, with the observer re-subscribed to the observable, observer count back up to 1
Request counter: 1
SubjectObservers: 1
SubjectObservers: 1

// Observer count is somehow 0, doOnUnsubscribe is NOT called
Request counter: 0
SubjectObservers: 0

Stepping through the code, the last event with a counter of 0 is sent to mProgressSubject.onNext(), but the list of observers is empty in BehaviorSubject#onNext.




Can anyone point me in the right direction here? Is there a reason why the BehaviorSubject's observer count would be 0 without a call to doOnUnsubscribe occurring? Is this a bug, or do I have something wrong? If this is a better question for StackOverflow, let me know.

Dávid Karnok

Jan 8, 2015, 4:22:22 PM
to rxj...@googlegroups.com
Do you send onCompleted to the BehaviorSubject somewhere? If not, the only reason it would let its observers go is that those observers called unsubscribe. But unsubscribe travels upstream, so doOnUnsubscribe might not see it if it happens in an earlier operator.
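The point about direction can be pictured with a toy model. This is a plain-Java sketch, not RxJava code and not how the library is implemented; `ToyChain`, `stage()` and `unsubscribeFrom()` are hypothetical names. It models only which stages an unsubscribe signal passes through, assuming the signal moves strictly from its origin toward the source, and it ignores terminal events.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a linear operator chain; index 0 is the stage closest to the source. */
final class ToyChain {
    private final List<String> stages = new ArrayList<>();

    /** Appends a stage downstream of the ones added so far. */
    ToyChain stage(String name) {
        stages.add(name);
        return this;
    }

    /**
     * Models an unsubscribe initiated at position fromIndex
     * (use the number of stages for the final subscriber).
     * Returns the stages the signal passes, walking upstream only.
     */
    List<String> unsubscribeFrom(int fromIndex) {
        List<String> notified = new ArrayList<>();
        for (int i = fromIndex - 1; i >= 0; i--) {
            notified.add(stages.get(i));
        }
        return notified;
    }
}
```

With stages `hookA`, `someOperator`, `hookB` added in that order, an unsubscribe from the final subscriber (index 3) passes `hookB`, `someOperator`, `hookA`, while one initiated by `someOperator` (index 1) passes only `hookA`. Under this assumption, a hook placed downstream of the operator that unsubscribes would not observe it, which is why moving the hook directly next to the subject is a reasonable diagnostic step.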

Austyn Mahoney

Jan 8, 2015, 4:26:16 PM
to rxj...@googlegroups.com
I have posted this to StackOverflow if you'd like to see the code with syntax highlighting.

Austyn Mahoney

Jan 8, 2015, 4:35:55 PM
to rxj...@googlegroups.com
I only have two calls on the BehaviorSubject and they are both to onNext().

I moved the doOnUnsubscribe() operator above distinctUntilChanged() and there was no change. I still don't get a call to it. I don't see anywhere where it would unsubscribe either.

Austyn Mahoney

Jan 13, 2015, 1:40:19 PM
to rxj...@googlegroups.com
I just found something that may point to the underlying problem. If my second Activity unsubscribes from its Observable observing the BehaviorSubject (which is created by a separate call to `ProgressIndicator#getObservable()`) before the last event is sent, the event is not observed by the first Activity. If it unsubscribes after the event is sent (the request comes back quicker in some cases), then the event is observed properly.

I have changed the code to return a new Observable from ProgressIndicator#getObservable() on each call. It seems that unsubscribing from one of them is causing issues with the other Observable that is observing my BehaviorSubject. This may be why I see the observer count go from 1 to 0 without invoking doOnUnsubscribe().

I am going to try to distill this into a runnable app. Any advice on getting there would be appreciated.


ProgressIndicator#getObservable()

    public Observable<ProgressBarEvent> getObservable() {
        return mProgressSubject.doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                logger.d("mProgressObservable UNSUBSCRIBE");
            }
        }).doOnError(new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                logger.d("mProgressObservable ERROR");
            }
        }).distinctUntilChanged().subscribeOn(Schedulers.computation());
    }