/**
 * Subscribes {@code sink} to {@code source} through the viewing-session pipeline, running
 * the inactivity timer on RxJava's default computation scheduler.
 *
 * <p>Equivalent to calling the three-argument overload with
 * {@code Schedulers.computation()}, which is the scheduler {@code timeout(long, TimeUnit)}
 * uses by default.
 *
 * @param source stream of play events to filter
 * @param sink   observer that receives the filtered events and the terminal notification
 * @param <T>    concrete play event type
 */
public static <T extends PlayEvent> void timeoutAndDistinctViewingSession(
        Observable<T> source, Observer<T> sink) {
    timeoutAndDistinctViewingSession(source, sink, rx.schedulers.Schedulers.computation());
}

/**
 * Subscribes {@code sink} to {@code source} through the viewing-session pipeline, running
 * the inactivity timer on the supplied scheduler.
 *
 * <p>The pipeline:
 * <ol>
 *   <li>terminates with an error if 3 hours pass without an event, so that inactive
 *       streams are closed rather than kept alive indefinitely;</li>
 *   <li>drops an event whose viewing session equals that of the event immediately
 *       before it;</li>
 *   <li>serializes the notifications delivered to {@code sink}.</li>
 * </ol>
 *
 * <p>Passing the scheduler explicitly lets tests drive the timeout with virtual time
 * (for example a {@code TestScheduler}) instead of waiting on the wall clock.
 *
 * @param source           stream of play events to filter
 * @param sink             observer that receives the filtered events and the terminal
 *                         notification; it is the component that decides what to do with
 *                         each event
 * @param timeoutScheduler scheduler on which the 3-hour inactivity timer runs
 * @param <T>              concrete play event type
 */
public static <T extends PlayEvent> void timeoutAndDistinctViewingSession(
        Observable<T> source, Observer<T> sink, rx.Scheduler timeoutScheduler) {
    source
            // The timer restarts on every event; expiry is reported through onError.
            .timeout(3, TimeUnit.HOURS, timeoutScheduler)
            // Only the viewing session is compared, not the whole event.
            .distinctUntilChanged(PlayEvent::getViewingSession)
            .serialize()
            .subscribe(sink);
}

/**
 * Verifies that a gap longer than the 3-hour window terminates the stream with a single
 * error and that an event published after the timeout never reaches the observer.
 */
@Test
public void testTimeoutWindow() throws Exception {
    final TestScheduler testScheduler = Schedulers.test();
    TestSubject<PlayEvent> testSubject = TestSubject.create(testScheduler);

    // The timeout must run on the same virtual clock as the subject; otherwise advancing
    // the test scheduler would never make the timer (on the computation scheduler) fire.
    PlayEventRxFunctions.timeoutAndDistinctViewingSession(testSubject, this, testScheduler);

    final int userId = 1;
    testSubject.onNext(createPlayEvent(userId, UUID.randomUUID()));
    // Delivers the first event at virtual time 0, then crosses the 3-hour window.
    testScheduler.advanceTimeBy(4, TimeUnit.HOURS);

    testSubject.onNext(createPlayEvent(userId, UUID.randomUUID()));
    // TestSubject only schedules the emission; flush it so the assertion below really
    // checks that a post-timeout event is dropped.
    testScheduler.triggerActions();

    Assertions.assertThat(getOnErrorEvents().size()).isEqualTo(1);
    Assertions.assertThat(getOnNextEvents().size()).isEqualTo(1);
}