BlockingObservable.forEach does not return all return all Observable

268 views
Skip to first unread message

Yan Liu

unread,
Jan 14, 2015, 2:14:41 AM1/14/15
to rxj...@googlegroups.com
Below is my sample of inparallel call of Observables. I use BlockingObservable to block until all works complete. The issue here is that most of time I only get 3 numbers in StringBuffer. Looks not all Observalbes from flatMap go into forEach. Is it a known issue of BlockingObservable?

 
package myrx.myrxdemo;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;


public class App 
{
    public static void main( String[] args )
    {
    hello();
    }
    
    public static void hello() {
    final StringBuffer sb = new StringBuffer();
    Observable.from(new Integer[]{1,2,3,4}).flatMap(new Func1<Integer, Observable<String>>() {
        public Observable<String> call(final Integer i) {
        return Observable.create(new OnSubscribe<String>(){
public void call(Subscriber<? super String> subscriber) {
System.out.println(i);
subscriber.onCompleted();
subscriber.onNext(i.toString());
}
         
        }).subscribeOn(Schedulers.io());
        }}).toBlocking().forEach(new Action1<String>(){
public void call(String s) {
sb.append(s);
}
         
        });

   
    System.out.println(sb.toString());
    }
}

Dávid Karnok

unread,
Jan 14, 2015, 2:20:13 AM1/14/15
to rxj...@googlegroups.com
Hi, you OnSubscribe implementation is incorrect. You need to put subscriber.onCompleted() after onNext.

Yan Liu

unread,
Jan 14, 2015, 1:29:01 PM1/14/15
to Dávid Karnok, rxj...@googlegroups.com
Thanks Dávid for looking at my issue. It looks solve the problem. My following question would be: is this the right way to way for the result back? According to the doc, BlockingObservable is for testing purpose only. While in my use case, 
I do need to wait for all data back. Otherwise, the main thread just keeps running and returns empty response.
--
Liu Yan

Yan Liu

unread,
Jan 14, 2015, 1:31:45 PM1/14/15
to Dávid Karnok, rxj...@googlegroups.com
Also any idea of the performance of BlockingObserable?
--
Liu Yan

Pascal Warnimont

unread,
Jan 15, 2015, 10:43:04 AM1/15/15
to rxj...@googlegroups.com, akar...@gmail.com
u can use toList() to aggregate the results, than put the computation u intended to do with your data in the subscriber, or in a doNext/doOnEach if u have some side effects u want to isolate.
Reply all
Reply to author
Forward
0 new messages