Best practices for handling blocking IO requests?


Joshua Chaitin-Pollak

Jan 23, 2015, 10:12:21 AM
to reactor-...@googlegroups.com
As I understand it, you get optimal performance from Reactor and Streams when you minimize blocking IO, but some IO is inevitable at some point. If you have a sequence of operations, and one in the middle requires data from another source, what is the best practice for handling that so you can optimize throughput? Specifically, let's say you have a stream like this:

stream
.map(o -> doSomething(o))
.map(o -> mergeWithRemoteData(o))
.map(o -> persistUpdatedObject(o))

That is a bit of a contrived example, but should we be using something like the following?

stream().partition(5).dispatchOn(environment.getDispatcher(Environment.THREAD_POOL))

Is getCachedDispatcher() better? Or should we be benchmarking to decide?

Jon Brisbin

Jan 23, 2015, 10:47:42 AM
to Joshua Chaitin-Pollak, reactor-...@googlegroups.com
I'm going to put something like this in the projectreactor.io/docs section (I have a place for it, but haven't filled it out yet), but here's about the simplest fork/join worker pool I could come up with:

    @Test
    public void simpleForkJoinPool() throws InterruptedException {
        List<String> ids = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8");

        Streams.from(ids)
               .dispatchOn(Environment.sharedDispatcher())
               .partition()
               .flatMap(stream -> stream.dispatchOn(cachedDispatcher())
                                        .map(s -> findOne(s)))
               .consume(t -> System.out.println(Thread.currentThread() + ", worker=" + t));

        Thread.sleep(500);
    }

This takes the "ids" as input (though you could use a Broadcaster instead of an IterableStream) and dispatches on the shared RingBuffer. It then partitions the work into 1 Stream per CPU, puts each Stream's work on an assigned cachedDispatcher (a RingBuffer from a pool in the Environment), and dispatches all results of the "findOne(s)" calls (in this case a helper method that returns Thread.currentThread().toString()) back onto the shared RingBuffer.

Or, to shorten the explanation: fork the tasks to worker threads, then join their results back onto a shared thread.
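The same fork/join shape can be sketched with plain JDK executors, for readers who want to see the threading without the Reactor API. This is only an analogy under stated assumptions: the class name ForkJoinSketch, the pool size of 4, the thread name "join", and the trivial findOne helper are all hypothetical and do not come from Reactor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class ForkJoinSketch {

    // Hypothetical stand-in for a blocking lookup such as findOne(s).
    static String findOne(String id) {
        return "item-" + id;
    }

    // Forks one task per id onto a worker pool, then hands every result to a
    // single "join" thread. Returns the name of the thread that consumed each result.
    static List<String> run(List<String> ids) {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        ExecutorService join = Executors.newSingleThreadExecutor(r -> new Thread(r, "join"));
        ConcurrentLinkedQueue<String> consumerThreads = new ConcurrentLinkedQueue<>();
        try {
            List<CompletableFuture<Void>> pending = ids.stream()
                .map(id -> CompletableFuture
                    .supplyAsync(() -> findOne(id), workers)   // fork: runs on a worker thread
                    .thenAcceptAsync(                          // join: always runs on the "join" thread
                        result -> consumerThreads.add(Thread.currentThread().getName()),
                        join))
                .collect(Collectors.toList());
            pending.forEach(CompletableFuture::join);          // wait until every result is consumed
        } finally {
            workers.shutdown();
            join.shutdown();
        }
        return new ArrayList<>(consumerThreads);
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("1", "2", "3", "4")));
    }
}
```

Every result is consumed on the one "join" thread regardless of which worker produced it, which is the role the shared RingBuffer plays in the Reactor example.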

--

Thanks!

Jon Brisbin
Reactor Project Lead
http://projectreactor.io | @j_brisbin @ProjectReactor




Jon Brisbin

Jan 23, 2015, 12:22:38 PM
to Joshua Chaitin-Pollak, Reactor Framework
In general Streams need an ordered Dispatcher to perform their work. The ThreadPoolExecutorDispatcher is inherently unordered so is extremely limited in its application as far as a Stream is concerned. It's getting to the point now that there's almost no reason to still keep it around and it would be less confusing to just delete it entirely. We may do that in a future version. At the least I think we'll put @Deprecated on it.

There is a subtle difference between your example and mine. You're correct in intuiting that the .dispatchOn(sharedDispatcher()) will cause the outermost Stream to act as a "join", which will cause results from worker threads to all be funneled onto the same, shared RingBuffer thread. If you leave that topmost .dispatchOn() off, you'll see that the results are all funneled to whichever Dispatcher happens to be the last one to process a task and so receives the onComplete. Comment it out and run it and you'll see what I mean.

You need to call .dispatchOn() on the streams you receive in the .flatMap() call because you'll get N streams there, 1 per CPU by default. That's the only way to assign separate dispatchers to all the different Streams. By assigning the shared RingBuffer as the topmost Dispatcher, you get all events from worker threads funneled into the same, specific thread, rather than all funneling onto whatever random thread happens to run the last task.

If you don't want to use the default cachedDispatcher() pool (there is only 1 Dispatcher per CPU in it by default), you can call Environment.newCachedDispatchers(poolSize, name) to create as many as you need (100, 200, whatever suits your use case). These will be accessible through the Environment later: you get a DispatcherSupplier back from that call, but you don't have to keep a reference to it; just call cachedDispatchers(name) later. Then replace the Environment.cachedDispatcher() call with dispatcherSupplier.get().
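Why a pool larger than the CPU count helps with blocking work can be illustrated with a plain JDK thread pool. This is a rough analogy, not Reactor code: the class BlockingPoolSketch and its method are hypothetical, and a latch stands in for a blocking IO call. Each task holds its thread until the pool is saturated, so the number of distinct threads seen equals the number of blocking calls that were in flight at once.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BlockingPoolSketch {

    // Runs `tasks` blocking jobs on a pool of `poolSize` threads and returns
    // the number of distinct worker threads that ran them.
    static int concurrentWorkers(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> active = ConcurrentHashMap.newKeySet();
        CountDownLatch allStarted = new CountDownLatch(Math.min(poolSize, tasks));
        CountDownLatch done = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    active.add(Thread.currentThread().getName());
                    allStarted.countDown();
                    try {
                        // Simulated blocking IO: hold this thread until the pool is saturated.
                        allStarted.await(5, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown();
                    }
                });
            }
            done.await(10, TimeUnit.SECONDS); // wait for all tasks to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return active.size();
    }

    public static void main(String[] args) {
        System.out.println("pool of 4:  " + concurrentWorkers(4, 16) + " blocking calls in flight");
        System.out.println("pool of 16: " + concurrentWorkers(16, 16) + " blocking calls in flight");
    }
}
```

With 16 blocking tasks, a pool of 4 never has more than 4 calls in flight, while a pool of 16 runs all of them at once. The right size depends on how long the calls block, so benchmarking is still worthwhile.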





Joshua Chaitin-Pollak wrote:
Your example makes a lot of sense. Is there a difference between dispatchOn().partition() and .partition().dispatchOn()?

My code currently has this:

Streams.broadcast()
     .partition(5)
     .dispatchOn(environment.getCachedDispatcher())
     .flatMap(stream -> stream.map());

So, as this is written, am I right in understanding that these partitioned streams would actually be executing on the same (single) cachedDispatcher, instead of on parallel ones?

In your example, this:

        Streams.from(ids)
               .dispatchOn(Environment.sharedDispatcher())

causes the 'outer' stream to execute asynchronously, correct?

If you removed the dispatchOn(), the 'outer' stream would run synchronously with the main thread, but still fork its 'workers' within the flatMap, right? 

Thanks, I'm just trying to make sure I've got it all understood correctly.

BTW: Why prefer cachedDispatcher() over the THREAD_POOL dispatcher?
--
Joshua Chaitin-Pollak
Chief Technology Officer
Assured Labor, Inc.

Joshua Chaitin-Pollak

Jan 23, 2015, 12:57:32 PM
to Jon Brisbin, Reactor Framework
Hum. I think I get it... Just one last question to clarify: I tried removing the outer dispatchOn(), and saw that consume() ran on dispatcherGroup threads. When you use the outer dispatchOn(), it runs consistently on the same shared thread.

Since in either case the Stream is running asynchronously from the calling thread, is there any reason to have the 'join'? All of the data will end up getting processed eventually, and even with the dispatchOn() call, the order is not guaranteed. So it seems to me there really isn't much benefit to getting the outer stream to finish processing on a consistent thread.

Jon Brisbin

Jan 23, 2015, 2:51:18 PM
to Joshua Chaitin-Pollak, Reactor Framework
It may or may not be important that the downstream of a fork runs on one of the worker threads. A thread is a thread is a thread, so in that respect, it doesn't make a difference.

Jernej Jerin

Jul 18, 2015, 6:26:42 AM
to reactor-...@googlegroups.com, jo...@assuredlabor.com
I am looking at the Streams partitioning example in the docs, and I would like to know whether it is possible to maintain the order of the elements when joining back. For example, if we generate elements from 1 to 10 and partition them into 2 streams, the original order should be preserved after joining.
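One general way to keep order across a fork/join, sketched with plain JDK executors rather than the Reactor API: submit the work in input order, keep the futures in that same order, and read them back sequentially. The class OrderedJoinSketch, its method, and the multiply-by-10 work are hypothetical, and whether Reactor's partition offers an equivalent is not something this thread confirms.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class OrderedJoinSketch {

    // Processes each element on a worker pool but collects the results in
    // submission order, so the output order matches the input order.
    static List<Integer> processInOrder(List<Integer> input, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (Integer n : input) {
                futures.add(pool.submit(() -> n * 10)); // hypothetical per-element work
            }
            List<Integer> out = new ArrayList<>();
            for (Future<Integer> f : futures) {
                out.add(f.get()); // blocks until this element's result is ready
            }
            return out;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(processInOrder(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2));
    }
}
```

The trade-off is that a slow early element delays delivery of every later result, even if those later results are already computed.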