Make multiple async calls and combine results

709 views
Skip to first unread message

Akash Jain

unread,
Jun 12, 2018, 3:11:21 PM6/12/18
to vert.x
Hi,

I want to write a service which will accept a JSON payload, perform some transformation ( light weight ) to convert them into buckets of objects, and for each of those bucket, make external JSON call to a remote service, and combine each result into a single list.

1. Receive JSON payload
2. Apply Transformation to fragment into buckets
3.  Call external service via JSON for each bucket ( parallel )
4. Combine each response JSON into a single result
5. Return result to original caller [1]

Any failure of single [3] call should not discard the entire operation and it should try to combine other success call results.

I am confused as to what efficient framework should I use here ;
1. Vert.x ( may be worker Verticle instead of standard Verticle)
2. RxJava / RxNetty
3. Vert.X + RxJava
... any other suggestions?

Julien Viet

unread,
Jun 13, 2018, 3:18:28 AM6/13/18
to ve...@googlegroups.com
in term of performance, all options will be comparable

so you should look at what fits best your needs.

personally I would go either for 1. with Vert.x CompositeFuture or with 3. with Rx Singles

Julien

--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/85831de8-34eb-4f78-88da-3a7c748993ae%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Ronald van Raaphorst

unread,
Jun 13, 2018, 7:07:01 AM6/13/18
to vert.x
We've used a lot of possible options apart from Rx:

1. Use nested eventbus.send calls (ugly, and serial calls)
2. Use a list of futures and use the CompositeFuture to wait for all calls to complete (less ugly and you need to combine the results)
3. Use Quasar (Sync verticles) (Creates more readable code but may cause serial execution)
4. Use Kotlin coroutines (Really nice readable code but may cause serial execution)

It depends on what you need on performance (I've heard that Rx may result in up to 10% less performance overhead but don't know if that is true)
If performance is not an issue, readability, testability and maintainability are the key factors that I'd consider.
I suppose your step 3 is making the asynchronous call, so your code to call all buckets could look like:

/**
 * Sends out a 'Bucket-call' for every bucket in the given list.
 * Calls the handler if all calls are completed.
 * The handler will fail if one of the bucket calls fails.
 * If all bucket calls have succeeded, calls the handler with the merged result
 *
 * @param buckets
 * @param handler
 */
private void handleBuckets(List<JsonObject> buckets, Handler<AsyncResult<JsonObject>> handler) {
   
ArrayList futureList = new ArrayList();
   JsonObject result = new JsonObject();
   for (JsonObject bucket : buckets) {
     
Future<JsonObject> future = Future.future();
      futureList.add(future);
      vertx.eventBus().send("Bucket-call", bucket, h -> {
         
if(h.failed()) {
           
future.fail(h.cause());
         } else {
           
JsonObject bucketResult = (JsonObject)h.result().body();
            result.mergeIn(bucketResult)
            future.complete(bucketResult);
         }
     
});
   }
   
CompositeFuture.all(futureList).setHandler(h -> {
     
if(h.failed()) {
         
handler.handle(Future.failedFuture(h.cause()));
      } else {
         
handler.handle(Future.succeededFuture(result));
      }
   
});
}

You could even pass the 'bucket-call' address as a method parameter and that would make this method just a utility method to be reused in your application.

Ronald

Ronald van Raaphorst

unread,
Jun 13, 2018, 7:17:31 AM6/13/18
to vert.x
Check. This returns a failure if one of the bucket-calls fails. But you get the idea. For me, this is readable and maintainable enough so I'd not bother to throw in other libraries except if you're more comfortable with those.

Julien Viet

unread,
Jun 13, 2018, 10:43:11 AM6/13/18
to ve...@googlegroups.com
where have you read about Rx performance cost ?

I'm interested to read about it and its effect on latencies.

-- 
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.

RA van Raaphorst

unread,
Jun 13, 2018, 11:32:43 AM6/13/18
to ve...@googlegroups.com
Hi Julien,

In all honesty, I don’t know any more. I think it’s one of the many youtube talks I’ve seen but I can’t remember exactly.
I might even have mixed up libraries, so I regret I have mentioned it or at least that I can't point to a reliable source.

However, I think there’s some truth in:
"RxJava itself does not increase performance of an application. In fact, it is overhead as it adds object allocation and method invocations compared with a procedural, imperative approach. “ https://groups.google.com/d/msg/rxjava/IS5QxvamIkY/LNXV7LHCBgAJ

and when you don’t need absolute maximum performance, readability, testability and maintainability should carry more weight when
deciding what to use.



You received this message because you are subscribed to a topic in the Google Groups "vert.x" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/vertx/DEKJuViujcQ/unsubscribe.
To unsubscribe from this group and all its topics, send an email to vertx+un...@googlegroups.com.

