Observable.merge with takeUntil emitting in the wrong order.

498 views
Skip to first unread message

Ulysses Popple

unread,
Feb 27, 2015, 4:43:11 PM2/27/15
to rxj...@googlegroups.com
Hey,

I'm trying to implement a merge with priority function that takes an Observable<Observable<T>> and returns an Observable<T>. The returned Observable<T> prioritizes emissions from the most recent  Observable<T> in the stream using takeUntil.

Some examples using mergewithPriority(Observable.just(A, B, C)):

A emits first: results = [A]
B emits first, then A: [B, A]
C emits first, then A: [C, A] (notice no B)

A emit and B emit at the same time: [A]

B emits B1, B2, then A emits A1: [B1, B2, A1]
B emits B1, then A emits A1, then B would emit B2: [B1, A1]

So on and so forth.

Here's the merge with priority code:

    public static <T> Observable<T> mergeWithPriority(Observable<Observable<T>> sources) {
        return Observable.merge(sources.map(Observable::share).scan((a, b) -> b.takeUntil(a)));
    }

Share is used because otherwise the first observable will be subscribed to multiple times by inner subscribers of takeUntil and merge. In our use case, the first observable is a service call, so this is no good.

The issue I have is as follows:

A is a hot observable. The function call is mergewithPriority(Observable.just(A, B, C)):

A , and B emit at the same time: [A, B]

The order is important, because if merge with priority is behaving correctly, there should be no way that B is emitted after A.

These are the tests I've been using to try and debug this:

    public void testHotPriorityObservable() {
        Observable<String> source2 = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Source #2");
            }
        });
        Observable<String> source1 = Observable.just("Source #1");
        Observable<String> source3 = Observable.just("Source #3");
        Observable<String> mechanism = RxUtils.mergeWithPriority(Observable.just(
            source1.delay(10, TimeUnit.MILLISECONDS),
            source2,
            source3
        ));

        List<String> results = mechanism.take(2).toList().toBlocking().first();
        assertEquals(2, results.size());
        assertEquals("Source #2", results.get(0));
        assertEquals("Source #1", results.get(1));
    }

    public void testSubscriptionCount() {
        BehaviorSubject<Integer> subscriptionCount = BehaviorSubject.create(0);
        Observable<String> source1 = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Source #1");
                subscriptionCount.onNext(subscriptionCount.getValue() + 1);
            }
        });
        Observable<String> source2 = Observable.just("Source #2");
        Observable<String> mechanism = RxUtils.mergeWithPriority(Observable.just(
            source1,
            source2
        ));

        mechanism.observeOn(Schedulers.immediate()).subscribeOn(Schedulers.immediate()).subscribe();

        Integer result = subscriptionCount.getValue();
        assertEquals(Integer.valueOf(1), result);
    }

Does anyone have an idea of what it going on? It seems as though something is going on internally that causes the strange emissions, but I'm at a loss as to what is actually happening.

Thank you very much.

Dávid Karnok

unread,
Feb 27, 2015, 7:51:00 PM2/27/15
to rxj...@googlegroups.com
Hi.

If I understand your example correctly, you need something like the amb() operator, but instead of dropping the losers, you order the final emission based on which source emitted a value and you run to completion with that, then the second source, etc. For example, given A, B, C sources, if A emits first, then C, then B, the output will be As, Cs and Bs. Is this right?

Ulysses Popple

unread,
Mar 2, 2015, 10:28:43 AM3/2/15
to rxj...@googlegroups.com
Given A, B, C sources, if A emits first then the output will be only As. Given A, B hot observables, if A never emits but B does, then the output is B but onComplete is never called.

At first we looked at the amb operator, but we want to restrict it based on the order of the sources not on first emission.

Ulysses 

Ulysses Popple

unread,
Mar 2, 2015, 1:02:02 PM3/2/15
to rxj...@googlegroups.com
After more investigation it looks like publish()/share() on the hot observable and takeUntil() are not working well together. My guess is it has to do with the unsafeSubscribe in takeUntil(), but it would be great to get confirmation of that.

Ulysses

Dávid Karnok

unread,
Mar 5, 2015, 3:03:23 PM3/5/15
to rxj...@googlegroups.com
Based on your pattern, I'd say you could use the switchIfEmpty() operator:

a.switchIfEmpty(b.switchIfEmpty(c))

