Using reactor to compose a set of related Future with fallback

47 views
Skip to first unread message

David Dawson

unread,
May 17, 2015, 10:48:32 AM5/17/15
to reactor-...@googlegroups.com
Allo.

I have a series of related Futures  that I'd like to process as a unit, with the possibility that some may need to be timed out.

So, the overall flow is :-
  • Generate Futures
  • Wait up to X for them to return. If they all return within X proceed with processing immediately
  • If reach X, then proceed with processing anyway (ideally with a default in place of each of the Future returns
Each of the Future represents a call to a different microservice. The intent is to scatter/ gather to obtain the aggregated results for a complete page render, and handle timeouts as a graceful degredation.

This will enable a very deterministic page render time when building with lots of collaborators.

How do you think would be the best way to employ Reactor to make this?

David.

--
David Dawson 
CEO
Simplicity Itself Limited
Tel +44 7866 011 256
Skype: davidadawson
david....@simplicityitself.com
http://www.simplicityitself.com

David Dawson

unread,
May 17, 2015, 4:14:15 PM5/17/15
to reactor-...@googlegroups.com
Ok, a little follow up, following my playing this afternoon :-

Take 1
Future<X> val = ...
Future<X> val2 = ...

Stream<X> s1 = Streams.defer(val)
Stream<X> s2 = Streams.defer(val2)

Streams.merge(s1,s2).consume { X ->
    //each item in turn.
}

This is useful, it lets us use reactor to manage the async behaviour, and we can release the rendering process as needed, but the aggregation process isn't managed by reactor, and so needs to be async managed as a separate process.

Take 2

Future<X> val = ...
Future<X> val2 = ...

Stream<X> s1 = Streams.defer(val)
Stream<X> s2 = Streams.defer(val2)

Streams.join(s1,s2).consume { List<X> ->
  //all the items at once..
}

This manages the aggregation/ orchestration aspect, but doesn't have an eye on the failure of the futures to return in a reasonable timeframe, if they ever do (which can't be guaranteed).

Ideally, I want the streams s1 and s2 need to use the future.get(timeout,unit).

Take 3

Reading the docs again, this is the closest I can bring it in terms of intent. (full code listing for clarity)

FutureTask<Map> val1 = new FutureTask<Map>(new Callable<Map>() {
            @Override
            public Map call() throws Exception {
                Thread.sleep(500);
                return new HashMap();
            }
        });

        FutureTask<Map> val2 = new FutureTask<Map>(new Callable<Map>() {
            @Override
            public Map call() throws Exception {
                Thread.sleep(1500);
                return new HashMap();
            }
        });

        Executor exec = Executors.newFixedThreadPool(10);
        exec.execute(val1);
        exec.execute(val2);


        Stream<List<Map>> aggregate =
                Streams.join(
                        Streams.defer(val1, 1, TimeUnit.SECONDS),
                        Streams.defer(val2, 1, TimeUnit.SECONDS)).log("aggregate");

        aggregate.consume(new Consumer<List<Map>>() {
            @Override
            public void accept(List<Map> maps) {
                System.out.println("Got " + maps.size());
            }
        });

Conceptually, we have streaming except it doesn't work. 

The timeout caused by the future val2 causes an error to be sent down the stream that kills the whole thing.

Any other ways I might try this?

David.

Stephane Maldini

unread,
May 17, 2015, 7:11:21 PM5/17/15
to David Dawson, reactor-framework
Will look at it tomorrow, but it looks like you're using a non official release, Streams.defer should be Streams.just (http://projectreactor.io/docs/reference/#_creating_streams_and_promises). Try 2.0.1.RELEASE or 2.1.0.BUILD-SNAPSHOT if you like fixes or websocket support :)

--
You received this message because you are subscribed to the Google Groups "reactor-framework" group.
To unsubscribe from this group and stop receiving emails from it, send an email to reactor-framew...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Stephane Maldini | Solutions Architect, CSO EMEA | London | Pivotal

Stephane Maldini

unread,
May 17, 2015, 7:12:32 PM5/17/15
to David Dawson, reactor-framework
Also I've started this http://projectreactor.io/docs/reference/#streams-microservice-start , you might want to look at it.

david....@simplicityitself.com

unread,
May 18, 2015, 2:22:56 AM5/18/15
to Stephane Maldini, reactor-framework
'composing multiple service calls'

I await with baited breath!

--
David Dawson
CEO 

Simplicity Itself Ltd
Tel +447866011256
From: Stephane Maldini
Sent: Monday, 18 May 2015 00:12
To: David Dawson
Cc: reactor-framework
Subject: Re: Using reactor to compose a set of related Future with fallback

Stephane Maldini

unread,
May 18, 2015, 6:02:17 AM5/18/15
to David Dawson, reactor-framework
So in Reactor (from 2.0.1 RELEASE) we have Streams.from(Future).
I think what you need is to deal with the possible timeout, this is probably more a case of error handling. Should you retry up to N times ?


