<TClosing> Observable<Observable<T>> window(Func0<? extends Observable<? extends TClosing>> closingSelector)
but I am confused by closingSelector. When a data point is emitted and its timestamp falls into the next minute, I want to start a new window. Is the window operator the right tool for this? If so, can someone show me an example? Or should I be using a different operator?
package rx;

import java.util.concurrent.TimeUnit;

public class WindowExample {
    public static void main(String... args) {
        // Emits 0, 1, 2, ... every 100 ms.
        Observable<Long> stream = Observable.interval(100, TimeUnit.MILLISECONDS);

        // 100 items at 100 ms each is about 10 seconds of data,
        // so this yields roughly two 5-second windows.
        System.out.println("--- window with time");
        stream.take(100).window(5, TimeUnit.SECONDS)
                .flatMap(o -> {
                    System.out.println("New Window");
                    return o;
                }).toBlocking().forEach(System.out::println);

        // Same idea, but the window boundaries come from another Observable:
        // a new window starts each time the boundary Observable emits.
        System.out.println("--- window with selector using interval");
        stream.take(100).window(Observable.interval(5, TimeUnit.SECONDS))
                .flatMap(o -> {
                    System.out.println("New Window");
                    return o;
                }).toBlocking().forEach(System.out::println);
    }
}
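
Both variants above cut windows by wall-clock time, while the question asks for a new window when a data point's own timestamp crosses into another minute. Here is a minimal sketch of just that boundary rule in plain Java, without RxJava. `DataPoint`, `minuteOf` and `splitByMinute` are hypothetical names made up for illustration, and the sketch assumes timestamps are epoch milliseconds that arrive in order.

```java
import java.util.ArrayList;
import java.util.List;

public class MinuteWindows {
    // Hypothetical data point: a value plus its own event timestamp (epoch millis).
    static final class DataPoint {
        final long timestampMillis;
        final double value;

        DataPoint(long timestampMillis, double value) {
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    // Index of the minute a timestamp falls into (assumption: epoch milliseconds).
    static long minuteOf(long timestampMillis) {
        return Math.floorDiv(timestampMillis, 60_000L);
    }

    // Starts a new window whenever the minute index differs from the previous point's.
    // Assumes points arrive in timestamp order.
    static List<List<DataPoint>> splitByMinute(List<DataPoint> points) {
        List<List<DataPoint>> windows = new ArrayList<>();
        List<DataPoint> current = null;
        long currentMinute = 0L;
        for (DataPoint p : points) {
            long minute = minuteOf(p.timestampMillis);
            if (current == null || minute != currentMinute) {
                current = new ArrayList<>();
                windows.add(current);
                currentMinute = minute;
            }
            current.add(p);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<DataPoint> points = new ArrayList<>();
        points.add(new DataPoint(0L, 1.0));
        points.add(new DataPoint(59_999L, 2.0));
        points.add(new DataPoint(60_000L, 3.0));   // first point of the next minute
        points.add(new DataPoint(125_000L, 4.0));  // two minutes in
        for (List<DataPoint> window : splitByMinute(points)) {
            System.out.println("New Window: " + window.size() + " point(s)");
        }
    }
}
```

The point of the sketch is that the window boundary is a function of the data (the minute index changing), not of a timer, so whichever operator is used has to take its boundary signal from the stream itself rather than from something like `Observable.interval`.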