Hello
The following code tries to process in parallel the "expansion" objects from a list of "expansions". We cannot do that, because we believe there is a possible race condition in Project Reactor.
For each "expansion", it applies a transformation job to each element of a second list of "items".
Each "item" is independent of the others, and the expansion transformations are independent of each other. Therefore, it makes sense to transform each item in a different thread for each expansion applied. Because all this work is independent, there is no point in modeling it as a map/reduce process.
So, if the list of items has 10 elements:
with a list of 1 expansion: 10 elem x 1 exp = 10 transformation jobs will be generated and sent to the cachedDispatcher().
with a list of 2 expansions: 10 elem x 2 exp = 20 transformation jobs will be generated and sent to the cachedDispatcher().
with a list of 3 expansions: 10 elem x 3 exp = 30 transformation jobs will be generated and sent to the cachedDispatcher(). <- it does not work!!
with a list of 4 expansions: 10 elem x 4 exp = 40 transformation jobs will be generated and sent to the cachedDispatcher().
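The expected fan-out above is just the product of the two list sizes; a trivial sketch of that count (the class and method names are only for illustration):

```java
public class JobCountDemo {

    // One transformation job is generated per (item, expansion) pair.
    static int expectedJobs(int items, int expansions) {
        return items * expansions;
    }

    public static void main(String[] args) {
        System.out.println(expectedJobs(10, 3)); // 30 jobs for 10 items x 3 expansions
    }
}
```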
It works OK if we do not use a cachedDispatcher() for the expansions, but that means the job generation will not run in parallel.
If I uncomment the line marked with "/////\\\///", the case with 3 expansions fails: only 2 expansions are processed, generating 20 transformation jobs. The expansion that is not processed is the first one in the list.
It works as expected with any number of expansions except 3 (1 expansion is lost), running on an i7 notebook with 8 reported cores.
The code is more or less like this one:
Stream<Expansion> expansionStream = Streams
        .from(expansionProvider.getList())
        .dispatchOn(Environment.cachedDispatcher())
        .observeComplete(completed -> {
            System.out.println("All the expansion works were launched. / " + Thread.currentThread().getName());
        });

expansionStream.observe(expansion -> {
    // register expansion
    Streams
            .just(expansion)
            /////\\\///.dispatchOn(Environment.cachedDispatcher()) // if we enable this, there is a race condition happening (Project Reactor bug?)
            .observeComplete(completed -> {
                System.out.println("Expansion completed. / " + Thread.currentThread().getName());
            })
            .observe(exp -> {
                try {
                    System.out.println("Launching work for expanding " + expansion + " / " + Thread.currentThread().getName());
                    Thread.sleep(100);
                    Stream<MyItem> expansionCollectionStream = Streams
                            .from(items)
                            .dispatchOn(Environment.cachedDispatcher());
                    expansionCollectionStream
                            .observe(myItem -> {
                                Streams.just(myItem)
                                        .dispatchOn(Environment.cachedDispatcher())
                                        .observeComplete(completed -> {
                                            System.out.println("=== Expansion completed. / " + Thread.currentThread().getName());
                                        })
                                        .observe(anItem -> {
                                            try {
                                                Thread.sleep(100);
                                            } catch (InterruptedException e) {
                                                e.printStackTrace();
                                            }
                                            System.out.println("\tb) Adding to the item " + myItem.getId() +
                                                    " the expanded item " +
                                                    expansion + " / " + Thread.currentThread().getName());
                                        })
                                        .consume(anItem -> {
                                            System.out.println("# Deregister expansionCollectionStream " + count.get() + " / " + Thread.currentThread().getName());
                                        });
                            })
                            .consume();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("(end of launching works for expanding)" + " / " + Thread.currentThread().getName());
            })
            .consume(exp -> {
                System.out.println("(confirming end of launching works for expanding)" + " / " + Thread.currentThread().getName());
            });
}).consume();
Besides the workaround of iterating the list of expansions sequentially, we are concerned that this concurrency problem could happen with other numbers of expansions and items.
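For reference, the sequential workaround can be sketched with plain java.util.concurrent instead of Reactor (a minimal sketch only: Expansion and MyItem are stood in for by Strings, the real transformation is replaced by a counter, and the class and method names are made up for illustration):

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SequentialExpansionDemo {

    // Iterates the expansions sequentially (no race on the expansion list)
    // and fans the per-item transformation jobs out to a cached thread pool.
    // Returns the number of jobs that actually ran.
    static int runJobs(List<String> expansions, List<String> items) {
        ExecutorService pool = Executors.newCachedThreadPool();
        AtomicInteger jobs = new AtomicInteger();
        for (String expansion : expansions) {      // sequential outer loop
            for (String item : items) {
                pool.submit(() -> {
                    // the real transformation job for (expansion, item) would go here
                    jobs.incrementAndGet();
                });
            }
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return jobs.get();                         // expansions x items
    }

    public static void main(String[] args) {
        List<String> items = List.of("i1", "i2", "i3", "i4", "i5",
                                     "i6", "i7", "i8", "i9", "i10");
        System.out.println(runJobs(List.of("e1", "e2", "e3"), items)); // prints 30
    }
}
```

This gives up parallel job generation across expansions, which is exactly the trade-off described above.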
Pablo