/**
 * Adapter exposing {@link MockService} responses as per-id {@code Observable<String>} streams.
 *
 * <p>Every response delivered to the service callback is pushed into a single subject.
 * For each id a shared, filtered view of that subject is cached in {@code subscriptionMap};
 * the entry is created on first lookup and removed again when the shared upstream
 * connection is disposed.
 */
static class MockServiceRXAdapterImpl1 implements MockServiceRXAdapter {

    /** Subject that receives every response handed to the service callback. */
    PublishSubject<MockResponse> mockResponseObservable = PublishSubject.create();

    /** Service constructed with a callback that forwards each response into the subject. */
    MockService mockService = new MockService(response -> mockResponseObservable.onNext(response));

    /** Cache of the shared value stream per id. */
    final ConcurrentMap<String, Observable<String>> subscriptionMap = new ConcurrentHashMap<>();

    /**
     * Returns an observable of the values reported for {@code id}.
     *
     * <p>The cache lookup is performed inside the resource supplier of
     * {@code Observable.using}, i.e. per subscriber rather than when this method is called.
     * The resource itself is the cached stream, so the source factory is the identity and
     * the disposer does nothing.
     *
     * @param id the id whose values should be observed
     * @return an observable that subscribes to the cached shared stream for {@code id}
     */
    public Observable<String> getObservable(String id) {
        return Observable.using(
                () -> subscriptionMap.computeIfAbsent(id, this::createSharedStream),
                shared -> shared,
                shared -> {}
        );
    }

    /**
     * Builds the shared stream for a single id: responses are filtered by id, the service
     * is told to subscribe/unsubscribe around the upstream connection, and the mapped values
     * are shared through {@code replay(1).refCount()}.
     */
    private Observable<String> createSharedStream(String key) {
        Observable<MockResponse> matching =
                mockResponseObservable.filter(response -> response.id.equals(key));

        // Hooks sit upstream of replay/refCount, so they follow the shared connection
        // rather than each individual downstream subscriber.
        Observable<MockResponse> tracked = matching
                .doOnSubscribe(disposable -> mockService.subscribe(key))
                .doOnDispose(() -> {
                    mockService.unsubscribe(key);
                    subscriptionMap.remove(key);
                });

        return tracked
                .map(response -> response.value)
                .replay(1)
                .refCount();
    }
}
// NOTE(review): this statement appears outside of any method in this view; presumably it is
// an alternative body for getObservable(String id) — confirm where it belongs.
// It performs the same cached lookup as the Observable.using version, but wrapped in
// Observable.defer, so the computeIfAbsent call runs inside the deferred supplier.
return Observable.defer(() -> subscriptionMap.computeIfAbsent(
id,
// Built only when no stream is cached for this id: filter the shared subject by id.
key -> mockResponseObservable.filter(mockResponse -> mockResponse.id.equals(key))
// Tell the service to start delivering this id when the upstream is subscribed.
.doOnSubscribe(disposable -> mockService.subscribe(key))
// On upstream disposal, stop the service for this id and drop the cached entry.
.doOnDispose(() -> {
mockService.unsubscribe(key);
subscriptionMap.remove(key);
})
.map(mockResponse -> mockResponse.value)
// Share a single upstream connection and replay the latest value to new subscribers.
.replay(1)
.refCount())
);