Scatter gather preserving order


Antoine Baudoux, Taktik

Jan 24, 2018, 9:01:40 AM
to reactor-framework
Hello, 

I'd like to process the items of a Flux differently, depending on some discriminator. I could use Flux.groupBy and then merge the resulting Fluxes back together, but I want to preserve the original order. Is there a standard way of doing this?

I didn't find one, so I came up with the following "scatterGather" implementation:


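import java.util.Comparator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;

public class ReactiveUtils {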
   public static <I, D, O> Flux<O> scatterGather(Publisher<I> input, Function<I, D> discriminator, Function<D, Function<Flux<I>, Flux<O>>> actions, int window) {
      return Flux.concat(
            Flux.from(input)
                  .window(window)
                  .map(inputWindow -> {
                     Flux<GroupedFlux<D, IndexedItem<I>>> groupedInput = indexedFlux(inputWindow).groupBy(indexedItem -> discriminator.apply(indexedItem.getItem()));
                     return groupedInput.flatMap(groupFlux -> {
                        Flux<IndexedItem<I>> cachedGroupedFlux = groupFlux.cache();
                        Flux<Long> indexFlux = cachedGroupedFlux.map(IndexedItem::getIndex);
                        Flux<I> inputFlux = cachedGroupedFlux.map(IndexedItem::getItem).cache();
                        Flux<O> outputFlux = Flux.from(actions.apply(groupFlux.key()).apply(inputFlux));
                        return indexFlux.zipWith(outputFlux, (index, outputItem) -> new IndexedItem<>(outputItem, index));
                     })
                           .collectSortedList(Comparator.comparing(IndexedItem::getIndex))
                           .flatMapMany(Flux::fromIterable)
                           .map(IndexedItem::getItem);
                  }));
   }

   public static <I> Flux<IndexedItem<I>> indexedFlux(Flux<I> flux) {
      LongAdder longAdder = new LongAdder();
      return flux.map(item -> {
         long index = longAdder.longValue();
         longAdder.increment();
         return new IndexedItem<>(item, index);
      });
   }

   public static class IndexedItem<T> {
      private final T item;
      private final long index;

      IndexedItem(T item, long index) {
         this.item = item;
         this.index = index;
      }

      public T getItem() {
         return item;
      }

      public long getIndex() {
         return index;
      }
   }
}
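
To make the intent clearer, here is a minimal, non-reactive sketch of the same tag / group / process / sort idea using plain lists instead of Flux. The names `ScatterGatherSketch`, `Indexed` and `demo` are hypothetical and only serve the illustration. It assumes each action returns exactly one output per input, in the same order, which is also what the zipWith in the Reactor version relies on.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class ScatterGatherSketch {

    // Pairs a value with its position in the original input (hypothetical helper).
    static final class Indexed<T> {
        final T item;
        final int index;

        Indexed(T item, int index) {
            this.item = item;
            this.index = index;
        }
    }

    static <I, D, O> List<O> scatterGather(
            List<I> input,
            Function<I, D> discriminator,
            Function<D, Function<List<I>, List<O>>> actions) {

        // Scatter: tag each item with its index and bucket it by discriminator.
        Map<D, List<Indexed<I>>> groups = new LinkedHashMap<>();
        for (int i = 0; i < input.size(); i++) {
            I item = input.get(i);
            groups.computeIfAbsent(discriminator.apply(item), k -> new ArrayList<>())
                  .add(new Indexed<>(item, i));
        }

        // Process each group as a whole, then re-attach the original indices.
        List<Indexed<O>> tagged = new ArrayList<>();
        for (Map.Entry<D, List<Indexed<I>>> entry : groups.entrySet()) {
            List<Indexed<I>> group = entry.getValue();
            List<I> items = new ArrayList<>();
            for (Indexed<I> in : group) {
                items.add(in.item);
            }
            List<O> outputs = actions.apply(entry.getKey()).apply(items);
            // Assumption: one output per input, in the same order.
            if (outputs.size() != group.size()) {
                throw new IllegalStateException("action must emit one output per input");
            }
            for (int j = 0; j < outputs.size(); j++) {
                tagged.add(new Indexed<>(outputs.get(j), group.get(j).index));
            }
        }

        // Gather: sort by original index to restore the input order.
        tagged.sort(Comparator.comparingInt(t -> t.index));
        List<O> result = new ArrayList<>();
        for (Indexed<O> t : tagged) {
            result.add(t.item);
        }
        return result;
    }

    // Example: even and odd numbers are handled by different actions.
    static List<String> demo() {
        Function<Integer, Boolean> isEven = n -> n % 2 == 0;
        Function<Boolean, Function<List<Integer>, List<String>>> actions = even -> items -> {
            List<String> out = new ArrayList<>();
            for (Integer n : items) {
                out.add((even ? "E" : "O") + n);
            }
            return out;
        };
        return scatterGather(List.of(1, 2, 3, 4, 5, 6), isEven, actions);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [O1, E2, O3, E4, O5, E6]
    }
}
```

The Reactor version above applies the same steps to each window of the input, so the sort only ever has to buffer `window` items at a time.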


Is this the correct way to implement this feature? Thanks for your help!

Antoine
