Operation OrderedMerge


Todd Nine

Apr 17, 2014, 9:37:07 PM
to rxj...@googlegroups.com
Hi guys,
  I need a merge that combines two or more observables into a single observable and emits the values from all of them in order (as defined by a comparator). This would behave exactly like the merge step of a merge sort, so it relies on the assumption that the items emitted by each source observable are already sorted. Is there an existing operation that accomplishes this that I've missed in the documentation? Observable.merge does something similar, but it emits items in whatever order they arrive.

Thanks,
Todd

Ben Christensen

Apr 17, 2014, 11:10:25 PM
to Todd Nine, rxj...@googlegroups.com
Since an Observable sequence could be infinite, there isn’t a mechanism for sorting an Observable directly (the sort might never finish), but there is toSortedList(sortFunction), which captures the contents of an Observable into a List and sorts it. Thus it would be Observable.merge(o1, o2).toSortedList(sortFunction).

If the sequences are infinite, then the only option is to use buffer and sort the lists emitted in each buffer.
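Buffering only orders items within each buffer. A plain-Java sketch (no RxJava; a List stands in for the merged stream, and the helper `sortEachBuffer` and the interleaving used below are hypothetical) shows that the concatenated output can still be out of order across buffer boundaries:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class BufferSort {
    // Splits an already-merged stream into fixed-size buffers and sorts each one,
    // roughly what buffering the merged Observable and sorting each emitted List does.
    static <T> List<List<T>> sortEachBuffer(List<T> stream, int bufferSize, Comparator<? super T> cmp) {
        if (bufferSize < 1) throw new IllegalArgumentException("bufferSize must be >= 1");
        List<List<T>> buffers = new ArrayList<>();
        for (int i = 0; i < stream.size(); i += bufferSize) {
            // Copy the window so sorting does not touch the input list.
            List<T> chunk = new ArrayList<>(stream.subList(i, Math.min(i + bufferSize, stream.size())));
            chunk.sort(cmp);
            buffers.add(chunk);
        }
        return buffers;
    }
}
```

For example, with the interleaving `10 6 9 5 8 4 3 2`, a buffer size of 3 and descending order, the buffers come out as `[10, 9, 6]`, `[8, 5, 4]`, `[3, 2]`: each one is sorted, but 6 is still emitted before 8.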

-- 
Ben Christensen
+1.310.782.5511  @benjchristensen

Jake Wharton

Apr 18, 2014, 12:55:49 AM
to Ben Christensen, Todd Nine, rxj...@googlegroups.com
I think this can be done even if they're infinite:
  1. Buffer each observable until you have at least one item from each.
  2. Perform the merge operation using the supplied Comparator on the head of each buffer and emit the lowest value.
  3. Repeat #2 on each onNext of every contributing Observable.
  4. As each source's onCompleted happens, remove that source's buffer from the list.
  5. On the final onCompleted, emit your own onCompleted.
What do you think?

I haven't been keeping up with the backpressure work, but you could apply some if any one buffer grows beyond a threshold to let the others catch up.
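Jake's steps can be sketched in plain, single-threaded Java. This is not RxJava operator code: the class `OrderedMerger` and its `onNext(source, value)` / `onCompleted(source)` methods are hypothetical stand-ins for the callbacks each source subscription would receive. Instead of removing a completed source's buffer (step 4), the sketch keeps a done flag per source, which has the same effect once that buffer is empty:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical single-threaded sketch of the steps above (not an RxJava API).
final class OrderedMerger<T> {
    private final List<ArrayDeque<T>> buffers = new ArrayList<>();
    private final boolean[] done;
    private final Comparator<? super T> cmp;
    private final Consumer<? super T> downstream;
    private final Runnable onAllCompleted;
    private boolean completed;

    OrderedMerger(int sourceCount, Comparator<? super T> cmp,
                  Consumer<? super T> downstream, Runnable onAllCompleted) {
        for (int i = 0; i < sourceCount; i++) buffers.add(new ArrayDeque<>());
        this.done = new boolean[sourceCount];
        this.cmp = cmp;
        this.downstream = downstream;
        this.onAllCompleted = onAllCompleted;
    }

    // Steps 1 and 3: buffer the new item, then try to emit.
    void onNext(int source, T value) {
        buffers.get(source).add(value);
        emitReady();
    }

    // Step 4: a completed source stops holding back emission once its buffer is empty.
    void onCompleted(int source) {
        done[source] = true;
        emitReady();
    }

    // Step 2: emit the smallest head while every unfinished source has a buffered item.
    private void emitReady() {
        while (!completed) {
            int min = -1;
            for (int i = 0; i < buffers.size(); i++) {
                ArrayDeque<T> buffer = buffers.get(i);
                if (buffer.isEmpty()) {
                    if (!done[i]) return; // this source may still send a smaller item: wait
                    continue;             // finished and drained: no longer constrains order
                }
                if (min < 0 || cmp.compare(buffer.peek(), buffers.get(min).peek()) < 0) min = i;
            }
            if (min < 0) {                // step 5: every source is finished and drained
                completed = true;
                onAllCompleted.run();
                return;
            }
            downstream.accept(buffers.get(min).poll());
        }
    }
}
```

For instance, feeding it the sequences 10 9 8 3 2 and 6 5 4 (in any interleaving) with a descending comparator yields 10 9 8 6 5 4 3 2. A fast source can still grow its buffer without bound while a slow one is silent, which is the backpressure concern Jake mentions.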

Todd Nine

Apr 18, 2014, 1:24:05 AM
to Jake Wharton, Ben Christensen, rxj...@googlegroups.com
Hi Jake,
  What you are describing is similar to what I was envisioning. Essentially, keep an ordered set of the pending onNext elements (only the first element from each source would need to be buffered).

After the initial buffering of one element from every source, I would think there would be a one-to-one correlation between emitting the min and a source's onNext invocation.

After onCompleted is invoked for every source observable, you would drain the set and invoke onCompleted on the downstream observer.

The main issue I see is the possibility of one source Observable emitting too many items before the other source Observables emit their first item. I'm not sure how to handle this, other than setting a max buffer size and throwing an exception if the other Observables are too slow to produce an element.

Thoughts?
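Todd's fallback of a maximum buffer size could look roughly like this hypothetical per-source buffer (plain Java; `BoundedBuffer` and its limit are illustrative assumptions, not part of RxJava). It fails fast instead of growing without bound while another source has not produced anything yet:

```java
import java.util.ArrayDeque;

// Hypothetical per-source buffer that refuses to grow past maxSize.
final class BoundedBuffer<T> {
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private final int maxSize;

    BoundedBuffer(int maxSize) {
        if (maxSize < 1) throw new IllegalArgumentException("maxSize must be >= 1");
        this.maxSize = maxSize;
    }

    // Throws instead of buffering more than maxSize items.
    void add(T item) {
        if (items.size() >= maxSize) {
            throw new IllegalStateException("buffer full (" + maxSize + "): other sources are too slow");
        }
        items.add(item);
    }

    T peek() { return items.peek(); }

    T poll() { return items.poll(); }

    int size() { return items.size(); }
}
```

The merge would surface that exception as an onError downstream; choosing the limit is a trade-off between memory and how much skew between sources is tolerated.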

Jake Wharton

Apr 18, 2014, 1:26:59 AM
to Todd Nine, Ben Christensen, rxj...@googlegroups.com
Yep. That's called backpressure. There's an issue for a first-party API here: https://github.com/Netflix/RxJava/issues/1000

As far as I know, there's not much in place to handle this now. You would have to roll your own solution of communicating with the source observables and tell them to slow down.

Ben Christensen

Apr 18, 2014, 12:18:58 PM
to Todd Nine, Jake Wharton, rxj...@googlegroups.com
If you’re looking to just combine the latest value from each observable and sort those and emit then that sounds like zip. It waits to receive a value from each observable and then lets you apply a function to combine them (sorted list in your case).




Todd Nine

Apr 18, 2014, 12:43:25 PM
to Ben Christensen, Jake Wharton, rxj...@googlegroups.com
Hey Ben,
  It might not necessarily be the latest.  Here is a better example of what I'm looking to do.

Observable 1 => 10 9 8 3 2

Observable 2 => 6 5 4

Merged Observable => 10 9 8 6 5 4 3 2

As you can see, Observable 1 would emit 3 items first, then Observable 2 would emit its items, then the remaining items of Observable 1.
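As a quick check of the example above: the merge step of a merge sort, given both inputs in descending order and a descending comparator, produces exactly the merged sequence listed. A minimal plain-Java sketch over in-memory lists (`MergeStep.merge` is a hypothetical helper, not an RxJava operator):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class MergeStep {
    // Merge step of merge sort: both inputs must already be sorted by cmp.
    static <T> List<T> merge(List<T> a, List<T> b, Comparator<? super T> cmp) {
        List<T> out = new ArrayList<>(a.size() + b.size());
        int i = 0, j = 0;
        while (i < a.size() && j < b.size()) {
            // Take from a on ties so equal items keep their source order (stable).
            if (cmp.compare(a.get(i), b.get(j)) <= 0) out.add(a.get(i++));
            else out.add(b.get(j++));
        }
        // One input is exhausted; the rest of the other is already in order.
        while (i < a.size()) out.add(a.get(i++));
        while (j < b.size()) out.add(b.get(j++));
        return out;
    }
}
```

`MergeStep.merge(Arrays.asList(10, 9, 8, 3, 2), Arrays.asList(6, 5, 4), Comparator.reverseOrder())` returns `[10, 9, 8, 6, 5, 4, 3, 2]`.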


Ben Christensen

Apr 18, 2014, 1:53:11 PM
to Todd Nine, rxj...@googlegroups.com, Jake Wharton
And you want the full output ordered, or based on time or size windows?


Todd Nine

Apr 18, 2014, 1:57:18 PM
to Ben Christensen, rxj...@googlegroups.com, Jake Wharton
Full output ordering.  

In our use case, our Observables are wrappers around iterators that emit items from Cassandra data. If we had a way to make the Observables halt (they're on their own io() scheduler), then we could stop emitting after a single item (or a buffer of items) while we wait for the other Observables to catch up. It seems like backpressure is the natural way to accomplish this.

I realize that this may slow down the Observers consuming the emitted items, but total ordering is more important than high throughput in this case. Since each Observable pre-fetches its next page, I only envision waiting a few milliseconds at most.


Thoughts?
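Because the sources here are iterators, one way to get the "stop after a single item" behavior without a backpressure API is to pull from the iterators directly: keep only the current head of each source in a priority queue, and advance a source only after its head has been emitted. This is a plain-Java sketch under that assumption (`SortedMergeIterator` is hypothetical and bypasses Observables entirely); it holds at most one element per source in memory:

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

// Lazily merges iterators that are each already sorted by cmp.
final class SortedMergeIterator<T> implements Iterator<T> {
    // The current head of one source, plus the iterator it came from.
    private static final class Head<E> {
        final E value;
        final Iterator<? extends E> rest;
        Head(E value, Iterator<? extends E> rest) { this.value = value; this.rest = rest; }
    }

    private final PriorityQueue<Head<T>> heads;

    SortedMergeIterator(List<? extends Iterator<? extends T>> sources, Comparator<? super T> cmp) {
        // Order heads by their values; PriorityQueue needs an initial capacity >= 1.
        heads = new PriorityQueue<Head<T>>(Math.max(1, sources.size()),
                (x, y) -> cmp.compare(x.value, y.value));
        for (Iterator<? extends T> it : sources) {
            if (it.hasNext()) heads.add(new Head<T>(it.next(), it));
        }
    }

    @Override
    public boolean hasNext() { return !heads.isEmpty(); }

    @Override
    public T next() {
        Head<T> smallest = heads.poll();
        if (smallest == null) throw new NoSuchElementException();
        // Pull exactly one more item, and only from the source we just consumed.
        if (smallest.rest.hasNext()) heads.add(new Head<T>(smallest.rest.next(), smallest.rest));
        return smallest.value;
    }
}
```

Ties between equal heads are broken arbitrarily, since PriorityQueue is not stable.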

Ben Christensen

Apr 18, 2014, 2:06:39 PM
to Todd Nine, rxj...@googlegroups.com, Jake Wharton
Are you saying ordering as in “ordered by time” or “natural ordering”?

Here is the ordering done naturally and then in reverse numerical order:

    private static void numericalOrdering() {
        Observable<Integer> a = Observable.from(10, 9, 8, 3, 2);
        Observable<Integer> b = Observable.from(6, 5, 4);

        Observable.merge(a, b).toSortedList().subscribe(System.out::println);
        Observable.merge(a, b).toSortedList((x, y) -> y - x).subscribe(System.out::println);
    }


This outputs: 

[2, 3, 4, 5, 6, 8, 9, 10]

[10, 9, 8, 6, 5, 4, 3, 2]



If you want ordering by time, here’s a way to do it:

    private static void timeOrdering() {
        Observable<Integer> a = Observable.from(10, 9, 8, 3, 2);
        Observable<Integer> b = Observable.from(6, 5, 4);

        // this emits List<Timestamped<Integer>>, newest first
        // (Long.compare avoids overflowing an int cast of the difference)
        Observable.merge(a.timestamp(), b.timestamp())
                .toSortedList((t1, t2) -> Long.compare(t2.getTimestampMillis(), t1.getTimestampMillis()))
                .subscribe(System.out::println);

        // this extracts it back out to just the list of values
        Observable.merge(a.timestamp(), b.timestamp())
                .toSortedList((t1, t2) -> Long.compare(t2.getTimestampMillis(), t1.getTimestampMillis()))
                .flatMap(list -> Observable.from(list).map(t -> t.getValue()))
                .toList()
                .subscribe(System.out::println);
    }




Dávid Karnok

Apr 18, 2014, 2:42:43 PM
to rxj...@googlegroups.com
Hi. Here is a sketch about how I would implement such an operator:

- have a subscriber with a queue and a done flag subscribed to each source,
- each onNext calls a common next method which is aware of all the queues and flags,
- the common next method finds the queue whose peeked element is the smallest according to the custom comparator,
- if any queue is empty but not done, do nothing; otherwise emit the smallest value found,
- each onCompleted call should set its own done flag and call the common next.

Todd Nine

Apr 21, 2014, 12:21:01 PM
to Dávid Karnok, rxj...@googlegroups.com
Hey Guys, thanks for your help.  I'll hopefully get to this over the next couple of days.

Ben, to answer your question, it's "natural ordering". Correct me if I'm wrong, but won't the toSortedList() buffer all elements in memory?  I'm looking at processing several million emitted values (graph edges), so I definitely don't want to hold them all in memory at the same time.


Ben Christensen

Apr 21, 2014, 12:54:36 PM
to Todd Nine, Dávid Karnok, rxj...@googlegroups.com
Yup, toSortedList will hold them all in memory. 

If it is natural ordering and you want them all sorted but you can’t hold them all in memory, how do you intend to sort them as a stream? That’s what I still don’t understand in this conversation. 


Ben Christensen

Apr 21, 2014, 12:57:01 PM
to Dávid Karnok, rxj...@googlegroups.com
I don’t understand what that would achieve. If we need to sort, it needs the entirety of data doesn’t it?


Jake Wharton

Apr 21, 2014, 1:09:04 PM
to Ben Christensen, Dávid Karnok, rxj...@googlegroups.com
The individual data streams are already sorted. All that's needed is to merge multiple sorted streams into one. This only requires buffering to maintain at least one element from every stream.

Ben Christensen

Apr 21, 2014, 1:24:00 PM
to Jake Wharton, Dávid Karnok, rxj...@googlegroups.com
Ahhh, that’s what I was missing. Never mind my thoughts on this then :-)
 

Dávid Karnok

Apr 22, 2014, 4:25:34 AM
to rxj...@googlegroups.com, Ben Christensen, Dávid Karnok

Todd Nine

Apr 23, 2014, 1:19:48 PM
to rxj...@googlegroups.com, Ben Christensen, Dávid Karnok
Thank you everyone for your help, it's been invaluable. I've merged David's implementation ideas with the one I was already working on (I started by modeling it on the Merge operator), mostly as an exercise in learning how to create my own Observables. I have a question about where my implementation diverges from David's.

Here, David uses a SerializedSubscriber to ensure onNext is only ever called by one thread at a time.


I have multiple io threads producing these values. As values are emitted from the sources, I need to ensure onNext is called serially. Wrapping the subscriber accomplishes this.

However, in mine, I've had to synchronize on the inner coordinator that selects the min values.


Don't I need to make sure the selection itself is synchronized for a subscription, not just the emission, to ensure the same value isn't selected twice (and then emitted serially) by two different Observable threads?

Thanks,
Todd

Todd Nine

Apr 23, 2014, 1:27:13 PM
to rxj...@googlegroups.com, Ben Christensen, Dávid Karnok
I forgot one last question: when do you guys think backpressure will become part of a release? For now, I know that in my use case I'll only ever have multiple threads producing into my ordered merge. To alleviate the back pressure, I've introduced a semaphore for each inner. Not ideal, but it will work for now. Ideally I'd like to replace it with the backpressure APIs.

Thanks,
Todd
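A per-source semaphore along the lines Todd describes might look like the following hypothetical helper (plain Java; the class and method names are assumptions, not Todd's code or an RxJava API). The producing io thread acquires a permit before pushing an item into the merge, and the merge releases that source's permit after emitting one of its items downstream, so each source can run at most `permits` items ahead:

```java
import java.util.concurrent.Semaphore;

// Hypothetical per-source throttle built on java.util.concurrent.Semaphore.
final class SourceThrottle {
    private final Semaphore slots;

    SourceThrottle(int permits) {
        if (permits < 1) throw new IllegalArgumentException("permits must be >= 1");
        slots = new Semaphore(permits);
    }

    // Called by the producing thread before handing an item to the merge; blocks when full.
    void beforeEmit() throws InterruptedException { slots.acquire(); }

    // Non-blocking variant: returns false instead of waiting when the source is too far ahead.
    boolean tryBeforeEmit() { return slots.tryAcquire(); }

    // Called by the merge after it has emitted one item from this source downstream.
    void afterConsumed() { slots.release(); }
}
```

A blocking acquire() parks the io thread, which gives the "halt" behavior discussed earlier, at the cost of tying up that thread while it waits.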

Ben Christensen

Apr 23, 2014, 1:35:35 PM
to Todd Nine, rxj...@googlegroups.com, Dávid Karnok
I think it will be at least another month or two before we are comfortable with the design enough to release it.

Now that 0.18 is out I’ll turn my attention to having public pull requests showing progress (after some work on Hystrix).


Dávid Karnok

Apr 23, 2014, 2:01:57 PM
to rxj...@googlegroups.com, Ben Christensen, Dávid Karnok
In my code the wip counter makes sure only one thread enters the compare region. In addition, my source queues are synchronized, so I don't need to hold a global lock for them like you did. I'm not sure why you added the started and drained flags. Have you tried my implementation in the tests?
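A sketch of the approach described here, which also answers Todd's question: selection and emission both happen inside a single drain loop, and an AtomicInteger wip counter guarantees that only one thread runs that loop at a time, so no global lock is needed and the same value cannot be selected twice. Per-source queues are ConcurrentLinkedQueue instances. This is an illustrative plain-Java reconstruction, not Dávid's actual code (which isn't reproduced in this thread); `ConcurrentOrderedMerger` and its callback methods are hypothetical:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class ConcurrentOrderedMerger<T> {
    // Per-source state: a thread-safe queue plus a done flag.
    private static final class Source<E> {
        final Queue<E> queue = new ConcurrentLinkedQueue<>();
        volatile boolean done;
    }

    private final List<Source<T>> sources = new ArrayList<>();
    private final AtomicInteger wip = new AtomicInteger();
    private final Comparator<? super T> cmp;
    private final Consumer<? super T> downstream;
    private final Runnable onAllCompleted;
    private boolean completed; // only touched inside the drain loop

    ConcurrentOrderedMerger(int sourceCount, Comparator<? super T> cmp,
                            Consumer<? super T> downstream, Runnable onAllCompleted) {
        for (int i = 0; i < sourceCount; i++) sources.add(new Source<>());
        this.cmp = cmp;
        this.downstream = downstream;
        this.onAllCompleted = onAllCompleted;
    }

    // May be called from any thread, but serially per source (the Rx contract).
    void onNext(int source, T value) {
        sources.get(source).queue.offer(value); // values must be non-null
        drain();
    }

    void onCompleted(int source) {
        sources.get(source).done = true;
        drain();
    }

    private void drain() {
        if (wip.getAndIncrement() != 0) return; // another thread is draining and will loop again
        int missed = 1;
        do {
            emitReady();
            missed = wip.addAndGet(-missed);
        } while (missed != 0);
    }

    // Runs on one thread at a time, so selection and emission cannot race.
    private void emitReady() {
        while (!completed) {
            Source<T> min = null;
            T minValue = null;
            for (Source<T> s : sources) {
                boolean d = s.done;   // read the flag BEFORE peeking at the queue
                T v = s.queue.peek();
                if (v == null) {
                    if (!d) return;   // a live source has nothing buffered yet: wait
                    continue;         // finished and drained
                }
                if (min == null || cmp.compare(v, minValue) < 0) { min = s; minValue = v; }
            }
            if (min == null) {
                completed = true;
                onAllCompleted.run();
                return;
            }
            min.queue.poll();         // removes minValue; only this loop removes items
            downstream.accept(minValue);
        }
    }
}
```

The flag-before-queue order matters: if the queue were checked first, a source could enqueue its last item and complete between the two reads, and the loop would wrongly treat it as drained.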

Mark

Dec 20, 2014, 12:26:38 PM
to rxj...@googlegroups.com
Did you ever implement this? We're looking at doing something similar...

Thanks
Mark.

Mark

Feb 7, 2015, 5:21:02 AM
to rxj...@googlegroups.com
We have now implemented this in Scala... is there any interest in having it contributed back to RxScala?