How to make multiple http requests async and combine results?


and...@entangledmedia.net

Dec 14, 2015, 2:53:53 PM
to vert.x
Hello,

I'm writing a status function for a cluster of service nodes, and I need to send an arbitrary number of GET requests to servers in the node and combine the results. Vert.x has the "vertx.createHttpClient()" method, and I understand how to use it to make a single async call, but I can't figure out a good way to make an arbitrary number of calls (between 1 and 5, generally speaking) and combine the results. I imagine there's some way using observers or promises, but I can't figure it out. Any suggestions would be greatly appreciated!

Thanks,

Andrew

Julien Viet

Dec 14, 2015, 3:07:30 PM
to ve...@googlegroups.com, and...@entangledmedia.net
Hi,

You can do it manually: store the results in a data structure and check, each time a response arrives, whether all of them are in. For instance, if you need to combine 2 requests, you could do something like:

int count = 2; // number of requests being combined
List<Status> list = new ArrayList<>();
Runnable check = () -> {
  if (list.size() == count) {
    // Got all responses -> do something with list
  }
};

client.getNow(port, host, uri, resp -> {
  Status status = computeStatus(resp);
  list.add(status);
  check.run(); // a Runnable is invoked through run()
});

// Do other requests, etc…

This is very naive pseudocode that does not show everything, of course, just the principle.
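
The counting principle above can be wrapped in a small helper. The following is a minimal sketch in plain Java, not part of Vert.x: the class name ResultCollector and its methods are hypothetical. It assumes every callback runs on the same thread, as handlers do on a single Vert.x event loop, so it does no locking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper: collects a fixed number of results and fires a
 * callback once all of them have arrived.
 * Not thread-safe; assumes all calls to add() happen on one thread.
 */
final class ResultCollector<T> {
  private final int expected;
  private final Consumer<List<T>> onAll;
  private final List<T> results = new ArrayList<>();

  ResultCollector(int expected, Consumer<List<T>> onAll) {
    this.expected = expected;
    this.onAll = onAll;
  }

  /** Call this from each response handler. */
  void add(T result) {
    results.add(result);
    if (results.size() == expected) {
      // Hand over a copy so the callback cannot alter internal state.
      onAll.accept(new ArrayList<>(results));
    }
  }
}
```

Each response handler would then call add() with its computed status. Results arrive in completion order, not request order, and a request that fails without calling add() means the callback never fires, so a real version would also need an error path.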

Otherwise you could opt for the RxJava API and achieve something similar with the Observable constructs, most likely using a merging operator: https://github.com/ReactiveX/RxJava/wiki/Combining-Observables



-- 
Julien Viet
www.julienviet.com

--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/562e771b-788a-4d05-9025-9f49f4a583a5%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Andrew Hughes

Dec 14, 2015, 3:09:37 PM
to Julien Viet, ve...@googlegroups.com
Thanks for the response. I spent a lot of time trying to figure out how to make it work with RxJava and failed. If somebody could post a more detailed response on how that would work, I’d appreciate it. 

Alexander Lehmann

Dec 14, 2015, 5:32:55 PM
to vert.x
I think it should be possible to combine a list of Futures and implement a Future that completes when all the listed Futures are complete, or, the other way around, to have a handler that is called once a list of handlers has been called.

Julien Viet

Dec 14, 2015, 5:39:04 PM
to ve...@googlegroups.com, Alexander Lehmann
Do you mean io.vertx.core.Future, Alex?

-- 
Julien Viet
www.julienviet.com

Alexander Lehmann

Dec 14, 2015, 5:45:39 PM
to vert.x, alex...@gmail.com
Yes, though I may be mixing up the use case for Future.

I assume it should be possible to have something like

Future combined = combinedFuture(List<Future> futures);
combined.setHandler(xxx);

and then have the handler called when all Futures are completed.
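
For comparison, the same idea can be sketched with java.util.concurrent.CompletableFuture from the JDK, which is a different type from io.vertx.core.Future. The class name Futures and the method name combine are made up for this illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class Futures {
  private Futures() {}

  /**
   * Returns a future that completes with all results, in input order,
   * once every input future has completed. It completes exceptionally
   * if any input future failed.
   */
  static <T> CompletableFuture<List<T>> combine(List<CompletableFuture<T>> futures) {
    CompletableFuture<Void> all =
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
    // join() does not block here: allOf has completed, so every input is done.
    return all.thenApply(v ->
        futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
  }
}
```

A handler can then be attached to the combined future with thenAccept or whenComplete, which plays the role of combined.setHandler(xxx) above.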

Julien Viet

Dec 14, 2015, 5:48:30 PM
to ve...@googlegroups.com, Alexander Lehmann
I have in mind something like this in io.vertx.core.Future:

  static <T> Future<List<T>> and(Future<T> f1, Future<T> f2) {
    Future<List<T>> ret = Future.future();
    Handler<AsyncResult<T>> handler = ar -> {
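      // Guard on ret: if both futures were already complete, the handler runs twice.
      if (!ret.isComplete() && f1.isComplete() && f2.isComplete()) {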
        if (f1.succeeded() && f2.succeeded()) {
          ret.complete(Arrays.asList(f1.result(), f2.result()));
        } else {
          if (f1.failed()) {
            ret.fail(f1.cause());
          } else {
            ret.fail(f2.cause());
          }
        }
      }
    };
    f1.setHandler(handler);
    f2.setHandler(handler);
    return ret;
  }
  
The issue is that Future<List<T>> is not supported as a return type by codegen.

So it could either be marked @GenIgnore, or we could see what it would mean to allow Future<List<T>> in the code generator.
  
-- 
Julien Viet
www.julienviet.com

Arnaud Estève

Dec 15, 2015, 3:39:56 AM
to vert.x
Alex, is this what you have in mind? https://github.com/aesteve/nubes/blob/master/src/main/java/com/github/aesteve/vertx/nubes/utils/async/MultipleFutures.java

I agree that's a common pattern, I'm using it a lot, in several Vert.x projects.

jklingsporn

Dec 15, 2015, 5:47:12 AM
to vert.x
I use CompletableFuture for this kind of task.
public void sendSomething(){
        CompletableFuture<Void> future1 = new CompletableFuture<>();
        CompletableFuture<Void> futureN = new CompletableFuture<>();
        // thenAccept comes first so that it is skipped on failure; with exceptionally
        // first, the error is recovered to null and the success callback still runs.
        CompletableFuture.allOf(future1, futureN).thenAccept(
                f->{
                    //everything completed
                }
        ).exceptionally(f->{logger.error(f.getMessage(),f);return null;});
        sendAsync(h->{
            if(h.succeeded()){
                future1.complete(h.result());
            }else{
                future1.completeExceptionally(h.cause());
            }
        });
        sendAsync(h->{
            if(h.succeeded()){
                futureN.complete(h.result());
            }else{
                futureN.completeExceptionally(h.cause());
            }
        });
    }
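
The repeated succeeded/failed branching above could be factored into one adapter. This is only a sketch: Outcome is a stand-in interface mirroring the succeeded(), result() and cause() methods of io.vertx.core.AsyncResult so that the example compiles without Vert.x, and Bridge.completing is a hypothetical name. With Vert.x on the classpath the adapter would return a Handler<AsyncResult<T>> instead of a Consumer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Stand-in for io.vertx.core.AsyncResult (assumption, for illustration only). */
interface Outcome<T> {
  boolean succeeded();
  T result();
  Throwable cause();
}

final class Bridge {
  private Bridge() {}

  /** Returns a callback that completes the given future from an Outcome. */
  static <T> Consumer<Outcome<T>> completing(CompletableFuture<T> future) {
    return outcome -> {
      if (outcome.succeeded()) {
        future.complete(outcome.result());
      } else {
        future.completeExceptionally(outcome.cause());
      }
    };
  }
}
```

Each sendAsync call would then take the adapter for its future as the callback, rather than an inline lambda with the same if/else.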

Alexi Polenur

Dec 16, 2015, 9:52:37 AM
to vert.x
Hi,

If you want to use Observables, you can use either the merge or the zip function to combine multiple "completion" observables into one. Here is an example where I am deploying multiple verticles and want to act when all deployments have finished.

List<Observable<String>> allObs = new LinkedList<Observable<String>>();

allObs.add(
    vertx.deployVerticleObservable(impl, new DeploymentOptions(opts))
        .map(r->{return logName;}));

allObs.addAll(
    deployKafkaBridgeVerticlesIfNeeded(spec));

Observable<String> all = Observable.merge(allObs);


Now you can subscribe to the "all" observable and put your code in the "on complete" function:


all.subscribe(
    vertName -> {
        System.out.println(vertName + " started ok");
    },
    err -> {
        System.out.println("Failed to start :" + err);
        err.printStackTrace();
    },
    () -> {
        System.out.println("All configured verticles are started ok.");
    }
);

HTH, Alexi
