private static void numericalOrdering() {
    Observable<Integer> a = Observable.from(10, 9, 8, 3, 2);
    Observable<Integer> b = Observable.from(6, 5, 4);
    // natural (ascending) order
    Observable.merge(a, b).toSortedList().subscribe(System.out::println);
    // custom comparator: descending order
    Observable.merge(a, b).toSortedList((x, y) -> y - x).subscribe(System.out::println);
}
[2, 3, 4, 5, 6, 8, 9, 10]
[10, 9, 8, 6, 5, 4, 3, 2]
private static void timeOrdering() {
    Observable<Integer> a = Observable.from(10, 9, 8, 3, 2);
    Observable<Integer> b = Observable.from(6, 5, 4);
    // this emits List<Timestamped<Integer>>, newest timestamp first
    Observable.merge(a.timestamp(), b.timestamp())
            .toSortedList((t1, t2) -> Long.compare(t2.getTimestampMillis(), t1.getTimestampMillis()))
            .subscribe(System.out::println);
    // this extracts it back out to just the list of values
    Observable.merge(a.timestamp(), b.timestamp())
            .toSortedList((t1, t2) -> Long.compare(t2.getTimestampMillis(), t1.getTimestampMillis()))
            .flatMap(list -> Observable.from(list).map(l -> l.getValue()))
            .toList()
            .subscribe(System.out::println);
}
- have a subscriber with a queue and a done flag, subscribed to each source
- each onNext enqueues its value and calls a common next method which is aware of all the queues and flags
- among all the queues, find the one whose peeked element is the smallest according to the custom comparator
- if any of the queues is empty but not done, do nothing; otherwise emit the smallest value found
- each onCompleted call should set its own done flag and call the common next
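The steps above can be sketched in plain Java, without any RxJava types, roughly like this. The class name `OrderedMerge` and its `onNext(int, T)` / `onCompleted(int)` methods are made up for illustration and are not part of RxJava; a real operator would also need error handling, backpressure and unsubscription. The sketch assumes each source is already sorted according to the comparator and never emits null.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical helper (not an RxJava class): merges N individually sorted sources.
final class OrderedMerge<T> {
    private final List<Queue<T>> queues = new ArrayList<>(); // one buffer per source
    private final boolean[] done;                             // one done flag per source
    private final Comparator<? super T> comparator;
    private final Consumer<? super T> downstream;

    OrderedMerge(int sources, Comparator<? super T> comparator, Consumer<? super T> downstream) {
        for (int i = 0; i < sources; i++) {
            queues.add(new ArrayDeque<>());
        }
        this.done = new boolean[sources];
        this.comparator = comparator;
        this.downstream = downstream;
    }

    // Would be called from the onNext of the subscriber attached to the given source.
    synchronized void onNext(int source, T value) {
        queues.get(source).add(value);
        next();
    }

    // Would be called from the onCompleted of the subscriber attached to the given source.
    synchronized void onCompleted(int source) {
        done[source] = true;
        next();
    }

    // The common "next" step: emit for as long as no unfinished source has an empty queue.
    private void next() {
        while (true) {
            int best = -1;
            for (int i = 0; i < queues.size(); i++) {
                T head = queues.get(i).peek();
                if (head == null) {
                    if (!done[i]) {
                        return; // empty but not done: a smaller value may still arrive
                    }
                    continue;   // empty and done: this source no longer takes part
                }
                if (best < 0 || comparator.compare(head, queues.get(best).peek()) < 0) {
                    best = i;
                }
            }
            if (best < 0) {
                return; // every queue is empty
            }
            downstream.accept(queues.get(best).poll());
        }
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        OrderedMerge<Integer> merge =
                new OrderedMerge<Integer>(2, Comparator.<Integer>reverseOrder(), out::add);
        for (int v : new int[] {10, 9, 8, 3, 2}) merge.onNext(0, v);
        merge.onCompleted(0);
        for (int v : new int[] {6, 5, 4}) merge.onNext(1, v);
        merge.onCompleted(1);
        System.out.println(out); // prints [10, 9, 8, 6, 5, 4, 3, 2]
    }
}
```

Unlike toSortedList, this does not have to wait for every source to finish before emitting anything; it only buffers values until each unfinished source has at least one element to compare.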