Julien Viet

unread,
Jun 13, 2018, 11:47:30 AM6/13/18
to ve...@googlegroups.com
Hi,

I was not trying to challenge you

I do think it has some overhead (more allocation, more volatile memory barriers, more CPU cycles, more misprediction, less JIT friendly).

so I was interested in people that did measurements to see how this works out in real life :-)

Julien 


Akash Jain

unread,
Jun 13, 2018, 4:07:02 PM6/13/18
to ve...@googlegroups.com
Ronald,

Thanks so much for your reply. This is very good for me to start with.

I've one follow up question, how can I invoke `handleBuckets`

My earlier code is pretty much like this :

WebClientOptions optionsWeb = new WebClientOptions().setLogActivity(true);
        optionsWeb.setConnectTimeout(10);
        optionsWeb.setIdleTimeout(10);

        WebClient client = WebClient.create(vertx, optionsWeb);

        client.postAbs("<URL>")
            .as(BodyCodec.json(MyResponse.class))
            .addQueryParam("test", "true")
            .putHeader("Content-Type", "application/json")
            .putHeader("Accept", "application/json")
            .timeout(10000)
            .sendJsonObject(new JsonObject(Json.encode(myRequest)), MainVerticle::handle);


    private static void handle(AsyncResult<HttpResponse<MyResponse>> ar) {
        if (ar.succeeded()) {
            // Obtain response

            MyResponse response = ar.result().body();

            System.out.println(response);

        } else {
            System.out.println("Something went wrong " + ar.cause().getMessage());
        }
    }



--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+unsubscribe@googlegroups.com.

Akash Jain

unread,
Jun 14, 2018, 9:58:17 PM6/14/18
to ve...@googlegroups.com
any update? I am really stuck moving to Vert.x from Spring MVC.

I want to call multiple network calls parallely...

Julien Viet

unread,
Jun 15, 2018, 3:27:41 AM6/15/18
to ve...@googlegroups.com
Hi,

you can for each bucket create a Future, make a WebClient request that completes the Future and then take this list of Future to create a CompositeFuture which provides the combinator to achieve your result.

HTTP requests will be achieved concurrently and the CompositeFuture will give you the combined results you are looking for.

HTH

Julien

To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.

Akash Jain

unread,
Jun 15, 2018, 3:31:15 PM6/15/18
to ve...@googlegroups.com
Julien,

Thanks a lot for the pointers.

Upon trying, i am getting UnknownHostException in webclient, but i do not think its a host problem as I can resolve it fine via terminal. https://github.com/vert-x3/issues/issues/383

Future{cause=failed to resolve 'myhost.com'. Exceeded max queries per resolve 4 }

I'm using vertx-web
3.5.0
 

Here is my code :

private void callMe(RoutingContext routingContext) {
        final IncomingObject incomingObject = Json.decodeValue(routingContext.getBodyAsString(),
            IncomingObject.class);

        OutGoingRequest outGoingRequest = new OutGoingRequest();


        WebClientOptions optionsWeb = new WebClientOptions().setLogActivity(true);
        optionsWeb.setConnectTimeout(10);
        optionsWeb.setIdleTimeout(10);
        optionsWeb.setSsl(true);
        optionsWeb.setTrustAll(true);

        webClient = WebClient.create(vertx, optionsWeb);


        List<OutGoingRequest> outGoingRequests = new ArrayList<>();

        List<Future> futures = new ArrayList<>();
        for(Item item : incomingObject.getItems()) {
          
            Future<JsonObject> future = Future.future(fut -> {
                if(fut.failed()) {
                    System.out.println("failed");
                } else {
                    System.out.println("succeeded");

                }
            });

            futures.add(future);
           
            webClient.postAbs("myhost.com")
                .addQueryParam("myName", "true")

                .putHeader("Content-Type", "application/json")
                .putHeader("Accept", "application/json")
                .timeout(10000)
                .sendJsonObject(new JsonObject(Json.encode(item)), handler1 -> {

                    int code = handler1.result().statusCode();
                    if (code == 200) {
                        future.complete(handler1.result().bodyAsJsonObject());
                    }
                });


        }


        CompositeFuture.all(futures).setHandler(h -> {
            if(h.failed()) {
                System.out.println("all failed");
            } else {
                System.out.println("all succeeded");
            }
        });


    }

--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+unsubscribe@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.

Julien Viet

unread,
Jun 15, 2018, 3:51:30 PM6/15/18
to ve...@googlegroups.com
Hi,


it needs to be an URL with a protocol scheme, i.e http://myhost.com

Julien

To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.

Akash Jain

unread,
Jun 15, 2018, 3:57:11 PM6/15/18
to ve...@googlegroups.com

