Is there a cleaner way to adapt a standard observer/listener service using RxJava2?

SomeGuy OnTheInternet

Apr 9, 2020, 11:15:47 AM
to RxJava
I've been tinkering with wrapping an old style listener interface using RxJava. What I've come up with seems to work, but the usage of Observable.using feels a bit awkward.

The requirements are:
  1. Only one subscription per id to the underlying service.
  2. The latest value for a given id should be cached and served to new subscribers.
  3. We must unsubscribe from the underlying service if nothing is listening to an id.
The following is what I've got.  Is there a better way?

static class MockServiceRXAdapterImpl1 implements MockServiceRXAdapter {
    PublishSubject<MockResponse> mockResponseObservable = PublishSubject.create();
    MockService mockService = new MockService(mockResponse -> mockResponseObservable.onNext(mockResponse));
    final ConcurrentMap<String, Observable<String>> subscriptionMap = new ConcurrentHashMap<>();

    public Observable<String> getObservable(String id) {
        return Observable.using(
                () -> subscriptionMap.computeIfAbsent(
                        id,
                        key -> mockResponseObservable.filter(mockResponse -> mockResponse.id.equals(key))
                                .doOnSubscribe(disposable -> mockService.subscribe(key))
                                .doOnDispose(() -> {
                                    mockService.unsubscribe(key);
                                    subscriptionMap.remove(key);
                                })
                                .map(mockResponse -> mockResponse.value)
                                .replay(1)
                                .refCount()),
                observable -> observable,
                observable -> {}
        );
    }
}

Thanks,
Peter
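
As a reduced illustration of how replay(1).refCount() relates to the three requirements above, the sketch below swaps MockService for a plain PublishSubject and records lifecycle events in a list. The class name ReplayRefCountDemo and the log strings are made up for this example, and it assumes RxJava 2.x (io.reactivex.rxjava2:rxjava) is on the classpath. It is not the adapter itself and leaves out the per-id map.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

class ReplayRefCountDemo {
    // Returns the sequence of lifecycle events and values seen by two subscribers.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Stand-in for the stream fed by the listener callback.
        PublishSubject<String> upstream = PublishSubject.create();

        Observable<String> shared = upstream
                // Stand-ins for mockService.subscribe / mockService.unsubscribe.
                .doOnSubscribe(d -> log.add("service.subscribe"))
                .doOnDispose(() -> log.add("service.unsubscribe"))
                .replay(1)    // cache the latest value for late subscribers
                .refCount();  // connect on first subscriber, disconnect after the last one

        Disposable first = shared.subscribe(v -> log.add("first:" + v));
        upstream.onNext("a");
        // The second subscriber shares the connection and receives the cached "a".
        Disposable second = shared.subscribe(v -> log.add("second:" + v));

        first.dispose();   // one subscriber left, upstream stays connected
        second.dispose();  // no subscribers left, upstream is disposed
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With RxJava 2.x this should log service.subscribe once, then first:a, then second:a (served from the replay cache), and service.unsubscribe only after the second dispose.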

George Campbell

Apr 10, 2020, 4:08:22 PM
to RxJava
The second and third arguments aren't adding any value. Using is more about wrapping the life cycle of resources that need to be cleaned up, for example a file handle that needs to be closed when the Observable is done. Change the code to use defer instead. Defer delays the execution of the code that creates an Observable until the time of subscription (i.e. it is lazy):

return Observable.defer(() -> subscriptionMap.computeIfAbsent(
        id,
        key -> mockResponseObservable.filter(mockResponse -> mockResponse.id.equals(key))
                .doOnSubscribe(disposable -> mockService.subscribe(key))
                .doOnDispose(() -> {
                    mockService.unsubscribe(key);
                    subscriptionMap.remove(key);
                })
                .map(mockResponse -> mockResponse.value)
                .replay(1)
                .refCount()));
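
To make the distinction between the two operators concrete, here is a small sketch that is not part of the adapter. It assumes RxJava 2.x (io.reactivex.rxjava2:rxjava) on the classpath, and the class name DeferVsUsing, the "handle" string and the log messages are placeholders invented for the example. It shows defer running its factory once per subscriber at subscription time, and using opening and closing a resource around each subscription.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

class DeferVsUsing {
    // Returns the events in the order they happened.
    static List<String> demo() {
        List<String> log = new ArrayList<>();

        // defer: the factory is not called here, only when someone subscribes.
        Observable<String> deferred = Observable.defer(() -> {
            log.add("factory called");
            return Observable.just("deferred value");
        });
        log.add("assembled");
        deferred.subscribe(v -> log.add(v));
        deferred.subscribe(v -> log.add(v)); // the factory runs again for this subscriber

        // using: acquire a resource per subscriber, build the stream from it,
        // and release the resource when the stream terminates or is disposed.
        Observable<String> managed = Observable.using(
                () -> { log.add("open"); return "handle"; },     // hypothetical resource
                handle -> Observable.just("read via " + handle),
                handle -> log.add("close"));
        managed.subscribe(v -> log.add(v));

        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In the adapter there is no per-subscriber resource to release, which is why the empty disposer in the using version is a hint that defer is the closer fit.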