Chaining Many Async Operation

798 views
Skip to first unread message

Klaus Schaefers

unread,
Jul 7, 2016, 9:00:05 AM7/7/16
to vert.x
Hi,

I have to perform many async tasks in one of my vertx web handler and I only want to return when all tasks are done. E.g. I have to copy some files, and save some objects into different mongo tables. The naive way of calling the next task one the previous one has finished leads to a callback hell and is also a kind of serial. Moreover the numnber of tasks is dynamic, which made my use lists, which resulted in horrible code:


pirvate void saveToMongo(List<JsonObject> list, Handler<Long> handler)
if(list.size() > 0){
JsonObject o = list.pop();
mongo.save(..., o, mongoHandler->{
 
saveToMongo(list, handler)
});
)
} else {
handler.handler(0); 
}
}

Any Ideas how to improve this?

Best,

Klaus 

Patrick Braga-Henebry

unread,
Jul 8, 2016, 10:43:14 AM7/8/16
to vert.x
From my understanding, the recommendation is to use something like RxJava to handle composition of multiple async events, though I have not used it myself.

Guillaume Chauvet

unread,
Jul 9, 2016, 3:37:32 AM7/9/16
to vert.x
Take a look on my vertx-async library at https://github.com/gchauvet/vertx-async (first released will be published at end of July)
Your code can be written like this (not tested) :

private void saveToMongo(List<JsonObject> list, Handler<AsyncResult<Void>> handler) {
        AsyncCollections.each(list, (item, callback) -> {
            // Do something
            mongo.save(..., item, mongoHandler -> {
                if(mongoHandler.succeed()) {
                    callback.handle(DefaultAsyncResult.succeed());
                } else {
                    callback.handle(DefaultAsyncResult.fail(mongoHandler));
                }
            });
        }, e -> {
            handler.handle(e);
        });
    }

Tiago Reis

unread,
Jul 22, 2016, 10:58:37 AM7/22/16
to vert.x
One way of implementing it with RxJava


private Observable<Integer> saveToMongo(List<JsonObject> list) {
        return Observable.from(list)
                .map(entry -> mongo.saveObservable("...", entry))
                .count();
}

This returns an observable with the number of items that you saved, you would then subscribe to that observable and continue with what you were doing. 

This is doing concurrent saves on the database, which I'm not sure if it's what you are trying to achieve or not. If you want to do one at each time I guess you would need to use a BlockingObservable

Bogdan Mart

unread,
Jul 22, 2016, 12:50:54 PM7/22/16
to vert.x
Hello, I've used  java.util.concurrent.CompletableFuture for this
see https://github.com/vert-x3/vertx-rx/issues/39#issuecomment-234590544

example:

     CompletableFuture<Void> chain = getAbs(httpClient, URL_WELL_KNOWN_OPENIDCONFIGURATION)
                .thenCompose(resp -> wrapToFuture(resp::bodyHandler, resp::exceptionHandler))
                .thenApply(Buffer::toString)
                .thenCompose(deserializeHelper(OpenIdMetadata.class))
                .thenCompose(meta -> getAbs(httpClient, meta.getJwksUri()))
                .thenCompose(resp -> wrapToFuture(resp::bodyHandler, resp::exceptionHandler))
                .thenApply(Buffer::toString)
                .thenCompose(deserializeHelper(SigningKeys.class))
                .thenCompose((keys) ->{ .... })


четверг, 7 июля 2016 г., 16:00:05 UTC+3 пользователь Klaus Schaefers написал:

Sivid Wong

unread,
Jul 28, 2016, 10:56:37 PM7/28/16
to vert.x
I've noticed that CompletableFuture sometimes assigns my HttpClient instance to run on another thread, which leads to a Vertx warning.  It's "fixable" by wrapping a HttpClient call with a vertx.runOnContext(), but I *guess* the more idiomatic way would be to avoid using CompletableFuture, as it's a different thread handling implementation to the one in Vertx.

CompletableFuture, according to Java documents, uses JVM's default fork/join pool.  Although one interesting thing I've observed is that HttpClient calls chained with CompletableFuture tends to run on Vertx's own threads as well (with thread names like vert.x-eventloop-thread-X).  This is confusing, to me at least, maybe someone in the know could chime in?

To stay on topic, the default io.vertx.core.Future already has support for a .all operation, see http://vertx.io/docs/vertx-core/java/#_async_coordination.
If that's not enough for your use case, I've been using this implementation https://github.com/jtruelove/vertx-util with no problems.

Bogdan Mart於 2016年7月23日星期六 UTC+8上午12時50分54秒寫道:

Bogdan Mart

unread,
Jul 31, 2016, 7:15:09 AM7/31/16
to vert.x
If CF has bug with thread, that's sad. AFAIK it calls callbacks in same thread in which complete was called.

I've looked at  io.vertx.core.Future and it's pretty useles:
future.setHandler(ar -> { if (ar.succeeded()) { // All servers started } else { // At least one server failed } })
As you need ti check result status every time, but I'd like tochain all methods, and put single catch at the end.
vertx-util on other hand seems wery promissing! It's reall promise API, will look through it, hope it's better than jdeferred. Thanks for clue!

d
пятница, 29 июля 2016 г., 5:56:37 UTC+3 пользователь Sivid Wong написал:

Bogdan Mart

unread,
Jul 31, 2016, 8:13:52 AM7/31/16
to vert.x

Things to Remember

  • you must call eval() after creating your chain
That's sad. Same thing as in ReqctJava where execution starts only after someone subscribed. But I guess that's acceptable. Still gonna give it a try.

пятница, 29 июля 2016 г., 5:56:37 UTC+3 пользователь Sivid Wong написал:
I've noticed that CompletableFuture sometimes assigns my HttpClient instance to run on another thread, which leads to a Vertx warning.  It's "fixable" by wrapping a HttpClient call with a vertx.runOnContext(), but I *guess* the more idiomatic way would be to avoid using CompletableFuture, as it's a different thread handling implementation to the one in Vertx.
Reply all
Reply to author
Forward
0 new messages