Observable with multiple observers


Robby Pond

Nov 23, 2013, 2:34:12 PM
to rxj...@googlegroups.com
Hello.

I'm really new to Rx but excited about using it to simplify some of my Android async code. In one of my apps I have to refresh the OAuth token before making certain calls. I want to be able to refresh the token, save it and update the UI, and then continue with the original request.
Currently what I have does everything except update the UI (since the subscriber is running on a background thread).

private final Observable<RefreshResponse> loginObservable = Observable.create(new Observable.OnSubscribeFunc<RefreshResponse>() {
        @Override
        public Subscription onSubscribe(Observer<? super RefreshResponse> observer) {
            try {
                // Blocking call: exchange the refresh token for a new access token.
                RefreshResponse refreshResponse = mAuthService.getAccessToken(refreshToken, "refresh_token");
                processRefreshResponse(refreshResponse);
                observer.onNext(refreshResponse);
                observer.onCompleted();
            } catch (Exception e) {
                observer.onError(e);
            }
            return Subscriptions.empty();
        }
    });

loginObservable.subscribeOn(Schedulers.newThread())
        .flatMap(new Func1<RefreshResponse, Observable<GalleryResponse>>() {
            @Override
            public Observable<GalleryResponse> call(final RefreshResponse refreshResponse) {
                return Observable.create(new Observable.OnSubscribeFunc<GalleryResponse>() {
                    @Override
                    public Subscription onSubscribe(Observer<? super GalleryResponse> observer) {
                        try {
                            GalleryResponse response = mGalleryService.getGallery(refreshResponse.refresh_token, section, sort, currentPage);
                            observer.onNext(response);
                            observer.onCompleted();
                        } catch (Exception e) {
                            observer.onError(e);
                        }
                        return Subscriptions.empty();
                    }
                });
            }
        })
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<GalleryResponse>() {
            @Override
            public void call(GalleryResponse galleryResponse) {
                mBus.post(new DefaultGalleryEvent(galleryResponse));
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable error) {
                processError(error);
            }
        });

The code looks really complicated for what I am trying to do so I'm sure that there is a better/simpler way.

Thanks in advance.

Ben Christensen

Nov 27, 2013, 4:58:51 PM
to Robby Pond, rxj...@googlegroups.com
I would put utility methods somewhere for converting from/to Observables for those things you want to have async so you don’t have so much Observable.create boilerplate (painful in Java 7 and earlier) in your code :-)

Am I correct that you’re wrapping around blocking network calls and needing to run them on separate threads? If so then subscribeOn() is a good way to run it asynchronously on a separate thread.

Also, you mention multiple observers in the subject but I don’t see that in the example. The way to do it though is using operators like replay(), publish() or cache().

Most likely if you’re using replay() or publish() you’ll want to use refCount() with it. This way it multicasts to as many subscribers as you want but then cleans up when there are no subscribers and restarts when new subscribers subscribe. 

Thus you’d have something like this:

Observable<T> shared = myObservable.publish().refCount();

Subscription a = shared.subscribe(observer);
Subscription b = shared.subscribe(observer);

-- 
Ben Christensen
+1.310.782.5511  @benjchristensen

Adam Greenberg

Nov 29, 2014, 8:23:30 PM
to rxj...@googlegroups.com, nova...@gmail.com
I know this thread is old, so you might have found the answer elsewhere, but I wanted to point out that what you are looking for can be found here: https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators

TLDR

You need to use a ConnectableObservable

// publish() is an instance method; source is any existing Observable<T>
ConnectableObservable<T> co = source.publish();
co.subscribe();
co.subscribe();
co.connect();
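
As a rough mental model of the subscribe-then-connect ordering above, here is a dependency-free sketch in plain Java. MiniConnectable is a hypothetical stand-in written for illustration, not RxJava's ConnectableObservable; it ignores errors, completion, unsubscription, and threading.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a connectable source; NOT RxJava's implementation.
class MiniConnectable<T> {
    private final List<T> items;  // what the source will emit once connected
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    MiniConnectable(List<T> items) {
        this.items = items;
    }

    // Registering a subscriber does not start emission.
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    // Emission starts here; each item goes to every registered subscriber.
    void connect() {
        for (T item : items) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(item);
            }
        }
    }

    public static void main(String[] args) {
        MiniConnectable<String> co = new MiniConnectable<>(Arrays.asList("a", "b"));
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        co.subscribe(first::add);
        co.subscribe(second::add);
        System.out.println(first.isEmpty());       // true: nothing emitted before connect()
        co.connect();
        System.out.println(first + " " + second);  // [a, b] [a, b]
    }
}
```

The source is traversed once and both subscribers see the same items, which is the multicast behaviour the thread is after.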

Christopher Piggott

May 7, 2016, 9:28:14 AM
to RxJava, nova...@gmail.com, agree...@mkone.co
Thanks!

The problem I was having was that I was trying to mix too many concepts. I was trying to use publish() and groupBy() at the same time and it just didn't work. I revised it to be:

ConnectableObservable<Object> obs = Observable.create( observer ->  /* generate events */ )
    .publish();

obs.filter(  item -> item instanceof Type1)
    .cast(Type1.class)
    .buffer(BATCH_SIZE)
    .subscribe(item -> doSomethingWith1(item) );

obs.filter(  item -> item instanceof Type2)
    .cast(Type2.class)
    .buffer(BATCH_SIZE)
    .subscribe(item -> doSomethingWith2(item) );


obs.connect();


That works. I wish I could do the instanceof check and the cast in one step to clean it up a little. I was worried at first about the performance of this. That's why I was hoping .groupBy() could just split the input stream into a separate set of streams, each with one subscription on it. I realized, though, that if the .filter() is doing its job correctly, then passing every object to every filter isn't that expensive (at least not compared to each doSomethingWith()).

Ben Sandee

May 7, 2016, 11:18:34 AM
to RxJava

Christopher,

A few follow-up observations -- check out the ofType() operator, which combines your instanceof and cast calls into one.

Also, the general consensus suggests avoiding Observable.create() unless you really know what you're doing. There are a few threads on that if you search for it. Prefer Observable.fromCallable(Callable) or Observable.from(Future), etc. Of course, I don't know how complicated your observable source is so that advice may or may not be appropriate for you.

Ben

Christopher Piggott

May 7, 2016, 10:24:43 PM
to RxJava
Hi,

Thanks, I didn't notice ofType() but that's perfect.  Now all my subscriptions are just:

obs.ofType(SomeItem.class)
        .buffer(BATCH_SIZE)
        .subscribe(item -> storeSomeItems(item));
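
For readers unsure what ofType() followed by buffer() delivers, this plain-Java sketch mimics the combination on an in-memory list. TypedBatches and ofTypeBuffered are hypothetical names, not RxJava API. The point is that each value reaching subscribe() is a list of up to BATCH_SIZE items, with a shorter final list if the source ends mid-batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that imitates ofType(type).buffer(batchSize) on an Iterable.
class TypedBatches {
    static <T> List<List<T>> ofTypeBuffered(Iterable<?> items, Class<T> type, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (Object item : items) {
            if (!type.isInstance(item)) {
                continue;                      // the instanceof filter
            }
            current.add(type.cast(item));      // the cast
            if (current.size() == batchSize) { // a full batch is handed on
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);              // trailing partial batch
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Object> mixed = Arrays.asList("a", 1, "b", 2, "c");
        System.out.println(ofTypeBuffered(mixed, String.class, 2));  // [[a, b], [c]]
        System.out.println(ofTypeBuffered(mixed, Integer.class, 2)); // [[1, 2]]
    }
}
```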


I'm using Observable.create() for something that's a lot like an iterator but it's nested and a touch more complicated.  It iterates through a file.  A file is divided up into sections.  Each section has 12 or so lists of objects of different types.

I guess I could avoid using create() by using Observable.from(Iterable<T>) and make my own custom iterator that knows my specific semantics.  That would take some work for me but based on what you're saying maybe it's worth it because it's safer.  Do you think so?
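
If the custom-iterator route were taken, the iterator might look roughly like the sketch below. Section and SectionItems are hypothetical stand-ins for the file layout described above (sections, each holding several lists); the real parsing is not shown. An instance could then be handed to Observable.from(Iterable<T>).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical model: a file is a list of sections, each section holds several lists.
class SectionItems implements Iterable<Object> {
    static class Section {
        final List<List<?>> lists;

        Section(List<?>... lists) {
            this.lists = Arrays.asList(lists);
        }
    }

    private final List<Section> sections;

    SectionItems(List<Section> sections) {
        this.sections = sections;
    }

    @Override
    public Iterator<Object> iterator() {
        return new Iterator<Object>() {
            private final Iterator<Section> sectionIt = sections.iterator();
            private Iterator<List<?>> listIt = Collections.emptyIterator();
            private Iterator<?> itemIt = Collections.emptyIterator();

            // Walk down the nesting until an item is available or everything is exhausted.
            @Override
            public boolean hasNext() {
                while (!itemIt.hasNext()) {
                    if (listIt.hasNext()) {
                        itemIt = listIt.next().iterator();
                    } else if (sectionIt.hasNext()) {
                        listIt = sectionIt.next().lists.iterator();
                    } else {
                        return false;
                    }
                }
                return true;
            }

            @Override
            public Object next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return itemIt.next();
            }
        };
    }

    public static void main(String[] args) {
        SectionItems items = new SectionItems(Arrays.asList(
                new Section(Arrays.asList("a", "b"), Arrays.asList(1, 2)),
                new Section(Collections.emptyList(), Arrays.asList("x"))));
        List<Object> seen = new ArrayList<>();
        for (Object item : items) {
            seen.add(item);
        }
        System.out.println(seen); // [a, b, 1, 2, x]
    }
}
```

Empty lists and empty sections are skipped by the loop in hasNext(), so callers only ever see items.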

Ben Sandee

May 7, 2016, 10:42:30 PM
to RxJava
I'm no expert on that but it sounds like you'd have to jump through a few hoops to adapt to the simpler factories. Carry on with Observable.create() with care, as always... :)