Julien

To unsubscribe from this group and stop receiving emails from it, send an email to vertx+unsubscribe@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+unsubscribe@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.

Akash Jain

unread,
Jun 15, 2018, 4:13:58 PM6/15/18
to ve...@googlegroups.com
Just to clarify :

In webclient, it has protocol . e.g https://myhost.com
just did not make to my code comment earlier.

Akash Jain

unread,
Jun 15, 2018, 7:18:16 PM6/15/18
to ve...@googlegroups.com
Why shall I use http when the remote server is https?

Grey Seal

unread,
Jun 15, 2018, 11:02:35 PM6/15/18
to vert.x
Try adding in VM options: -Dvertx.disableDnsResolver=true 

However, I have not added the above options in the code below and it working fine. I tried with the sample hosted REST-API (https) and it worked for me.

public void execute(RoutingContext event) {
List<Future> futures = new ArrayList<Future>();
trigger(HttpMethod.GET, "https://reqres.in/api/users?page=2", null, null).setHandler(handler -> {
if (!handler.succeeded()) {
handler.cause().printStackTrace();
event.response().putHeader("content-type", "application/json").end(new JsonObject().put("Execute failed: ", handler.cause().getMessage()).toString());
} else {
final JsonObject response = handler.result();
final JsonArray users = response.getJsonArray("data");
users.forEach(r -> {
final JsonObject user = ((JsonObject) r);
MultiMap headers = MultiMap.caseInsensitiveMultiMap();
headers.add("Content-Type", "application/json");
futures.add(trigger(HttpMethod.GET, "https://reqres.in/api/unknown/" + user.getLong("id"), null, headers));
});

CompositeFuture.all(futures).setHandler(ar -> {
if (ar.succeeded()) {
System.out.println("Success");
event.response().putHeader("content-type", "application/json").end(new JsonObject().put("Execute complete success", "Ok").toString());
} else {
ar.cause().printStackTrace();
event.response().putHeader("content-type", "application/json").end(new JsonObject().put("Execute complete failed: ", ar.cause().getMessage()).toString());
}
});
}
});
}

public Future<JsonObject> trigger(final HttpMethod method, final String url, final JsonObject payload, final MultiMap headers) {

Future<JsonObject> future = Future.future();
    HttpRequest<Buffer> request = webClient.requestAbs(method, url);
Buffer buffer = Buffer.buffer();
if (null != payload) {
buffer = Buffer.buffer(payload.toString());
request.headers().add(HttpHeaders.CONTENT_LENGTH.toString(), buffer.length() + "");
}
if (null != headers)
request.headers().addAll(headers);

request.send(ar -> {
if (ar.succeeded()) {
HttpResponse<Buffer> response = ar.result();
System.out.println("Received response with status code: " + response.statusCode());
future.complete(response.bodyAsJsonObject());
} else {
ar.cause().printStackTrace();
future.fail(ar.cause());
}
});
return future;
}
Julien

To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.

-- 
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.

Manish Kumar

unread,
Jun 17, 2018, 12:10:32 AM6/17/18
to vert.x
Akash,

Sorry if I misunderstood your problem, but below is something you can try.


// Step 1
// Step 2

// Step 3
WebClientOptions options = new WebClientOptions()
.setDefaultHost("http://myhost.com")
.setDefaultPort(8080);
WebClient webClient = WebClient.create(vertx, options);

return Single.create(caller -> {
List<Observable<Object>> remoteCalls = new ArrayList<>();
for (Json json : ...) {
Observable remoteCall = webClient.request(HttpMethod.POST, "/my/api/path/to/process/json")
.timeout(5000)
.rxSendBuffer(Buffer.buffer(json.string()))
.toObservable();
remoteCalls.add(remoteCall);
}
List<Object> remoteResponse = new ArrayList<>();
Observable.concat(remoteCalls)
.subscribe(jsonResponse -> {
// Step 4
remoteResponse.add(jsonResponse);
}, ex -> {
caller.onError(ex);
}, () -> {
// Step 5
caller.onSuccess(remoteResponse);
});
});

Julien Viet

unread,
Jun 17, 2018, 3:34:02 AM6/17/18
to ve...@googlegroups.com
you should use https

To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.

Akash Jain

unread,
Jun 19, 2018, 1:01:09 PM6/19/18
to ve...@googlegroups.com
Manish you outlined it pretty well! Thanks.

However, when I am trying to get 'remoteResponse' it is empty list, but remoteResponse.add(jsonResponse) is executed.

All I did is after your method ends, i am doing single.subscribe()
return remoteResponse

Any ideas?

--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+unsubscribe@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
Reply all
Reply to author
Forward
0 new messages