Reuse of actions in different stream instances

Daniel Feist

Oct 2, 2015, 7:26:43 PM
to reactor-framework
Hi,

So I've been playing around with Reactor and attempted to put together a custom Action implementing the typical scatter-gather pattern. One of my goals was to create/define actions once and then reuse them. For example, if I need to append "a" in multiple places in an application (in different streams), then ideally I should be able to instantiate an AppendAAction once and reuse it wherever required.

This meant that, rather than hardcoding some 'map()' functions to represent the paths in the scatter-gather, I instead attempted to compose Processors, as you can see below.

The issue is that this doesn't work as expected, because actions are stateful and contain references to upstream/downstream subscriptions, which are context-specific.

So my questions are:
- Is there another approach for achieving the same thing that doesn't have this problem?
- Is there a way of wrapping a reusable processor every time it's used, such that only the wrapper holds this state?
- Is there a different approach that doesn't require wrapping on reuse? I haven't given it much thought yet, though.

thanks!

Dan


------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Processor a = new AppendAction("a");
Processor b = new AppendAction("b");
Processor scatterGather = new ScatterGatherAction(new Processor[]{a, b});
Streams.range(0,3).process(scatterGather).consume(o -> System.out.println("RESULT: "+o));


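// Imports assume the Reactor 2.0.x package layout; Message is an application type.
import org.reactivestreams.Processor;
import reactor.Environment;
import reactor.rx.Streams;
import reactor.rx.action.Action;

/**
 * Scatter-gather action that scatters the incoming message (in) to n routes,
 * gathers the results into a List, and continues stream processing with the result.
 */
public class ScatterGatherAction extends Action<Message, Message> {

    // Routes are shared Processor instances; this sharing is what triggers the issue described above.
    private Processor[] routes;

    public ScatterGatherAction(Processor[] routes) {
        this.routes = routes;
    }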
     
    @Override
    protected void doNext(Message in)
    {
        Streams.from(routes)
                .flatMap(route -> Streams.just(in)
                        .process(route)
                        .subscribeOn(Environment.cachedDispatcher()))
                .buffer()
                .consume(o -> broadcastNext(new Message(o)));
    }
}
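
For comparison, the same scatter-gather shape can be sketched with routes that are plain stateless functions, which are safe to define once and share between pipelines. This is only a minimal sketch using the JDK, not Reactor API: the class ScatterGatherSketch and the helpers append and scatterGather are hypothetical names introduced for illustration, and CompletableFuture stands in for the dispatcher used above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch, not Reactor API: routes are stateless functions,
// so one instance can be reused by any number of pipelines.
class ScatterGatherSketch {

    // Reusable "append" route, created once and shared.
    static Function<String, String> append(String suffix) {
        return in -> in + suffix;
    }

    // Scatter `in` to every route concurrently, then gather the results in route order.
    static <T, R> List<R> scatterGather(T in, List<Function<T, R>> routes) {
        List<CompletableFuture<R>> pending = routes.stream()
                .map(route -> CompletableFuture.supplyAsync(() -> route.apply(in)))
                .collect(Collectors.toList());
        // join() blocks until each route has finished; list order matches route order.
        return pending.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Function<String, String> a = append("a");
        Function<String, String> b = append("b");
        List<Function<String, String>> routes = Arrays.asList(a, b);
        for (int i = 0; i < 3; i++) {
            System.out.println("RESULT: " + scatterGather(String.valueOf(i), routes));
        }
    }
}
```

Running main prints "RESULT: [0a, 0b]", "RESULT: [1a, 1b]" and "RESULT: [2a, 2b]". All per-invocation state lives in the futures created inside scatterGather, so the routes themselves carry nothing that is specific to one stream.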

Daniel Feist

Oct 5, 2015, 10:53:18 AM
to reactor-framework
Here is a much simpler example that shows this issue:

Action<Long, Long> addOne = new MapAction<>(s -> s + 1);
Streams.range(0, 2).flatMap(o -> Streams.just(o).lift(() -> addOne)).consume(s -> System.out.println(s));

The output I'd expect is "1,2,3", but instead the output is just "1". If, instead of referencing the shared 'addOne' action, a new MapAction is created each time, the output is as expected.
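
The difference between the shared and the per-use instance can be reproduced with a toy model. This is a sketch under a loud assumption: it supposes that a stage latches a terminal state once the first inner stream completes, which is a guess at the mechanism and not taken from Reactor's implementation. StatefulStageSketch, MapStage, feed, runShared and runFresh are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy model, NOT Reactor's implementation: a stage that keeps per-subscription
// state (a "done" flag) and ignores signals after its first completion.
class StatefulStageSketch {

    // The stateless part: safe to define once and reuse everywhere.
    static final Function<Long, Long> ADD_ONE = s -> s + 1;

    static final class MapStage<T, R> {
        private final Function<T, R> fn;
        private final List<R> sink;
        private boolean done; // state tied to one upstream/downstream pairing

        MapStage(Function<T, R> fn, List<R> sink) {
            this.fn = fn;
            this.sink = sink;
        }

        void onNext(T value) {
            if (!done) {
                sink.add(fn.apply(value));
            }
        }

        void onComplete() {
            done = true;
        }
    }

    // One single-element inner stream per input 0, 1, 2; each asks the supplier for its stage.
    static void feed(Supplier<MapStage<Long, Long>> stages) {
        for (long in = 0; in <= 2; in++) {
            MapStage<Long, Long> stage = stages.get();
            stage.onNext(in);
            stage.onComplete();
        }
    }

    // The supplier hands out the same stage every time.
    static List<Long> runShared() {
        List<Long> out = new ArrayList<>();
        MapStage<Long, Long> addOne = new MapStage<Long, Long>(ADD_ONE, out);
        feed(() -> addOne);
        return out;
    }

    // The supplier wraps the shared function in a fresh stage per use.
    static List<Long> runFresh() {
        List<Long> out = new ArrayList<>();
        feed(() -> new MapStage<Long, Long>(ADD_ONE, out));
        return out;
    }

    public static void main(String[] args) {
        System.out.println("shared: " + runShared());
        System.out.println("fresh:  " + runFresh());
    }
}
```

In this model runShared() yields [1] and runFresh() yields [1, 2, 3]. The reusable piece is the function ADD_ONE, while the stage that holds subscription state is created per use by the supplier.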

Interestingly, with RxJava, where there is no subscription state in the Operators, the snippet below behaves as expected.

Observable.Operator<Integer,Integer> addOne = new OperatorMap<>(s -> s + 1);
Observable.range(0, 3).lift(addOne).subscribe(s -> System.out.println(s));

For simple cases, and when primarily just using the fluent Streams API, this isn't a problem, but as soon as you attempt anything more modular/reusable it becomes one, because you have to create factories for all of your custom actions.

regards,
Dan

Daniel Feist

Oct 5, 2015, 11:06:11 AM
to reactor-framework
Oops, wrong RxJava snippet.  This is the correct one which also outputs "1,2,3".

Observable.Operator<Integer,Integer> addOne = new OperatorMap<>(s -> s + 1);
Observable.range(0, 3).flatMap(s -> Observable.just(s).lift(addOne)).subscribe(s -> System.out.println(s));

Dan