How to handle shared refreshable / paginated observable properly with RxJava?


Daniele Segato

May 5, 2016, 1:15:24 PM
to RxJava
This question has been asked many times in different forms; here are some examples:
there are more...
Every question and answer is different, and so far I haven't really liked any of them.

I created my own implementation and I'd like to share it to see what you think: is there a more reactive, more efficient or cleaner way of doing it, or did I just do something really bad? :)

In general you need this when you have a remote API of some kind that is used from many places in your code.

I would say you want this:
  1. a hot observable
  2. that, when someone subscribes, returns the last available data (or fetches it if this is the first subscriber)
  3. that is disposed when no one is subscribed anymore (nice to have: after a timeout)
  4. any "Observer" should be able to trigger a refresh
  5. the newly refreshed event should reach all the observers
  6. if pagination is involved, an Observer should be able to ask for the next page to be fetched
  7. all observers should receive said next page
  8. listeners should receive meta information of some kind, such as "I'm refreshing this"

My basic building block is always a cold observable that emits 1 item (the response to the HTTP call in my case) or an error, and then completes.

Looking at point 5 in my list: "the new refreshed event should reach all the observers"
In my opinion this means that the shared Observable must never emit onError or onCompleted. Otherwise you need some other mechanism to notify every observer that it has to re-subscribe.

To me this means materialize() has to be used. I created my own Event class.

observable.materialize()
        // scan() emits its seed first, so the stream always starts with LOADING
        .scan(Event.create(EventStatus.LOADING, (T) null, null), (previousEvent, notification) -> {
            switch (notification.getKind()) {
                case OnNext:
                    // data arrived, but the source has not completed yet
                    return Event.create(EventStatus.LOADING, notification.getValue(), null);
                case OnCompleted:
                    return Event.create(EventStatus.IDLE, previousEvent.data(), null);
                case OnError:
                    // keep the last known data next to the error
                    return Event.create(EventStatus.ERROR, previousEvent.data(), notification.getThrowable());
                default:
                    return previousEvent;
            }
        })
        .startWith(Event.create(EventStatus.LOADING, null, null));


So my cold observable is transformed into a stream of Event objects: the first one is always a LOADING, followed by a LOADING with the value, then an IDLE with the value or, in case of error, an ERROR with the previous value.
Event.makeEvent() returns a Transformer that applies this logic, so that I can use it with compose():

 
Observable.just("Test")
    .compose(Event.makeEvent())
    .subscribe(loggerObserver);

prints:

onNext: Event{status=LOADING, data=null, error=null}
onNext: Event{status=LOADING, data=null, error=null}
onNext: Event{status=LOADING, data=Test, error=null}
onNext: Event{status=IDLE, data=Test, error=null}
onCompleted

The final onCompleted is still emitted, because materialize() itself completes after wrapping the source's terminal notification.

There's a duplicated LOADING event without data, but I can simply filter it out with distinctUntilChanged().
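One detail worth spelling out: distinctUntilChanged() compares consecutive items with equals(), so the duplicated LOADING event is only dropped if Event implements value equality. Below is a minimal plain-Java sketch of such a class. It is not the actual Event class from this post: the field layout, `EventDedupDemo` and `dropConsecutiveDuplicates` are assumptions, and the helper only mimics the operator on a list so that the sketch runs without RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for the Event class described in the post.
enum EventStatus { LOADING, IDLE, ERROR }

final class Event<T> {
    private final EventStatus status;
    private final T data;
    private final Throwable error;

    private Event(EventStatus status, T data, Throwable error) {
        this.status = status;
        this.data = data;
        this.error = error;
    }

    static <T> Event<T> create(EventStatus status, T data, Throwable error) {
        return new Event<>(status, data, error);
    }

    EventStatus status() { return status; }
    T data() { return data; }
    Throwable error() { return error; }

    // Value equality: two events are equal when status, data and error are equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Event)) return false;
        Event<?> other = (Event<?>) o;
        return status == other.status
                && Objects.equals(data, other.data)
                && Objects.equals(error, other.error);
    }

    @Override
    public int hashCode() {
        return Objects.hash(status, data, error);
    }

    @Override
    public String toString() {
        return "Event{status=" + status + ", data=" + data + ", error=" + error + "}";
    }
}

final class EventDedupDemo {
    // Drops consecutive equal items, which is what distinctUntilChanged() does via equals().
    static <T> List<T> dropConsecutiveDuplicates(List<T> items) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (out.isEmpty() || !Objects.equals(out.get(out.size() - 1), item)) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event<String>> events = new ArrayList<>();
        events.add(Event.<String>create(EventStatus.LOADING, null, null));
        events.add(Event.<String>create(EventStatus.LOADING, null, null));
        events.add(Event.create(EventStatus.LOADING, "Test", null));
        events.add(Event.create(EventStatus.IDLE, "Test", null));
        for (Event<String> e : dropConsecutiveDuplicates(events)) {
            System.out.println(e);
        }
    }
}
```

Without the equals() override, the two initial LOADING events would be distinct objects and both would pass through.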

For my point 4 (any observer should be able to trigger a refresh), I figured I needed some kind of refresh() method in a helper utility.

So I created an interface called ServiceCall:

Observable<Event<T>> getAsEvents();
void refresh();
boolean hasData();

It wraps the Observable in some way.
This is where things got tricky, especially for point 5 of my list above.

I wanted that Observable<Event<T>> to never complete, and I wanted to inject new data into it whenever any holder of my ServiceCall requested it.

I ended up implementing my ServiceCall with a BehaviorRelay<Event<T>> from RxRelay.

But first let's see point 2 of my list:

@Override
public Observable<Event<T>> getAsEvents() {
    return relay.asObservable()
            .startWith(initialize())
            .distinctUntilChanged();
}

The initialize() method returns Observable.empty() if the relay already has data in it; otherwise it sets up the remote HTTP call observable:

protected Observable<Event<T>> initialize() {
    if (isInitialized()) {
        // relay already contains data, no need to do anything
        return Observable.empty();
    }
    // initialize
    return fetchFromRemote() 
            // make this an event
            .compose(Event.makeEvent())
            // and put events in the relay
            .doOnNext(relay::call);
}

The first subscriber triggers the first remote call; every other subscriber just receives the last event from the relay.
If I used replay(1) or a BehaviorSubject here, the onCompleted event would still be propagated when my remote call completed. The Relay makes sure there's no such onCompleted, and since I convert errors to events the Observable stays alive.
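To illustrate why a relay fits here, the following is a deliberately simplified, single-threaded plain-Java model of the behaviour relied on above: it remembers the latest value, replays it to late subscribers, and has no way to signal completion or error. It is not RxRelay's implementation; `LatestValueRelay` and its methods are hypothetical names used only for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for a BehaviorRelay: latest-value replay, no terminal events.
final class LatestValueRelay<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();
    private T latest;
    private boolean hasValue;

    // Pushes a value to all current observers and remembers it for late subscribers.
    void call(T value) {
        latest = value;
        hasValue = true;
        // Iterate over a copy so observers may unsubscribe while being notified.
        for (Consumer<T> observer : new ArrayList<>(observers)) {
            observer.accept(value);
        }
    }

    // Registers an observer, replays the latest value if there is one,
    // and returns a handle that unsubscribes the observer when run.
    Runnable subscribe(Consumer<T> observer) {
        observers.add(observer);
        if (hasValue) {
            observer.accept(latest);
        }
        return () -> observers.remove(observer);
    }

    boolean hasValue() {
        return hasValue;
    }
}

final class RelayDemo {
    public static void main(String[] args) {
        LatestValueRelay<String> relay = new LatestValueRelay<>();
        relay.subscribe(v -> System.out.println("first got " + v));
        relay.call("page 1");
        // A late subscriber immediately receives the latest value.
        relay.subscribe(v -> System.out.println("second got " + v));
        // A "refresh" is just another value pushed to everyone.
        relay.call("page 1 (refreshed)");
    }
}
```

Because the model exposes only `call`, a completed or failed fetch can never terminate the shared stream; it can only push another value.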

Now points 4 and 5, the refresh:

@Override
public void refresh() {
    fetchFromRemote()
            .compose(Event.makeEvent())
            .subscribe(relay::call);
}


I just subscribe directly to the fetch and pass its events to the relay, so every Observer automatically receives all of them. (The subscription should be released automatically when the fetch is done, because it receives onCompleted.)

For pagination I created a PaginatedServiceCall interface extending ServiceCall and adding two methods:

boolean hasNext();
void fetchNext();

This interface requires the data type to implement another interface that reports whether more data is available, so hasNext() is delegated to the data type, while fetchNext() works almost like refresh(): internally it fetches the next page from the remote service, "merges" it into the previous one, then sends the result to the relay. The concrete implementation needs to know the data type in order to build the next request.

This way all the observers receive a new object containing all the previous pages plus the new one.

I deliberately didn't go into the details of how the pagination is performed: in my case I had to repeat the same call with a "nextKey" query parameter, but it could be a page number or anything else.
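The "merge" step can be sketched in plain Java as follows. This is only an illustration under assumptions: `Paginated`, `Page`, `mergeWith` and the String-typed `nextKey` are hypothetical names, and the real data type would carry whatever the remote API needs to build the next request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical interface the data type implements so hasNext() can be delegated to it.
interface Paginated {
    boolean hasNext();
}

// Hypothetical immutable page: the items fetched so far plus the key of the next page.
final class Page<T> implements Paginated {
    private final List<T> items;
    private final String nextKey; // null when there is nothing left to fetch

    Page(List<T> items, String nextKey) {
        this.items = Collections.unmodifiableList(new ArrayList<>(items));
        this.nextKey = nextKey;
    }

    List<T> items() { return items; }
    String nextKey() { return nextKey; }

    @Override
    public boolean hasNext() {
        return nextKey != null;
    }

    // Returns a new page with all previous items followed by the newly fetched
    // ones; the next key is taken from the newly fetched page.
    Page<T> mergeWith(Page<T> next) {
        List<T> all = new ArrayList<>(items);
        all.addAll(next.items);
        return new Page<>(all, next.nextKey);
    }
}

final class PaginationDemo {
    public static void main(String[] args) {
        Page<String> first = new Page<>(Arrays.asList("a", "b"), "key-2");
        Page<String> second = new Page<>(Arrays.asList("c"), null);
        Page<String> merged = first.mergeWith(second);
        System.out.println(merged.items() + " hasNext=" + merged.hasNext());
    }
}
```

Keeping the page immutable means observers that still hold the previous Event never see their data change underneath them; the merged page is a new object sent through the relay.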


Problems with this implementation:
  • something in my code has to keep a reference to the ServiceCall to share it across Observers (I don't think there's any way around it)
  • point 3 of my list is hard to handle. A weak reference would probably suffice: when there is no subscription and no reference to the ServiceCall left, it will just be garbage collected (I wish I had a way to add some kind of timeout to it, though)
  • either I lose all the handy mapping, filtering, etc., or I keep both the ServiceCall reference (to trigger a refresh) and the Observable on which I apply all my filtering. (I mitigated this with a few more methods in my ServiceCall interface that map it to another ServiceCall, in a similar fashion to Observable)
  • other?
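For the weak-reference idea in the list above, a minimal sketch might look like this. `WeakCallRegistry`, `getOrCreate` and the key type are hypothetical names, not part of the implementation described here. The sketch assumes every subscriber keeps a strong reference to its ServiceCall for as long as it is subscribed, and it does not implement the timeout.

```java
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical registry that shares one instance per key while anyone still
// holds it strongly, and lets the garbage collector reclaim it afterwards.
final class WeakCallRegistry<K, V> {
    private final Map<K, WeakReference<V>> entries = new HashMap<>();

    // Returns the live instance for the key, or creates and registers a new one
    // if none exists or the previous one has been garbage collected.
    synchronized V getOrCreate(K key, Supplier<V> factory) {
        WeakReference<V> ref = entries.get(key);
        V existing = (ref == null) ? null : ref.get();
        if (existing != null) {
            return existing;
        }
        V created = factory.get();
        entries.put(key, new WeakReference<>(created));
        return created;
    }
}

final class WeakRegistryDemo {
    public static void main(String[] args) {
        WeakCallRegistry<String, Object> registry = new WeakCallRegistry<>();
        Object call = registry.getOrCreate("users", Object::new);
        // While "call" is strongly reachable, the same instance is handed out again.
        System.out.println(call == registry.getOrCreate("users", Object::new));
    }
}
```

When the collector actually clears an unreferenced entry is not deterministic, so this gives "eventually disposed" rather than "disposed after N seconds". Cleared entries also stay in the map until the same key is requested again.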
It has worked for me so far, but I'd like to know whether you think I made some bad mistake, whether I could do something better, or whether Rx provides a more reactive way of doing this.
Hoping for feedback :)

Regards,
Daniele


Mohammed Rampurawala

Nov 24, 2019, 11:16:40 AM
to RxJava
Do you have this code on GitHub or somewhere else? I am trying to implement a similar use case.