Hey guys,
I have a quick question about the threading in Rx and creating new Observables. A lot of my Observables are Cassandra based Observables. Since there's not (yet) an async Astayanx client, these Observables are created with the scheduler of Schedulers.io(). This works well, for instance, here is a piece of code I have to create an Observable from our customer iterators.
private Observable<MarkedEdge> loadEdgesFromSource( final OrganizationScope scope, final SearchByEdgeType search ) {
return Observable.create( new ObservableIterator<MarkedEdge>() {
@Override
protected Iterator<MarkedEdge> getIterator() {
return edgeSerialization.getEdgesFromSource( scope, search );
}
} ).subscribeOn( Schedulers.io() );
}
We load in pages sizes that are configurable, and emit as long as the source iterator to Cassandra emits edges. This works well, however it's not clear to me how schedulers are inherited, if at all in sub tasks. For instance, imagine I create the following observables.
//has it's own I/O thread
Observable<MarkedEdge> sourceEdges = loadEdgesFromSource(scope, search);
//has it's own I/O thread
Observable<MarkedEdge> targetEdges = loadEdgesToTarget(scope, search);
Observable.merge(sourceEdges, targetEdges).subscribe(new Action1<MarkedEdge>(){
@Override
public void call( final MarkedEdge markedEdge ) {
log.info("Edge received {}", markedEdge);
}
});
What thread will my subscription actions be invoked on? I'm assuming a processing thread (not an I/O thread). In this example, I would have 3 threads running correct? 1 thread for sourceEdges, 1 for targetEdges, and 1 for the subscription of the merge? Under heavy load I'm getting rejected exceptions from the io scheduler, so I think I'm hammering it with more Observable io threads that I initially believed I was. If I don't subscribe merge operations (concat, merge etc) on an I/O thread, won't my processing threads then be blocked by the I/O observable emitting the data? What's the best place for me to being looking through the source to better understand how schedulers are used with subscriptions and merge operators?
Thanks,
Todd