race condition in version 2.0.0.M2?


Pablo Alcaraz

Jan 22, 2015, 2:40:39 PM
to reactor-...@googlegroups.com
Hello

The following code tries to process "expansion" objects from a list of "expansions" in parallel. We cannot do that, because we believe there is a possible race condition in Project Reactor.

For each "expansion", it applies a transformation job to each element of a second list of "items".

Each "item" is independent of the others, and the expansion transformations are independent of one another. Therefore, it makes sense to transform each item on a different thread for each expansion applied. Because all this work is independent, there is no point in modeling it as a map/reduce process.

So, if the list of items has 10 elements:

with a list of 1 expansion: 10 elem x 1 exp = 10 transformation jobs will be generated and sent to the cachedDispatcher().
with a list of 2 expansions: 10 elem x 2 exp = 20 transformation jobs will be generated and sent to the cachedDispatcher().
with a list of 3 expansions: 10 elem x 3 exp = 30 transformation jobs will be generated and sent to the cachedDispatcher(). <- it does not work!!
with a list of 4 expansions: 10 elem x 4 exp = 40 transformation jobs will be generated and sent to the cachedDispatcher().

It works fine if we do not use a cachedDispatcher() for the expansions, but that means job generation will not run in parallel.

If I uncomment the line marked with "/////\\\///", the case with 3 expansions fails: only 2 expansions are processed, generating 20 transformation jobs. The expansion that is not processed is the first one in the list.

It works as expected with any number of expansions except 3 (1 expansion is lost), running on an i7 notebook with 8 reported cores.

The code is more or less like this one:

Stream<Expansion> expansionStream = Streams
        .from(expansionProvider.getList())
        .dispatchOn(Environment.cachedDispatcher())
        .observeComplete(completed -> {
            System.out.println("All the expansion works were launched. / " + Thread.currentThread().getName());
        });

expansionStream
        .observe(expansion -> {
            // register expansion
            Streams
                .just(expansion)
                // /////\\\/// if we enable the next line, there is a race condition happening (Project Reactor bug?)
                //.dispatchOn(Environment.cachedDispatcher())
                .observeComplete(completed -> {
                    System.out.println("Expansion completed. / " + Thread.currentThread().getName());
                })
                .observe(exp -> {
                    try {
                        System.out.println("Launching work for expanding " + expansion + " / " + Thread.currentThread().getName());
                        Thread.sleep(100);
                        Stream<MyItem> expansionCollectionStream = Streams
                                .from(items)
                                .dispatchOn(Environment.cachedDispatcher());
                        expansionCollectionStream
                                .observe(myItem -> {
                                    Streams.just(myItem)
                                            .dispatchOn(Environment.cachedDispatcher())
                                            .observeComplete(completed -> {
                                                System.out.println("=== Expansion completed. / " + Thread.currentThread().getName());
                                            })
                                            .observe(anItem -> {
                                                try {
                                                    Thread.sleep(100);
                                                } catch (InterruptedException e) {
                                                    e.printStackTrace();
                                                }
                                                System.out.println("\tb) Adding to the item " + myItem.getId() +
                                                        " the expanded item " + expansion +
                                                        " / " + Thread.currentThread().getName());
                                            })
                                            .consume(anItem -> {
                                                System.out.println("# Deregister expansionCollectionStream " + count.get() + " / " + Thread.currentThread().getName());
                                            });
                                })
                                .consume();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("(end of launching works for expanding)" + " / " + Thread.currentThread().getName());
                })
                .consume(exp -> {
                    System.out.println("(confirming end of launching works for expanding)" + " / " + Thread.currentThread().getName());
                });
        }).consume();






Beyond the workaround of iterating the list of expansions sequentially, we are concerned that this concurrency problem could happen with different numbers of expansions and items.

Pablo


Jon Brisbin

Jan 22, 2015, 3:25:06 PM
to Pablo Alcaraz, reactor-...@googlegroups.com
Truth be told, I had a little trouble following what was supposed to happen in your example code. Here's a shorter version I put together that I *think* approaches what you're trying to do.

Notice that I used .groupBy(s -> s) first, which gives me one Stream per unique element in the initial List<String> I started with:

        Streams.from(startWith)
               .groupBy(s -> s)
               .consume(str -> {
                   str.dispatchOn(cachedDispatcher())
                      .observeComplete(v -> System.out.println("First expansion complete on " + Thread.currentThread()))
                      .consume(s2 -> {
                          Streams.just(s2)
                                 .dispatchOn(cachedDispatcher())
                                 .observeComplete(v -> System.out.println("Second expansion complete on " + Thread.currentThread()))
                                 .consume(s3 -> {
                                     Streams.just(s3)
                                            .dispatchOn(cachedDispatcher())
                                            .observeComplete(v -> System.out.println("Third expansion complete on " + Thread.currentThread()))
                                            .consume(s4 -> System.out.println("Expansion result: " + s4));
                                 });
                      });
               });

When I run this in the console, I get output on 6 different threads, as I would expect given that I started with a List of 2 elements and there are 3 separate thread hops happening.

Just keep in mind: excessive context switching can be very expensive. Have you benchmarked it to see how much of that is really required? I ask because in our testing, a surprising amount of the time, the less concurrency we had going on, the better. The only exception was IO, where threads were being blocked. But for CPU-intensive tasks, the bare minimum of context switching is generally preferable.

The only way to really know is to benchmark with various levels of concurrency. We generally baseline with the single-threaded RingBufferDispatcher first, then maybe go to a parallel Stream by using .partition() (or .partition(NUM_OF_THREADS)). But take it in baby steps. I was rarely able to exceed the throughput of a 2- or 4-thread processor by using more. It can be deceptive, because what I often found in microbenchmarks is that I *wasn't* maxing out the CPU with my tasks, but with context switching. In general, the fewer threads I had doing work, the higher the throughput. That held true no matter the make-up of the tasks (e.g. whether they were unrelated to one another or not).
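The "baby steps" approach above can be explored without Reactor at all. Here is a rough plain-JDK sketch that times the same CPU-bound batch at several pool sizes; the class name and busy-loop workload are invented for illustration, and a real comparison should use a proper harness such as JMH:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Micro-benchmark sketch: run the same batch of CPU-bound tasks on pools of
// different sizes and report the elapsed wall-clock time for each.
public class PoolSizeBench {

    // Run `tasks` busy-loop jobs on a fixed pool of `threads`; return elapsed ms.
    static long runWith(int threads, int tasks) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicLong sink = new AtomicLong();   // keeps the JIT from eliding the loop
        long start = System.nanoTime();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                long x = 0;
                for (int j = 0; j < 500_000; j++) x += j;
                sink.addAndGet(x);
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws InterruptedException {
        for (int n : new int[]{1, 2, 4, 8, 16}) {
            System.out.println(n + " thread(s): " + runWith(n, 200) + " ms");
        }
    }
}
```

On a typical machine the elapsed time stops improving, and often worsens, once the pool size passes the number of cores, which is the effect described above.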

--

Thanks!

Jon Brisbin
Reactor Project Lead
http://projectreactor.io | @j_brisbin @ProjectReactor




Pablo Alcaraz

Jan 22, 2015, 3:54:45 PM
to reactor-...@googlegroups.com, pa...@stormpath.com
Hello Jon,

Thanks for the answer. 
What the code tries to do is something like this:

...
ExecutorService executorService = ...
for (Expansion expansion : expansionProvider.getList()) {
    for (Item item : listOfItems) {
        // execute() takes a Runnable, so the call is wrapped in a lambda;
        // calling transform() directly would run it on the caller thread
        executorService.execute(() ->
                transformProcessPool.getProcessorInstance().transform(item, expansion));
    }
}
...

The output will be the same "item" with the expansion applied. "Item" objects are thread-safe, and the transform processors are stateless.


Correct me if I am wrong, but all the elements in both of our lists are unique, so I think there is no need for the groupBy() method.

I agree with you about the context switching.
In our case, each transformation job is just a query to a database cluster (each transformation job will contact a different database node), so they basically wait for answers coming from different servers, with almost no CPU time. The expected database answer time is around 10-20 milliseconds per transformation job. The idea is that if we launch the queries in parallel, we will have the answers in 10-20 milliseconds plus OS context-switch time.
If we serialize one of the lists, the response time will grow to 10-20 milliseconds x the number of elements.

We believe that context switching will not be an issue, since one of the lists could have 25 to 100 elements and the other typically 1 to 3 elements. Each tuple will just be waiting for an answer from an external server.
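That latency arithmetic can be sketched with plain JDK classes, with a 15 ms Thread.sleep() standing in for the database round trip (all names here are invented for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Demonstrates why parallel fan-out pays off for IO-bound work:
// n sequential "queries" cost roughly n * 15 ms, while n parallel
// "queries" cost roughly one round trip plus scheduling overhead.
public class LatencyDemo {

    static void fakeQuery() {   // stand-in for a ~15 ms database call
        try {
            Thread.sleep(15);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // One query after another: roughly 15 ms x n.
    static long sequentialMs(int n) {
        long start = System.nanoTime();
        for (int i = 0; i < n; i++) fakeQuery();
        return (System.nanoTime() - start) / 1_000_000;
    }

    // All queries at once on a cached pool: roughly 15 ms + overhead.
    static long parallelMs(int n) {
        ExecutorService pool = Executors.newCachedThreadPool();
        long start = System.nanoTime();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            futures.add(CompletableFuture.runAsync(LatencyDemo::fakeQuery, pool));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("sequential(20): " + sequentialMs(20) + " ms");
        System.out.println("parallel(20):   " + parallelMs(20) + " ms");
    }
}
```

With 20 simulated queries, the sequential loop takes roughly 20 x 15 ms, while the parallel version finishes in roughly one round trip plus scheduling overhead.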

I tried to use .partition(), but there was no way to convince it to process jobs on different threads. Perhaps I need a usage example.

Besides, I would like to use a RingBuffer if possible.

Pablo

Jon Brisbin

Jan 22, 2015, 4:36:56 PM
to Pablo Alcaraz, reactor-...@googlegroups.com
If you replace "groupBy()" in my example with "partition()", you'll get one Stream per CPU, on each of which you can set the dispatcher via .dispatchOn(cachedDispatcher()). I just tried it and saw log output on all the threads, not just one.
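For readers without Reactor 2.x at hand, the effect partition() is being used for, one bucket of work per thread, can be sketched with plain JDK classes (this is not Reactor's operator; class and method names are invented for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Round-robin the items into one bucket per thread, then process each
// bucket on its own pool thread, similar in spirit to partitioning a
// Stream and dispatching each partition separately.
public class PartitionSketch {

    static Set<String> processOnBuckets(List<String> items, int buckets) throws Exception {
        // 1. Split the items into `buckets` round-robin sublists.
        List<List<String>> parts = new ArrayList<>();
        for (int i = 0; i < buckets; i++) parts.add(new ArrayList<>());
        for (int i = 0; i < items.size(); i++) parts.get(i % buckets).add(items.get(i));

        // 2. Process each bucket on its own thread, recording the thread names seen.
        ExecutorService pool = Executors.newFixedThreadPool(buckets);
        Set<String> threadsSeen = ConcurrentHashMap.newKeySet();
        List<Future<?>> pending = new ArrayList<>();
        for (List<String> part : parts) {
            pending.add(pool.submit(() -> {
                for (String s : part) {
                    threadsSeen.add(Thread.currentThread().getName());
                    System.out.println(s + " on " + Thread.currentThread().getName());
                }
            }));
        }
        for (Future<?> f : pending) f.get();   // wait for every bucket to finish
        pool.shutdown();
        return threadsSeen;                    // distinct worker threads used
    }
}
```

With 8 items and 4 buckets, the log output shows the work spread across 4 distinct worker threads, matching the "output on all the threads" behavior described above.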