Hey,
I'm trying to implement a merge-with-priority function that takes an Observable<Observable<T>> and returns an Observable<T>. The returned Observable<T> prioritizes emissions from the Observables that appear earlier in the stream: once a higher-priority Observable emits, the lower-priority ones are cut off using takeUntil.
Some examples using mergeWithPriority(Observable.just(A, B, C)):
A emits first: results = [A]
B emits first, then A: [B, A]
C emits first, then A: [C, A] (notice no B)
A and B emit at the same time: [A]
B emits B1, B2, then A emits A1: [B1, B2, A1]
B emits B1, then A emits A1, then B would emit B2: [B1, A1]
So on and so forth.
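To pin down the behaviour I expect independently of RxJava, here is a minimal plain-Java sketch of the rule behind the examples above. The names PriorityModel, Emission and expected are made up for illustration and are not part of RxJava or of my code. It assumes every emission carries a timestamp, that ties go to the higher-priority source, and that once a source has emitted, every lower-priority source stays silenced for good (the examples don't cover every combination, so that last part is my assumption).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical reference model of the intended semantics; it does not use RxJava.
final class PriorityModel {

    /** One emission: source index (0 = highest priority), timestamp, and value. */
    static final class Emission {
        final int source;
        final long time;
        final String value;

        Emission(int source, long time, String value) {
            this.source = source;
            this.time = time;
            this.value = value;
        }
    }

    /** Returns the values that should be emitted downstream, in order. */
    static List<String> expected(List<Emission> emissions) {
        // Order by time; on a tie the higher-priority (lower index) source goes first.
        List<Emission> sorted = new ArrayList<>(emissions);
        sorted.sort(Comparator.comparingLong((Emission e) -> e.time)
                .thenComparingInt(e -> e.source));

        int best = Integer.MAX_VALUE; // highest-priority source that has emitted so far
        List<String> out = new ArrayList<>();
        for (Emission e : sorted) {
            // Keep the emission only if no strictly higher-priority source has emitted yet.
            if (e.source <= best) {
                out.add(e.value);
                best = e.source;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // "B emits B1, then A emits A1, then B would emit B2"
        System.out.println(expected(Arrays.asList(
                new Emission(1, 10, "B1"),
                new Emission(0, 20, "A1"),
                new Emission(1, 30, "B2"))));
    }
}
```

Under this model, A and B emitting at the same time should give [A], which is the case that goes wrong for me further down.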
Here's the mergeWithPriority code:

    public static <T> Observable<T> mergeWithPriority(Observable<Observable<T>> sources) {
        return Observable.merge(sources.map(Observable::share).scan((a, b) -> b.takeUntil(a)));
    }

share() is used because otherwise the first observable would be subscribed to multiple times by the inner subscribers of takeUntil and merge. In our use case the first observable is a service call, so repeated subscriptions are not acceptable.
The issue I have is as follows:
A is a hot observable. The function call is mergeWithPriority(Observable.just(A, B, C)):
A and B emit at the same time: [A, B]
The order is important, because if merge with priority is behaving correctly, there should be no way that B is emitted after A.
These are the tests I've been using to try and debug this:
    public void testHotPriorityObservable() {
        Observable<String> source2 = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Source #2");
            }
        });
        Observable<String> source1 = Observable.just("Source #1");
        Observable<String> source3 = Observable.just("Source #3");
        Observable<String> mechanism = RxUtils.mergeWithPriority(Observable.just(
                source1.delay(10, TimeUnit.MILLISECONDS),
                source2,
                source3
        ));
        List<String> results = mechanism.take(2).toList().toBlocking().first();
        assertEquals(2, results.size());
        assertEquals("Source #2", results.get(0));
        assertEquals("Source #1", results.get(1));
    }

    public void testSubscriptionCount() {
        BehaviorSubject<Integer> subscriptionCount = BehaviorSubject.create(0);
        Observable<String> source1 = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Source #1");
                subscriptionCount.onNext(subscriptionCount.getValue() + 1);
            }
        });
        Observable<String> source2 = Observable.just("Source #2");
        Observable<String> mechanism = RxUtils.mergeWithPriority(Observable.just(
                source1,
                source2
        ));
        mechanism.observeOn(Schedulers.immediate()).subscribeOn(Schedulers.immediate()).subscribe();
        Integer result = subscriptionCount.getValue();
        assertEquals(Integer.valueOf(1), result);
    }
Does anyone have an idea of what is going on? Something internal seems to be causing the unexpected emissions, but I'm at a loss as to what is actually happening.
Thank you very much.