Is the window operator right for this?


John Sanda

Aug 14, 2015, 1:34:04 PM
to RxJava
I have a query that returns an Observable<DataPoint> where DataPoint has a timestamp property stored as a long. The query returns the data points in sorted order by timestamp. Rather than emitting a single stream of data points, I want to emit a separate stream for each minute. I know groupBy will work for this, but I am reluctant to use it since it caches items in memory. I was thinking the window operator might be what I want. In particular, I was looking at,

    <TClosing> Observable<Observable<T>> window(Func0<? extends Observable<? extends TClosing>> closingSelector)


but I am confused by closingSelector. When a data point is emitted whose timestamp falls into the next minute, I want to start a new window. Is the window operator right for this? If so, can someone show me an example? Or should I be using a different operator?


Ben Christensen

Aug 14, 2015, 2:08:45 PM
to John Sanda, RxJava
Window sounds right for your use case. If it is just time you need, use the overload that does the time for you: http://reactivex.io/RxJava/javadoc/rx/Observable.html#window(long,%20java.util.concurrent.TimeUnit)

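The overloads that take another Observable let that Observable mark the window boundaries: closingSelector is a function that returns such an Observable, and there is also an overload that accepts the boundary Observable directly (used in the second line below). For example, you could get time-based windows in either of these ways: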

stream.window(1, TimeUnit.MINUTES)
... or ...
stream.window(Observable.interval(1, TimeUnit.MINUTES))


Here is a complete example:

    package rx;

    import java.util.concurrent.TimeUnit;

    public class WindowExample {

        public static void main(String... args) {
            // Emits 0, 1, 2, ... every 100 ms.
            Observable<Long> stream = Observable.interval(100, TimeUnit.MILLISECONDS);

            // Time-based overload: a new window every 5 seconds.
            System.out.println("--- window with time");
            stream.take(100).window(5, TimeUnit.SECONDS)
                    .flatMap(o -> {
                        System.out.println("New Window");
                        return o;
                    }).toBlocking().forEach(System.out::println);

            // Boundary overload: each emission of the interval closes the
            // current window and opens the next one.
            System.out.println("--- window with selector using interval");
            stream.take(100).window(Observable.interval(5, TimeUnit.SECONDS))
                    .flatMap(o -> {
                        System.out.println("New Window");
                        return o;
                    }).toBlocking().forEach(System.out::println);
        }
    }


John Sanda

Aug 14, 2015, 2:28:38 PM
to RxJava, john....@gmail.com
Thanks for the example. I don't want time. I want the windows to be based on the timestamp of the data points. Let's say I have the following data points,

Data Point | Timestamp
-----------|----------
P1         | 13:01:10
P2         | 13:01:30
P3         | 13:02:20
P4         | 13:02:40
P5         | 13:03:20
P6         | 13:03:40

Then I want the first window to contain P1 and P2, the second window to contain P3 and P4, and the third window to contain P5 and P6.
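
To make the grouping rule above concrete, here is a minimal sketch in plain Java, not an RxJava operator. It assumes the timestamp is in milliseconds and uses a hypothetical DataPoint class with just a name and a timestamp; both are guesses for illustration, since the thread does not show the real type. Because the input is already sorted, a new window starts whenever the minute bucket (timestamp / 60000) changes, so only the current window has to be tracked at any moment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MinuteWindows {

    // Hypothetical stand-in for the DataPoint type mentioned in the thread.
    static final class DataPoint {
        final String name;
        final long timestamp; // assumed to be milliseconds

        DataPoint(String name, long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    // Splits points that are already sorted by timestamp into consecutive
    // per-minute windows. A new window starts when the minute bucket changes.
    static List<List<DataPoint>> splitByMinute(List<DataPoint> sorted) {
        List<List<DataPoint>> windows = new ArrayList<>();
        List<DataPoint> current = null;
        long currentMinute = 0L;
        for (DataPoint p : sorted) {
            long minute = Math.floorDiv(p.timestamp, 60_000L);
            if (current == null || minute != currentMinute) {
                current = new ArrayList<>();
                windows.add(current);
                currentMinute = minute;
            }
            current.add(p);
        }
        return windows;
    }

    public static void main(String[] args) {
        long base = 13 * 3_600_000L; // 13:00:00 as milliseconds since midnight
        List<DataPoint> points = Arrays.asList(
                new DataPoint("P1", base + 70_000L),   // 13:01:10
                new DataPoint("P2", base + 90_000L),   // 13:01:30
                new DataPoint("P3", base + 140_000L),  // 13:02:20
                new DataPoint("P4", base + 160_000L),  // 13:02:40
                new DataPoint("P5", base + 200_000L),  // 13:03:20
                new DataPoint("P6", base + 220_000L)); // 13:03:40

        List<List<DataPoint>> windows = splitByMinute(points);
        for (int i = 0; i < windows.size(); i++) {
            System.out.println("Window " + (i + 1) + ": " + windows.get(i));
        }
        // Prints:
        // Window 1: [P1, P2]
        // Window 2: [P3, P4]
        // Window 3: [P5, P6]
    }
}
```

In a reactive pipeline the same comparison (has the minute bucket changed since the previous item?) would be the signal that closes one window and opens the next; how best to wire that into window is left open here.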

Hélder Sousa

Oct 25, 2018, 5:21:17 AM
to RxJava
Hi John,

Did you find a solution for your use case? I have a similar one and I can't find a proper solution/example.
Best regards,

Helder Sousa