If you want to ignore the error and just complete the future -> Streams.from(Future).ignoreErrors()....

Stephane Maldini

unread,
May 18, 2015, 6:03:50 AM5/18/15
to David Dawson, reactor-framework
Note that ignoreErrors takes an optional predicate argument : Streams.from(Future).ignoreErrors(Selectors.T(TimeOutException)) 
Or ignoreErrors(throwable -> throwable instanceof TimeoutException)

Stephane Maldini

unread,
May 18, 2015, 6:05:04 AM5/18/15
to David Dawson, reactor-framework
Sorry its ignoreError without 's' :)

Jon Brisbin

unread,
May 18, 2015, 8:28:42 AM5/18/15
to Stephane Maldini, David Dawson, Reactor Framework
FWIW being able to await the result of blocking calls is going to be an important part of the next Reactive generation of Spring. Waiting on service and other types of legacy calls will have to be supported so Arjen is looking into some of this, I believe.

That said, I think we might also explore something like the BarrierStream but with Futures. e.g. create a Stream from an arbitrary set of Futures that, when all are complete, send a List<Object> downstream of the results. We could have a variant that was “first Future complete” or all complete. We should also combine this with a thread pool of some sort so we can have a Thread per Future blocking and waiting and be able to optimize the calls.

The problem with FutureTask is that it gets really, really expensive if you create too many of them, so we don’t want to be submitting new jobs to a traditional Executor to block on the Future. We need a RingBuffer like capability to set Futures into which the worker thread can block on efficiently. Maybe create a RingBufferWorkProcessor<Future<?>>?

Thanks!

Jon Brisbin
Reactor Project Lead

about.me/jonbrisbin | @j_brisbin | @ProjectReactor

David Dawson

unread,
May 18, 2015, 8:38:20 AM5/18/15
to Jon Brisbin, Stephane Maldini, Reactor Framework
Thanks guys!

Still on M1, upgrading now :-)

@stephane. The issue really is that I want a deterministic return time. So if the data required isn't ready, then the return happens with some pre-prepared default version. The future timeout is representing that aspect. The problem with a timeout error is that it kills the stream off, and using a 'join' it means that the stream doesn't return anything at all, so it becomes all or nothing. When semantically I actually want, everything that finishes within the timeout.  I might be going about this the wrong way, but if this could be made to happen then a common task in microservice development can be solved entirely in Reactor, multi-service orchestration.

@Jon, the Future instances we're really using aren't FutureTask based, that's just demo code to show what I'm talking about. Instead they are a view onto an event oriented communication over some net connection. So no thread on each of them. Does that affect your idea at all?

Best,

David


Stephane Maldini

unread,
May 18, 2015, 8:46:42 AM5/18/15
to David Dawson, Jon Brisbin, Reactor Framework
If you're upgrading, there were significants changes, but you need to pull reactor-stream (its broken down between core and stream) and you need to sometimes revisit some calls but all features are around and more :)

We'll cut down a 2.0.2 that fixes a bunch of issues reported and detected in the pom as well v soon.

Stephane Maldini

unread,
May 18, 2015, 8:51:26 AM5/18/15
to David Dawson, Jon Brisbin, Reactor Framework
About your timeout issue: timeout(time, unit, Publisher fallback) is your friend it gives you something to do whant that future sucked.

Jon Brisbin

unread,
May 18, 2015, 8:52:51 AM5/18/15
to David Dawson, Stephane Maldini, Reactor Framework
On May 18, 2015, at 7:38 AM, David Dawson <david....@simplicityitself.com> wrote:

Thanks guys!

Still on M1, upgrading now :-)

@stephane. The issue really is that I want a deterministic return time. So if the data required isn't ready, then the return happens with some pre-prepared default version. The future timeout is representing that aspect. The problem with a timeout error is that it kills the stream off, and using a 'join' it means that the stream doesn't return anything at all, so it becomes all or nothing. When semantically I actually want, everything that finishes within the timeout.  I might be going about this the wrong way, but if this could be made to happen then a common task in microservice development can be solved entirely in Reactor, multi-service orchestration.


I think this should happen entirely in Reactor because as I said earlier, it is going to be a really common use case going forward as more and more applications become service call-oriented.



@Jon, the Future instances we're really using aren't FutureTask based, that's just demo code to show what I'm talking about. Instead they are a view onto an event oriented communication over some net connection. So no thread on each of them. Does that affect your idea at all?


There isn’t a Thread attached yet, but traditionally we deal with Futures in an async way by submitting a task off the calling Thread to block on the Future.get() to obtain the result ASAP. That requires 1) creating a FutureTask by submitting a job to a thread pool and 2) burning a Thread.

We shouldn’t be using a shared thread pool, probably, but a specific worker pool created for the express purpose of blocking on Futures. That’s why I’m thinking maybe a RingBufferWorkProcessor with an appropriate number of Consumers that block on Futures could be helpful.

