Hi,
So I've been playing around with Reactor and attempted to put together a custom Action to implement the typical scatter-gather pattern. One of the goals I was trying to achieve, though, was to define actions once and then reuse them. For example, if I need to append "a" in multiple places in an application (in different streams), then ideally I should be able to instantiate an AppendAAction once and reuse it wherever required.
This meant that, rather than hardcoding some 'map()' functions to represent the paths in the scatter-gather, I instead attempted to compose Processors, as you can see below.
The issue is that this doesn't work as expected, because actions are stateful: they hold references to their upstream/downstream subscriptions, which are specific to the stream they were wired into.
So my questions are:
- Is there another approach for achieving the same thing that doesn't have this problem?
- Is there a way of wrapping a reusable processor every time it's used, such that only the wrapper holds this state?
- Is there a different approach that doesn't require wrapping on reuse? I haven't given this one much thought yet, though.
thanks!
Dan
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Processor a = new AppendAction("a");
Processor b = new AppendAction("b");
Processor scatterGather = new ScatterGatherAction(new Processor[]{a,b});
Streams.range(0,3).process(scatterGather).consume(o -> System.out.println("RESULT: "+o));
/**
Scatter-Gather Action that scatters incoming message (in) to n routes, before gathering the results into a List and continuing stream processing with the result.
**/
public class ScatterGatherAction extends Action<Message, Message> {
private Processor[] routes;
public ScatterGatherAction(Processor[] routes) {
this.routes = routes;
}
@Override
protected void doNext(Message in)
{
Streams.from(routes)
.flatMap(route -> Streams.just(in)
.process(route)
.subscribeOn(Environment.cachedDispatcher()))
.buffer()
.consume(o -> broadcastNext(new Message(o)));
}
}
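
To make the second question a bit more concrete, here is a rough sketch of the shape I have in mind, written with plain java.util.function and CompletableFuture types instead of Reactor's classes. Everything in it (ScatterGatherSketch, append, scatterGather) is a made-up name for illustration only, not Reactor API. The idea is that the reusable piece is a stateless function, so anything that needs per-stream state would have to be created fresh wherever the function is used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch: none of these names come from Reactor.
public class ScatterGatherSketch {

    // A reusable route: it holds only the suffix, no subscription state,
    // so one instance can be shared by any number of pipelines.
    public static Function<String, String> append(String suffix) {
        return in -> in + suffix;
    }

    // Scatters each input to every route asynchronously, then gathers
    // the results into a List in the same order as the routes.
    public static <I, O> Function<I, List<O>> scatterGather(List<Function<I, O>> routes) {
        return in -> {
            List<CompletableFuture<O>> pending = new ArrayList<>();
            for (Function<I, O> route : routes) {
                pending.add(CompletableFuture.supplyAsync(() -> route.apply(in)));
            }
            List<O> gathered = new ArrayList<>();
            for (CompletableFuture<O> future : pending) {
                gathered.add(future.join()); // wait for each route's result
            }
            return gathered;
        };
    }

    public static void main(String[] args) {
        Function<String, String> a = append("a");
        Function<String, String> b = append("b");

        // The same 'a' instance is used in two independent compositions.
        Function<String, List<String>> both = scatterGather(Arrays.asList(a, b));
        Function<String, List<String>> onlyA = scatterGather(Arrays.asList(a));

        for (int i = 0; i < 3; i++) {
            System.out.println("RESULT: " + both.apply(String.valueOf(i)));
        }
        System.out.println("RESULT: " + onlyA.apply("x"));
    }
}
```

Since a and b carry no subscription state here, sharing them is harmless. I assume the Reactor equivalent would be to pass around a Function (or a factory that builds a new Processor per use) rather than a Processor instance, but I haven't verified that this fits the Action API.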