// Reproduction script: pushes 50000 map entries through a partitioned Reactor stream,
// sums each entry's values, and checks that every element is observed by two subscribers.

// Input: integer keys 1..50000, each mapped to a list of 20 random floats.
def input = [:]
def rand = new Random()
for (i in 1..50000) {
    input[i] = (1..20).collect { rand.nextFloat() }
}

// NOTE(review): presumably a supplier handing out up to 10 cached dispatchers — confirm against the Reactor API.
DispatcherSupplier supplier = Environment.newCachedDispatchers(10)
// Incremented once per element delivered to the consume() callback below.
AtomicInteger consumed = new AtomicInteger(0)

def stream = Streams
    .from(input.entrySet())
    .partition(8)
    .flatMap({ Stream partition ->
        // For each sub-stream: apply dispatchOn with a dispatcher taken from the supplier,
        // then map every entry to a [key, sum-of-values] pair.
        partition.dispatchOn(supplier.get()).map({ e -> [e.key, e.value.sum()] })
    })

// First use of the stream: count every element delivered.
stream.consume({ consumed.getAndIncrement() })

// Second use of the same stream: collect everything into a list and block for the result.
def resultsSize = stream.toList().await().size()

// Both uses are expected to see all 50000 entries.
assert resultsSize == 50000
assert consumed.get() == 50000
// Reproduction script (variant): same pipeline as a plain partitioned stream, but with an
// identity map({it}) step inserted before partition(8).

// Input: integer keys 1..50000, each mapped to a list of 20 random floats.
def input = [:]
def rand = new Random()
for (i in 1..50000) {
    input[i] = (1..20).collect { rand.nextFloat() }
}

// NOTE(review): presumably a supplier handing out up to 10 cached dispatchers — confirm against the Reactor API.
DispatcherSupplier supplier = Environment.newCachedDispatchers(10)
// Incremented once per element delivered to the consume() callback below.
AtomicInteger consumed = new AtomicInteger(0)

def stream = Streams
    .from(input.entrySet())
    // Identity mapping: passes each entry through unchanged.
    .map({ it })
    .partition(8)
    .flatMap({ Stream partition ->
        // For each sub-stream: apply dispatchOn with a dispatcher taken from the supplier,
        // then map every entry to a [key, sum-of-values] pair.
        partition.dispatchOn(supplier.get()).map({ e -> [e.key, e.value.sum()] })
    })

// First use of the stream: count every element delivered.
stream.consume({ consumed.getAndIncrement() })

// Second use of the same stream: collect everything into a list and block for the result.
def resultsSize = stream.toList().await().size()

// Both uses are expected to see all 50000 entries.
assert resultsSize == 50000
assert consumed.get() == 50000
...
// Variant of the flatMap step: the identity map({ it }) is applied inside each sub-stream,
// before dispatchOn, and every entry is then mapped to a [key, sum-of-values] pair.
.flatMap({ Stream stream ->
    stream.map({ it }).dispatchOn(supplier.get()).map({ e -> [e.key, e.value.sum()] })
})...
--
You received this message because you are subscribed to the Google Groups "reactor-framework" group.
To unsubscribe from this group and stop receiving emails from it, send an email to reactor-framew...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.