Help with new Observables from flatMap operation and execution threads

Todd Nine
Mar 25, 2014, 3:01:33 PM
to rxj...@googlegroups.com
Hey guys,
I have a quick question about threading in Rx and creating new Observables. A lot of my Observables are Cassandra-based. Since there's not (yet) an async Astyanax client, these Observables are subscribed on Schedulers.io(). This works well; for instance, here is a piece of code I have to create an Observable from our custom iterators.


   
    private Observable<MarkedEdge> loadEdgesFromSource( final OrganizationScope scope, final SearchByEdgeType search ) {

        return Observable.create( new ObservableIterator<MarkedEdge>() {
            @Override
            protected Iterator<MarkedEdge> getIterator() {
                return edgeSerialization.getEdgesFromSource( scope, search );
            }
        } ).subscribeOn( Schedulers.io() );
    }
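The iterator returned by `getIterator()` pages through Cassandra results in configurable page sizes. As a rough plain-Java sketch of that idea (not the author's `edgeSerialization` code and not an Astyanax API), the hypothetical `PagingIterator` below asks a hypothetical `fetch(offset, limit)` function for one page at a time and stops after the first short page:

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;

// Hypothetical paging iterator: loads results lazily, one page at a time.
// The fetch function is an assumed stand-in for a Cassandra query.
final class PagingIterator<T> implements Iterator<T> {
    // fetch.apply(offset, limit) returns at most `limit` items starting at `offset`
    private final BiFunction<Integer, Integer, List<T>> fetch;
    private final int pageSize;
    private Iterator<T> current = Collections.emptyIterator();
    private int offset = 0;
    private boolean exhausted = false;

    PagingIterator(BiFunction<Integer, Integer, List<T>> fetch, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        this.fetch = fetch;
        this.pageSize = pageSize;
    }

    @Override
    public boolean hasNext() {
        // Only query for the next page once the current page is used up.
        while (!current.hasNext() && !exhausted) {
            List<T> page = fetch.apply(offset, pageSize);
            offset += page.size();
            // A short (or empty) page means the source has nothing more to return.
            exhausted = page.size() < pageSize;
            current = page.iterator();
        }
        return current.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }
}
```

Cassandra does not offer offset-based paging, so a real version would resume from the last key of the previous page; the offset only keeps this sketch short. An iterator like this could then be returned from `getIterator()` and wrapped by `ObservableIterator` as above.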

We load in configurable page sizes and keep emitting as long as the source iterator over Cassandra returns edges. This works well; however, it's not clear to me how schedulers are inherited, if at all, by subtasks. For instance, imagine I create the following observables.



// has its own I/O thread
Observable<MarkedEdge> sourceEdges = loadEdgesFromSource( scope, search );

// has its own I/O thread
Observable<MarkedEdge> targetEdges = loadEdgesToTarget( scope, search );

Observable.merge( sourceEdges, targetEdges ).subscribe( new Action1<MarkedEdge>() {
    @Override
    public void call( final MarkedEdge markedEdge ) {
        log.info( "Edge received {}", markedEdge );
    }
} );



What thread will my subscription actions be invoked on? I'm assuming a processing thread (not an I/O thread). In this example, I would have 3 threads running, correct? 1 thread for sourceEdges, 1 for targetEdges, and 1 for the subscription of the merge? Under heavy load I'm getting rejected exceptions from the io scheduler, so I think I'm hammering it with more Observable I/O threads than I initially believed. If I don't subscribe merge operations (concat, merge, etc.) on an I/O thread, won't my processing threads then be blocked by the I/O Observable emitting the data? What's the best place for me to begin looking through the source to better understand how schedulers are used with subscriptions and merge operators?

Thanks,
Todd

Ben Christensen
Mar 25, 2014, 3:28:37 PM
to Todd Nine, rxj...@googlegroups.com
When you merge them, merge ensures only one onNext is called at a time, but each onNext is delivered on the thread of whichever Observable emitted it.

Thus, you should receive events in your final Subscriber from as many threads as you have Observables being merged. 

However, the Rx contract ensures that even if multiple threads are involved, the calls happen sequentially (memory visibility, thread-safety, etc. are fine; just don't rely on thread-locals).
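The behaviour described above can be modelled without RxJava. The sketch below is a plain-JDK illustration, not `OperatorMerge` itself: hypothetical source threads each emit items, a single lock stands in for the serialization that merge performs, and the subscriber records which thread called it. It should see every source thread's name, but never two `onNext` calls at once.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK model of a serializing merge; NOT RxJava's OperatorMerge.
final class SerializedMergeModel {

    // Stand-in for the final Subscriber: records the calling thread of each onNext.
    static final class RecordingSubscriber {
        final List<String> threadNames = new ArrayList<>(); // only mutated under the gate lock
        final AtomicInteger inFlight = new AtomicInteger();
        volatile int maxInFlight = 0; // would exceed 1 if two onNext calls ever overlapped

        void onNext(int value) {
            int now = inFlight.incrementAndGet();
            maxInFlight = Math.max(maxInFlight, now);
            threadNames.add(Thread.currentThread().getName());
            inFlight.decrementAndGet();
        }
    }

    // Starts `sources` threads ("source-0", "source-1", ...); each emits `perSource` items.
    static RecordingSubscriber run(int sources, int perSource) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        Object gate = new Object(); // the serialization point the merge provides
        CountDownLatch done = new CountDownLatch(sources);
        for (int s = 0; s < sources; s++) {
            new Thread(() -> {
                for (int i = 0; i < perSource; i++) {
                    synchronized (gate) {     // one onNext at a time...
                        subscriber.onNext(i); // ...executed on the emitting source thread
                    }
                }
                done.countDown();
            }, "source-" + s).start();
        }
        try {
            done.await(); // countDown/await makes the recorded results visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber;
    }

    public static void main(String[] args) {
        RecordingSubscriber r = run(2, 1000);
        System.out.println(r.threadNames.size() + " events on " + new HashSet<>(r.threadNames)
                + ", max concurrent onNext = " + r.maxInFlight);
    }
}
```

With two sources the subscriber sees two distinct thread names, which is the "as many threads as you have Observables being merged" point, while the lock keeps the calls strictly one at a time.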

Whenever an operator may use a Scheduler, there is an overload that lets you pass in your own Scheduler. The merge/concat/etc. combinatorial operators do not add concurrency themselves; they let the threads from the sources push the events through.

So you'll have 2 threads involved, not 3. If you want to move processing to another thread, use observeOn after the merge to move it onto a 3rd thread.
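As a plain-JDK illustration (not `OperatorObserveOn` itself), the effect of adding observeOn after the merge can be approximated by having the source threads hand each item to a single-threaded executor, so all processing runs on one extra thread, here given the hypothetical name `observer`, while the sources go back to emitting:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK model of merge(...).observeOn(...); NOT RxJava's OperatorObserveOn.
final class ObserveOnModel {

    // Returns the name of the thread that processed each item.
    static List<String> run(int sources, int perSource) {
        List<String> processedOn = Collections.synchronizedList(new ArrayList<>());
        // A single worker thread keeps processing sequential, like one observeOn worker.
        ExecutorService observer = Executors.newSingleThreadExecutor(r -> new Thread(r, "observer"));
        List<Thread> producers = new ArrayList<>();
        for (int s = 0; s < sources; s++) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < perSource; i++) {
                    // The source thread only enqueues the work; it does not process the item.
                    observer.execute(() -> processedOn.add(Thread.currentThread().getName()));
                }
            }, "source-" + s);
            producers.add(t);
            t.start();
        }
        try {
            for (Thread t : producers) {
                t.join(); // every item has been handed off
            }
            observer.shutdown();
            if (!observer.awaitTermination(10, TimeUnit.SECONDS)) { // every item has been processed
                throw new IllegalStateException("observer thread did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processedOn;
    }

    public static void main(String[] args) {
        List<String> names = run(2, 500);
        System.out.println(names.size() + " items processed on " + new HashSet<>(names));
        // prints "1000 items processed on [observer]"
    }
}
```

The single worker thread keeps delivery sequential, which matches the Rx contract; RxJava's real observeOn roughly does the same by queueing items for a Scheduler worker rather than using a raw `ExecutorService`.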

Take a look at OperatorSubscribeOn, OperatorObserveOn and OperatorMerge if you want to understand these at the implementation level.

-- 
Ben Christensen
+1.310.782.5511  @benjchristensen

Todd Nine
Mar 25, 2014, 5:21:53 PM
to rxj...@googlegroups.com, Todd Nine
Thanks for the clarification, Ben, you've just made my life much easier!