The alternative is to spin over a List<Future<?>> and continually check their “doneness” rather than block the Thread entirely. That won’t give us the result ASAP but As Soon As I Can Get To It.

@David: Are those Futures the return values of task submissions that you control? Or are you handed back a Future to represent an operation that is being completed somewhere else? I ask because we could use a BarrierStream for the former but if we only have access to a Future and can’t control how the task is submitted, then we’d likely be better off using the Processor style.

David Dawson

unread,
May 18, 2015, 9:09:29 AM5/18/15
to Jon Brisbin, Stephane Maldini, Reactor Framework
Yes, I'd love for this to be able to be handled entirely in reactor.

The Future is the interface I'm using right now to front an event interchange happening async and out of process. We do have enough info that we could push the response as it comes back in. So really conceptually it's a promise. 

Unfortunately, at the level the Future is being generated, I can't introduce any dependency on Reactor to use the Promise you guys provide, so we chose a Future.

We do have control over that, so we could use a reactive streams Publisher or some such instead of the Future. That does confuse the API a little with the concept of many responses, another reason that we went Future. 

Thinking about it, a conversion to Publisher would make this all really slick?  We might just take the hit on that in that case.

The timeout() looks ideal, just converting and I'll give it a try.

BTW, what happened to HotStream?  We were using Streams.defer() as an easy way to create a Subscriber.

is there anything you might suggest to replace that?

David

Jon Brisbin

unread,
May 18, 2015, 10:25:46 AM5/18/15
to David Dawson, Stephane Maldini, Reactor Framework
Look at Broadcaster to send values.

The RingBuffer*Processor also allows publishing values via onNext (we even support calling concurrently, which is prohibited by the spec but we can deal with that internally).

A Publisher version would really be the best all around IMO. Would love to help figure out how to do this once so we can apply this pattern going forward. The hardest part, really, is figuring out whether to spin over a set of Futures to check their done state or to burn a single Thread per Future while they’re waiting. If solving this problem generically, we’d need to support waitAll and waitForFirst, etc… so we can’t really let previous Futures block checking on later ones, which argues in favor of the spinning “checker” rather than a blocking .get().

Note that having a Publisher doesn’t mean you get more than 1 value inherently. A Publisher<T> implies *at least one* value whereas a Publisher<Void> explicitly means *no values at all*, only onComplete | onError.


Thanks!

Jon Brisbin
Reactor Project Lead

about.me/jonbrisbin | @j_brisbin | @ProjectReactor

David Dawson

unread,
May 18, 2015, 11:27:03 AM5/18/15
to Jon Brisbin, Stephane Maldini, Reactor Framework
Indeed, it's the semantics of 'up to one value' that I wanted the API to tell it's users. Which is a Promise, but that's not available in the reactive-streams interfaces explicitly.

It's probably more of an API design thing that anything else. Although a Future is a far easier things to build than a Publisher!

David.



Jon Brisbin

unread,
May 18, 2015, 11:33:55 AM5/18/15
to David Dawson, Stephane Maldini, Reactor Framework
On May 18, 2015, at 10:27 AM, David Dawson <david....@simplicityitself.com> wrote:

It's probably more of an API design thing that anything else. Although a Future is a far easier things to build than a Publisher!


Very true! :)

But the payoff is flexibility, plugability, and that the Future is part of the Past. ;)

Stephane Maldini

unread,
May 18, 2015, 11:52:31 AM5/18/15
to Jon Brisbin, David Dawson, Reactor Framework
David,

I've pushed an example of something for 2.0.2, look at the second one here:

There are also alternatives to Future in Streams :
Streams.generate()/Promises.task() , they both build a valid Publisher that might emit up to one data without the complications.

David Dawson

unread,
May 18, 2015, 2:01:33 PM5/18/15
to Jon Brisbin, Stephane Maldini, Reactor Framework
I think in celebration of jon giving possibly the best JDK related pun, we should build a Publisher implementation and avoid this whole problem.

I'm still a little wary of the semantics of a Publisher on this kind of thing. I can feel it being a little confusing to people. has there been any discussion around reactive streams to give the semantics of a Promise at all?  It might just be a marker interface really, but from an API design point of view the difference between 0-1 and 0-many is big enough to warrant it.

David.

Jon Brisbin

unread,
May 19, 2015, 4:36:39 PM5/19/15
to David Dawson, Stephane Maldini, Reactor Framework
Just as a follow-up: there’s now a GitHub issue where we can discuss a Reactive Streams Promise.

Of particular note is the proposal that we could create a Promise-specific specification a la Promises/A+ (but far, far simpler) that would simply spec the expectations of the Promise but still use the RS semantics like Publisher.



Thanks!

Jon Brisbin
Reactor Project Lead

about.me/jonbrisbin | @j_brisbin | @ProjectReactor

Reply all
Reply to author
Forward
0 new messages