How to wait for async Observable to complete

Andrea Chiodoni

Jun 18, 2014, 10:58:06 AM
to rxj...@googlegroups.com

Hi, I'm trying to build a sample using RxJava. The sample should orchestrate a ReactiveWareService and a ReactiveReviewService, returning a WareAndReview composite.

ReactiveWareService
public Observable<Ware> findWares() {
    return Observable.from(wareService.findWares());
}

ReactiveReviewService
public Observable<Review> findReviewsByItem(final String item) {
    return Observable.create((Observable.OnSubscribe<Review>) observer -> executor.execute(() -> {
        try {
            // reviewService.findReviewsByItem calls Thread.sleep to simulate latency
            List<Review> reviews = reviewService.findReviewsByItem(item);
            reviews.forEach(observer::onNext);
            observer.onCompleted();
        } catch (Exception e) {
            observer.onError(e);
        }
    }));
}

public List<WareAndReview> findWaresWithReviews() throws RuntimeException {
    final List<WareAndReview> wareAndReviews = new ArrayList<>();

    wareService.findWares()
        .map(WareAndReview::new)
        .subscribe(wr -> {
            wareAndReviews.add(wr);
            // Async!!!!
            reviewService.findReviewsByItem(wr.getWare().getItem())
                .subscribe(wr::addReview,
                    throwable -> System.out.println("Error while trying to find reviews for " + wr)
                );
        });

    // TODO: There should be a better way to wait for async reviewService.findReviewsByItem completion!
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {}

    return wareAndReviews;
}


Given that I don't want to return an Observable<WareAndReview>, how can I wait for the async Observable (findReviewsByItem) to complete and avoid the ugly Thread.sleep?

Dave Ray

Jun 18, 2014, 11:20:55 AM
to Andrea Chiodoni, rxj...@googlegroups.com
Hey,

You can use a blocking observable to escape from rx-land back into synchronous-land. I'm not exactly sure where it fits in your code, but the basic pattern is:

  List result = observable.toList().toBlockingObservable().single();

There are other variants in BlockingObservable depending on your needs.
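
For intuition, the idea behind a blocking call can be sketched with the JDK alone: the caller parks on a latch until the asynchronous producer signals completion, instead of sleeping for a guessed duration. This is only a minimal sketch and not RxJava's implementation; `BlockingCollect`, `AsyncSource`, and `collectBlocking` are hypothetical names, and error signalling is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class BlockingCollect {

    /** Hypothetical stand-in for an async source: pushes items, then signals completion. */
    interface AsyncSource<T> {
        void subscribe(Consumer<T> onNext, Runnable onCompleted);
    }

    /** Blocks the calling thread until the source completes, then returns what it emitted. */
    static <T> List<T> collectBlocking(AsyncSource<T> source) {
        List<T> items = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        source.subscribe(items::add, done::countDown);
        try {
            done.await(); // returns as soon as onCompleted fires; no fixed sleep
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting for completion", e);
        }
        return new ArrayList<>(items);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Emits two reviews from a worker thread after a simulated latency.
        AsyncSource<String> reviews = (onNext, onCompleted) -> executor.execute(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            onNext.accept("great");
            onNext.accept("ok");
            onCompleted.run();
        });
        System.out.println(collectBlocking(reviews)); // prints [great, ok]
        executor.shutdown();
    }
}
```

The wait ends exactly when the producer finishes, so the caller neither returns early with partial data nor idles longer than needed.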

Cheers,
Dave

Jeremy Jongsma

Jun 18, 2014, 11:29:16 AM
to Andrea Chiodoni, rxj...@googlegroups.com
In one chain (possibly full of errors, but the flow should work):

List<WareAndReview> wareAndReviews = wareService.findWares()
  .map(WareAndReview::new)
  .flatMap(wr -> {
    return reviewService.findReviewsByItem(wr.getWare().getItem())
      .map(rev -> {
        wr.addReview(rev);
        return wr;
      });
  })
  .toList()
  .toBlockingObservable()
  .single();

Andrea Chiodoni

Jun 18, 2014, 4:50:29 PM
to rxj...@googlegroups.com
Thanks Dave, Thanks Jeremy,

I got it working with:

        List<WareAndReview> wareAndReviews = wareService.findWares()
                .map(WareAndReview::new)
                .flatMap(wr -> {
                    return reviewService.findReviewsByItem(wr.getWare().getItem())
                            .onExceptionResumeNext(Observable.<Review>empty())
                            .map(r -> {
                                wr.addReview(r);
                                return wr;
                            });
                })
                .distinct()
                .toList()
                .toBlocking()
                .single();

Jeremy Jongsma

Jun 18, 2014, 5:37:00 PM
to Andrea Chiodoni, rxj...@googlegroups.com
Ah, multiple reviews. Might be simpler to just process all reviews as a list instead of involving distinct() later on?

.toList()
.map(reviews -> {
  for (Review r : reviews)
    wr.addReview(r);
  return wr;
});
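
One behavioural difference between the two shapes is worth spelling out: mapping each review back to `wr` emits `wr` once per review (hence `distinct()`) and emits nothing at all for a ware that has no reviews, whereas collecting the reviews first yields exactly one element per ware. The sketch below reproduces this with `java.util.stream`, whose `flatMap` fans out per element in the same way. It is an analogy rather than RxJava code, and `FanOutDemo`, `perReview`, `perWare`, and the sample data are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FanOutDemo {

    /** Hypothetical sample data: ware name -> its reviews ("lamp" has none). */
    static Map<String, List<String>> sampleReviews() {
        Map<String, List<String>> reviews = new LinkedHashMap<>();
        reviews.put("book", Arrays.asList("great", "ok"));
        reviews.put("lamp", Collections.emptyList());
        return reviews;
    }

    /** Shape 1: map every review back to its ware, so a ware appears once per review. */
    static List<String> perReview(Map<String, List<String>> reviews) {
        return reviews.entrySet().stream()
                .flatMap(e -> e.getValue().stream().map(review -> e.getKey()))
                .collect(Collectors.toList());
    }

    /** Shape 2: treat the reviews as a single list, so each ware appears exactly once. */
    static List<String> perWare(Map<String, List<String>> reviews) {
        return reviews.entrySet().stream()
                .flatMap(e -> Stream.of(e.getValue()).map(all -> e.getKey()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(perReview(sampleReviews())); // prints [book, book]
        System.out.println(perWare(sampleReviews()));   // prints [book, lamp]
    }
}
```

In the first shape "lamp" disappears and "book" is duplicated; in the second, both wares survive exactly once, which is why the list-based variant needs no `distinct()`.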

Andrea Chiodoni

Jun 19, 2014, 2:16:14 AM
to rxj...@googlegroups.com, andrea....@gmail.com
Ciao Jeremy,

You're right, this also works:

        return wareService.findWares()
                .map(WareAndReview::new)
                .flatMap(wr -> {
                    return reviewService.findReviewsByItem(wr.getWare().getItem())
                            .onExceptionResumeNext(Observable.<Review>empty())
                            .toList()
                            .map(reviews -> {
                                wr.addReviews(reviews);
                                return wr;
                            });
                })
                .toList()
                .toBlocking()
                .single();