Hello,
I'd like to process items in a Flux differently, based on some discriminator. I could use Flux.groupBy and then merge the resulting Fluxes back together, but I want to preserve the original order. Is there a standard way of doing this?
I didn't find one, so I came up with the following "scatterGather" implementation:
import java.util.Comparator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;

public class ReactiveUtils {

    public static <I, D, O> Flux<O> scatterGather(Publisher<I> input, Function<I, D> discriminator, Function<D, Function<Flux<I>, Flux<O>>> actions, int window) {
        // Process the input window by window; concat keeps the windows in order.
        return Flux.concat(
            Flux.from(input)
                .window(window)
                .map(inputWindow -> {
                    // Scatter: tag each item with its index, then group by discriminator.
                    Flux<GroupedFlux<D, IndexedItem<I>>> groupedInput = indexedFlux(inputWindow).groupBy(indexedItem -> discriminator.apply(indexedItem.getItem()));
                    return groupedInput.flatMap(groupFlux -> {
                            Flux<IndexedItem<I>> cachedGroupedFlux = groupFlux.cache();
                            Flux<Long> indexFlux = cachedGroupedFlux.map(IndexedItem::getIndex);
                            Flux<I> inputFlux = cachedGroupedFlux.map(IndexedItem::getItem).cache();
                            Flux<O> outputFlux = Flux.from(actions.apply(groupFlux.key()).apply(inputFlux));
                            // Re-attach the indices to the outputs (relies on one output per input, in the same order).
                            return indexFlux.zipWith(outputFlux, (index, outputItem) -> new IndexedItem<>(outputItem, index));
                        })
                        // Gather: restore the original order within the window.
                        .collectSortedList(Comparator.comparing(IndexedItem::getIndex))
                        .flatMapMany(Flux::fromIterable)
                        .map(IndexedItem::getItem);
                }));
    }

    public static <I> Flux<IndexedItem<I>> indexedFlux(Flux<I> flux) {
        LongAdder longAdder = new LongAdder();
        return flux.map(item -> {
            long index = longAdder.longValue();
            longAdder.increment();
            return new IndexedItem<>(item, index);
        });
    }

    // Pairs an item with its position in the source Flux.
    public static class IndexedItem<T> {
        private final T item;
        private final long index;

        IndexedItem(T item, long index) {
            this.item = item;
            this.index = index;
        }

        public T getItem() {
            return item;
        }

        public long getIndex() {
            return index;
        }
    }
}
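To make the intent easier to follow, here is a minimal plain-Java sketch of what I expect to happen inside a single window, without Reactor: tag each item with its index, group by discriminator, apply the per-group action, then sort by index. The class name ScatterGatherSketch and its Indexed helper are hypothetical and only for illustration, and the sketch assumes each action returns exactly one output per input, in the same order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, eager illustration of the per-window logic (not the Reactor code above).
public class ScatterGatherSketch {

    // Pairs a value with its position in the original window.
    static final class Indexed<T> {
        final T item;
        final int index;

        Indexed(T item, int index) {
            this.item = item;
            this.index = index;
        }
    }

    public static <I, D, O> List<O> scatterGather(
            List<I> window,
            Function<I, D> discriminator,
            Function<D, Function<List<I>, List<O>>> actions) {

        // Scatter: group items by discriminator, remembering each original index.
        Map<D, List<Indexed<I>>> groups = new LinkedHashMap<>();
        for (int i = 0; i < window.size(); i++) {
            I item = window.get(i);
            groups.computeIfAbsent(discriminator.apply(item), key -> new ArrayList<>())
                  .add(new Indexed<>(item, i));
        }

        // Apply the action chosen for each group to that group's items.
        List<Indexed<O>> results = new ArrayList<>();
        for (Map.Entry<D, List<Indexed<I>>> entry : groups.entrySet()) {
            List<Indexed<I>> group = entry.getValue();
            List<I> inputs = new ArrayList<>();
            for (Indexed<I> indexed : group) {
                inputs.add(indexed.item);
            }
            List<O> outputs = actions.apply(entry.getKey()).apply(inputs);
            if (outputs.size() != group.size()) {
                throw new IllegalStateException("expected one output per input");
            }
            // Re-attach the saved index to each output, pairing by position.
            for (int j = 0; j < outputs.size(); j++) {
                results.add(new Indexed<>(outputs.get(j), group.get(j).index));
            }
        }

        // Gather: restore the original order by sorting on the saved index.
        results.sort((a, b) -> Integer.compare(a.index, b.index));
        List<O> ordered = new ArrayList<>();
        for (Indexed<O> indexed : results) {
            ordered.add(indexed.item);
        }
        return ordered;
    }

    public static void main(String[] args) {
        // Even numbers are doubled, odd numbers are negated.
        Function<Boolean, Function<List<Integer>, List<Integer>>> actions = even -> items -> {
            List<Integer> out = new ArrayList<>();
            for (int x : items) {
                out.add(even ? x * 2 : -x);
            }
            return out;
        };
        List<Integer> result = scatterGather(Arrays.asList(1, 2, 3, 4, 5), i -> i % 2 == 0, actions);
        System.out.println(result); // prints [-1, 4, -3, 8, -5]
    }
}
```

The Reactor version above is meant to do the same thing per window, with zipWith playing the role of the inner loop that pairs indices with outputs.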
Is this the correct way to implement this feature? Thanks for your help!
Antoine