Equivalent of a "fork" in Rx Java

Todd Nine

Dec 10, 2013, 10:52:39 PM
to rxj...@googlegroups.com
Hi guys,
  I have a use case I'm not quite sure how to do in Rx and would like some advice.  The case is relatively straightforward.  I have an object (Entity) I read from Cassandra, which is then wrapped in an Observable using Observable.just(Entity).  This observable then goes through several functions, where each function may map the observable to a new type or perform some analysis on the emitted value.

If at any point one of these functions determines that the entity is in a "stale" state and needs to be lazily repaired (this can happen if a write starts and a node crashes after the initial write has been committed, but before our graph insert is complete), I want to execute several functions that will asynchronously repair the graph state for this entity.  I envision doing this in the following way:



1) Create a completely new Observable instance within the function that recognizes the entity is stale by wrapping the existing state in an "Observable.just()" call.  This new observable will emit the single entity via onNext(), then immediately call onCompleted().

2) Start the processing of this new observable in a new thread.

How do I do 2) without blocking the thread that's currently executing 1)?  I'm assuming I need to specify the scheduler when creating the functions.  Is that all I would need to do?

Thanks,
Todd

Shixiong Zhu

Dec 11, 2013, 1:35:58 AM
to Todd Nine, rxj...@googlegroups.com
You can use Scheduler and PublishSubject to implement it. Here is an example:

    public Observable<Integer> foo(final int i) {
        if (i < 0) {
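            // Note: a PublishSubject only delivers items to observers that are
            // already subscribed when the item is emitted.
            final PublishSubject<Integer> subject = PublishSubject.create();
            // You can also use Schedulers.threadPoolForComputation();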
            Schedulers.threadPoolForIO().schedule(new Action0() {
                @Override
                public void call() {
                    try {
                        int newI = -i;
                        Thread.sleep(4000);
                        subject.onNext(newI);
                        subject.onCompleted();
                    } catch (Throwable e) {
                        subject.onError(e);
                    }
                }
            });
            return subject;
        } else {
            return Observable.just(i);
        }
    }

In the upcoming release 0.16.0 (https://github.com/Netflix/RxJava/issues/601), there will be a more convenient operator: Start.
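
For comparison, here is a rough, library-free sketch of the same fork shape using only java.util.concurrent. It is not RxJava and not the Start operator itself. The class name ForkSketch, the REPAIR_POOL executor and the thread name are made up for illustration, and foo just mirrors the method above. One difference worth noting: a CompletableFuture keeps its result, so a caller that asks late still sees the value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ForkSketch {
    // Hypothetical pool standing in for an Rx IO scheduler.
    // Daemon threads are used so the JVM can exit without an explicit shutdown.
    private static final ExecutorService REPAIR_POOL = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "repair-worker");
        t.setDaemon(true);
        return t;
    });

    // Mirrors foo(int) above: negative inputs are "repaired" on another
    // thread, everything else is returned as an already-completed result.
    static CompletableFuture<Integer> foo(final int i) {
        if (i < 0) {
            // supplyAsync returns immediately; the lambda runs on REPAIR_POOL.
            return CompletableFuture.supplyAsync(() -> -i, REPAIR_POOL);
        }
        return CompletableFuture.completedFuture(i);
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> repaired = foo(-5); // does not wait for the repair
        System.out.println("caller continues on " + Thread.currentThread().getName());
        // join() is the only point where the caller waits for the forked work.
        System.out.println("repaired value: " + repaired.join());
    }
}
```

The calling thread only blocks if it calls join(); attaching a callback with thenAccept(...) instead keeps the whole path non-blocking.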


Best Regards,

Shixiong Zhu

