Data lost in cold stream


Markus Paaso

Sep 18, 2015, 11:25:35 AM
to reactor-framework
Hi.

I am trying to use Reactor streams to process a huge data set.
I am creating a cold stream from an Iterable containing hundreds of thousands of elements.
The stream is then partitioned to enable parallel processing.
But not all of the elements get consumed. Am I doing something wrong?
Below are the example code and the failing assertion.


 def input = [:]
 def rand = new Random()
 for(i in 1..50000) {
  input[i] = []
  for(j in 1..20)
   input[i] << rand.nextFloat()
 }
 DispatcherSupplier supplier = Environment.newCachedDispatchers(10);
 AtomicInteger consumed = new AtomicInteger(0)
 def stream = Streams
  .from(input.entrySet())
  .partition(8)
  .flatMap({ Stream stream ->
    stream.dispatchOn(supplier.get()).map({ e -> [e.key, e.value.sum()] })
   })
 stream.consume({consumed.getAndIncrement()})
 long sm = System.currentTimeMillis()
 def resultsSize = stream.toList().await().size()
 assert resultsSize == 50000
 assert consumed.get() == 50000

Output:

org.codehaus.groovy.runtime.powerassert.PowerAssertionError:
assert resultsSize == 50000
       |           |
       49600       false
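As a point of reference, the invariant the failing assertion expresses — every one of the 50000 entries is processed exactly once, regardless of how the work is fanned out — can be sketched independently of Reactor in plain Java. This is a hypothetical stand-in (class and variable names are my own), using `java.util.stream` with a bounded `ForkJoinPool` playing the role of the 8 partitions:

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class PartitionSumSketch {
    public static void main(String[] args) throws Exception {
        // Build the same input shape as the Groovy example:
        // 50000 keys, each mapped to 20 random floats.
        Random rand = new Random();
        Map<Integer, List<Float>> input = new HashMap<>();
        for (int i = 1; i <= 50000; i++) {
            List<Float> values = new ArrayList<>();
            for (int j = 0; j < 20; j++) values.add(rand.nextFloat());
            input.put(i, values);
        }

        // An explicit 8-thread pool stands in for the 8 partitions;
        // running the parallel stream inside pool.submit() bounds its
        // parallelism to that pool.
        ForkJoinPool pool = new ForkJoinPool(8);
        Map<Integer, Double> sums = pool.submit(() ->
                input.entrySet().parallelStream()
                        .collect(Collectors.toConcurrentMap(
                                Map.Entry::getKey,
                                e -> e.getValue().stream()
                                        .mapToDouble(Float::doubleValue)
                                        .sum())))
                .get();
        pool.shutdown();

        // The invariant the assertions check: no entry is dropped.
        System.out.println(sums.size()); // expected: 50000
    }
}
```

With a correct fan-out/fan-in, the result size must equal the input size — which is exactly what the Reactor pipeline above fails to guarantee.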

Markus Paaso

Sep 21, 2015, 10:07:41 AM
to reactor-framework
I found a weird workaround for the problem:
Somehow adding a mapper that does nothing before .partition() fixes the problem.

Workaround:

def input = [:]
def rand = new Random()
for(i in 1..50000) {
 input[i] = []
 for(j in 1..20)
  input[i] << rand.nextFloat()
}
DispatcherSupplier supplier = Environment.newCachedDispatchers(10);
AtomicInteger consumed = new AtomicInteger(0)
def stream = Streams
  .from(input.entrySet())
  .map({ it })
  .partition(8)
  .flatMap({ Stream stream ->
    stream.dispatchOn(supplier.get()).map({ e -> [e.key, e.value.sum()] })
  })
stream.consume({consumed.getAndIncrement()})
def resultsSize = stream.toList().await().size()
assert resultsSize == 50000
assert consumed.get() == 50000

Markus Paaso

Sep 21, 2015, 10:53:38 AM
to reactor-framework
Another workaround is to add the same do-nothing mapper before dispatchOn():

...
  .flatMap({ Stream stream ->
    stream.map({ it }).dispatchOn(supplier.get()).map({ e -> [e.key, e.value.sum()] })
  })
...

Stephane Maldini

Sep 28, 2015, 7:27:00 AM
to Markus Paaso, reactor-framework
Thanks for the feedback. We have a test case for this now, and Reactor "2.1" improves things around flatMap quite a bit :)

--
Stephane Maldini | Reactor Project Lead, Spring Engineering | London | Pivotal