If a just completes, it turns to b, if b just completes the it turns to c.

Some problems exists in takeUntil: it doesn't unsubscribe from its upstream if the other observable emits before the source completes. I'll post a fix for that and I'll see if your original example works or not.

Dávid Karnok

unread,
Mar 5, 2015, 4:17:44 PM3/5/15
to rxj...@googlegroups.com
After some analysis, I think the problem with your operator is that merge sequentially subscribes to each inner source, and because when source 2 emits, source3 is not yet subscribed to so its takeUntil won't see the value emission from source2. This order, however worked for me:

source1 = just("Source #1").delay(10, TimeUnit.MILLISECONDS).share();
source2 = create(s -> s.onNext("Source #2")).share();
source3 = just("Source #3").share();

result = merge(
    source3.takeUntil(source2.takeUntil(source1)),
    source2.takeUntil(source1),
    source1
);

Ulysses Popple

unread,
Mar 6, 2015, 12:05:44 PM3/6/15
to rxj...@googlegroups.com
Awesome! I thought it was something like that, but couldn't figure out a way to test it. The new way emits in the correct order with this function:

    public static <T> Observable<T> mergeWithPriority(Observable<Observable<T>> sources) {

       
return sources.map(Observable::share)
           
.scan((a, b) -> b.takeUntil(a))
           
.toList()
           
.map(observableList -> {
               
Collections.reverse(observableList);
               
return observableList;
           
})
           
.flatMap(Observable::merge);
   
}

However, the first observable is being subscribed to twice now with this test:

    public void testSubscriptionCount() {
       
BehaviorSubject<Integer> subscriptionCount = BehaviorSubject.create(0);
       
Observable<String> source1 = Observable.create(new Observable.OnSubscribe<String>() {
           
@Override
           
public void call(Subscriber<? super String> subscriber) {
                subscriber
.onNext("Source #1");
                subscriptionCount
.onNext(subscriptionCount.getValue() + 1);
           
}
       
});
       
Observable<String> source2 = Observable.just("Source #2");

       
Observable<String> mechanism = TRx.mergeWithPriority(Observable.just(

            source1
,
            source2
       
));


        mechanism
.observeOn(Schedulers.immediate()).subscribeOn(Schedulers.immediate()).subscribe();


       
Integer result = subscriptionCount.getValue();
        assertEquals
(Integer.valueOf(1), result);
   
}


Could it be because of an ordering problem with share()? It seems like share() could get disconnected and reconnected between subscriptions if `takeUntil` happens before the merge.

Ulysses

Dávid Karnok

unread,
Mar 6, 2015, 1:14:41 PM3/6/15
to rxj...@googlegroups.com
Yes, share gets reconnected because the takeUntils are evaluated one by one, opening and closing each other. I think your scenario requires a custom operator.

Here is a sketch how it could work (without backpressure for now)

1) start from OnSubscribe, have the list of Observables as its parameter.
2) create an inner class Merger with a method, having a int index field with MAX_VALUE, boolean emitting, Subscriber child and an ArrayList<Object>, ArrayList<SourceSubscriber> sources. Create a method named merge(int index, T value) and leave it empty for now
3) create a class SourceSubscriber extends Subscriber, with final fields index and Merger merger
4) implement its onNext, onError and onCompleted methods of SourceSubscriber by calling merger.merge(index, NotificationLite.instance().next(value)); or error(t) etc.
5) have a look at the SerializedSubscriber.onNext method in RxJava sources, you'll need a similar construct in Merger.merge, with the difference that your queue will hold the index-value pairs at subsequent indexes.
6) within the emitter loop compare the index against the queued/parameter index, and the incoming value is equal or less, use NotificationLite.apply to emit it, otherwise ignore it and call unsubscribe on sources.get(index)
7) implement the OnSubscribe.call by first newing merger, newing up as many SourceSubscribers as sources with incrementing indexes, add them to Merge.sources, add each of them to the incoming subscriber. Then loop over the Observable list and unsafeSubscribe each SourceSubscriber.

Ulysses Popple

unread,
Mar 6, 2015, 5:24:34 PM3/6/15
to rxj...@googlegroups.com
Cool, I was hoping it wouldn't come to writing a new operator, but it will be a fun exercise. 

Thank you for your help in working through this and for your suggestions regarding a custom operator,
Ulysses
Reply all
Reply to author
Forward
0 new messages