Create new Vert.X component or use CompletableFuture?

1,889 views
Skip to first unread message

Psycho Punch

unread,
Apr 21, 2017, 8:52:41 AM4/21/17
to vert.x
Hello everyone.

I have a number of service code that I defined previously that are blocking. Things like database-interfacing services, REST client code, etc. I'm planning to use them inside my Vert.X components but I'm not sure whether I should kind of overhaul them with async counterparts, or just put practically wrap them inside an asynchronous CompletionStage/CompletableFuture construct.

For example, I have a BookRepository that persists, updates, retrieves data to/from the database, with a method, say, findBooks. This is a blocking service code, and I'm wondering if I should use an async counterpart (still not sure if there's JDBC support within the Vert.X platform) or just wrap it inside a CompletableFuture like:

    CompletionStage completion = CompletableFuture.supplyAsync(bookRepository::findBooks);

Using this in a MessageConsumer for example, I can add a callback to the resulting CompletionStage with something like:

    completion.thenAccept(() -> vertxEvent.reply(true));

Is this regular practice/occurrence in the Vert.X domain? Are there any implications to the event loop when doing this?

jklingsporn

unread,
Apr 21, 2017, 10:38:16 AM4/21/17
to vert.x

Psycho Punch

unread,
Apr 21, 2017, 12:00:48 PM4/21/17
to vert.x
So, according to the README.md of the project:

Completable Future uses a fork-join thread pool. So callbacks (dependent stages) are called in a thread of this pool, so it's not the caller thread.

Does this mean, the only issue with using CompletableFuture/CompletionStage in Vert.X context is the executor used (by default)? So, if I set my CompletableFutures to run async operations using Vert.X's thread pool, the issues should be mostly gone? Or, alternatively, if I set Vert.X to use the Fork/Join pool used by CompletaleFuture, it'd be fine (at least in terms of the issues resulting from Vert.X and CompletableFuture using different Executors). In short, Vert.X and the CompletableFutures used in its context should be sharing the same Executor. Did I get that right?

Clement Escoffier

unread,
Apr 21, 2017, 12:13:51 PM4/21/17
to ve...@googlegroups.com
Hi,

* If you use the non “async” method, and the completion of the future happen in the vert.x context, you are fine.
* If you use an async method with an executor executing in a Vert.x context, you are fine
* If you use an async method on a regular completable future, you are not fine as you are using a fork/join thread pool
* If you use an async method on the Vert.x completable future of https://github.com/cescoffier/vertx-completable-future, you are fine as you pass the context your want when creating the VertxCompletableFuture.

Clement

--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/1467a672-d7c4-4da1-99b4-27b2144fcb96%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Psycho Punch

unread,
Apr 21, 2017, 1:27:20 PM4/21/17
to vert.x
Hi Clement, Thanks!

So, I've been looking at how to get hold of the Executor being used by Vertx, but there doesn't seem to be an easy way to do that. I checked the source for VertxCompletableFuture, and I saw that it uses a custom Executor that uses the Vertx Context. Is that the only way to get it working?

Julien Viet

unread,
Apr 21, 2017, 2:46:58 PM4/21/17
to ve...@googlegroups.com
one thing to keep in mind with the VertxCompletableFuture is that it breaks the contract of CompletableFuture as CF mentions that async operations *must* be on the FJ pool.

so if you use the VCF as a CF and pass it to code that assumes it’s a CF, it might block the event loop assuming it would be on the FJ pool whereas it is not.



Psycho Punch

unread,
Apr 22, 2017, 12:06:42 AM4/22/17
to vert.x
one thing to keep in mind with the VertxCompletableFuture is that it breaks the contract of CompletableFuture as CF mentions that async operations *must* be on the FJ pool.

Not sure if this is necessarily true as there's a factory method in CompletableFuture that allows CF's to run in the context of non-default ForkJoinPool.

To be honest, I still do not understand the implications of having CompletableFuture execute through an Executor different from the one used by Vert.X itself. If I call join() on the CF running on a different pool, then that's going to make the event loop wait, but what I'm planning to do is return a CompletionStage in my API and so client code won't be able to do that unless then explicitly wrap that in another CF. If I understand correctly, once the CF forked from the main thread (event loop in the context of Vert.X) the callback is run in the context of the calling thread, which is the event loop. So if I have the following code:

CompletionStage<List<Books>> findBooks = bookService.findAllAsync();
findBooks
.thenApply(books -> {
   
//process books here as booksJson
    vertxEvent
.reply(booksJson);
});

the thenApply callback is run in the context of the Vert.X's event loop, isn't it? If that's the case, then wouldn't it be good since, regardless of where the process of finding books occur, when the control goes back to the Vert.X event loop, it's only going to spend the time converting the result to JSON and send it back to the Vert.X component that originally started the operation?

Jez P

unread,
Apr 22, 2017, 5:47:10 AM4/22/17
to vert.x
Where you might care about whether or not you're on the vert.x pool/context is as follows:-

If you're accessing verticle state which might change then if you're not executing on the same thread as the verticle you could well end up with weirdness, unless the state is synchronised (I've been dealing with similar with the pac4j async stuff I've been working on lately). If you're doing work which is not in any way correlated with stuff you're changing from a vert.x thread, then you should be ok, though if you want your thenApply (which to me looks like it should be a thenAccept) to do the right thing you might want to use runOnContext around the vertxEvent.reply (not sure if that's needed or not but I was informed it didn't cost much - sending messages can occur from any thread). I'm not sure you're guaranteed for it to run on the verticle's thread if the findAllAsync doesn't - easy way to check though is to dump out the name of the thread inside the thenApply call, just before you reply. 

One other thing, if findBooks can complete exceptionally and you care about that, you might want to use whenComplete or handle instead of either thenApply or thenAccept - these let you deal with both successful and failed completion. 

Julien Viet

unread,
Apr 22, 2017, 9:22:42 AM4/22/17
to ve...@googlegroups.com
I’m not arguing about the actual implementation, the contract of CF in the javadocs seems pretty explicit to me:

"All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task)."

--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.

Psycho Punch

unread,
Apr 22, 2017, 2:27:56 PM4/22/17
to vert.x
Hey Jez, thanks for those pointers, and yeah, I actually meant thenAccept instead of thenApply.

So in general, if I keep asynchronous operations independent of the Verticle's state, (which I think is general rule of thumb) I should be fine, even if those operations run in the context of a different Executor. Say I have a Verticle whose sole purpose is to practically wrap the findBooks function for other Verticles, and I deployed it as a worker Verticle running on its own thread, how does the async call to findBooksAsync affect the operation? Is this something like overkill, and that I could keep everything simple if I just use the blocking findBooks inside a worker Verticle?

How do these approaches compare?
1) Regular Verticle running blocking findBooks
2) Regular Verticle running async findBooks (via CompletionStage)
3) Worker Verticle running blocking findBooks
4) Worker Verticle running async findBooks (via CompletionStage)

Also, it's possible to create multiple instances of either regular or worker Verticles that do the same thing.

Jez P

unread,
Apr 23, 2017, 4:52:59 AM4/23/17
to vert.x
Personally by default I'd just wrap the blocking calls to your service code inside executeBlocking unless you want to expose a more general API. 

As far as your list of 4 goes
1. No no - don't block the event loop
2. Legit but not sure you need to do this - as I said I'd just use executeblocking
3. Comparable to using executeBlocking (from memory there's a very subtle difference) but over the event bus
4. If you've taken the blocking code off the event loop then no need to use a worker verticle.

Others may have different opinions (though I think 1 and 4 have little room for opinion, in that 1 is a bad idea and 4 adds no benefit vs 2).

Tim Fox

unread,
Apr 23, 2017, 5:29:43 AM4/23/17
to vert.x
The simplest thing to do is to use vertx.executeBlocking and then a simple adaptor class to adapt the Handler<AsyncResult> to CompletableFuture.

You don't have to worry about fork-join pools or anything like that, and the CF handlers will be executed on the right Vert.x thread:

Tim Fox

unread,
Apr 23, 2017, 5:42:45 AM4/23/17
to vert.x
Here's an illustration using your findBook example:

Psycho Punch

unread,
Apr 23, 2017, 7:01:14 AM4/23/17
to vert.x
Jez, Tim, thank you for reminding me of executeBlocking; it flew straight over my head. For some reason, my default approach to running a blocking code in the context of Vert.X has been to put it inside a Worker Verticle.

Tim, thanks for your sample code. However, there's one thing I don't quite understand: AsyncResCF. Is that supposed to be a class that implements both CompletionStage and Handler?

Tim Fox

unread,
Apr 23, 2017, 7:35:22 AM4/23/17
to vert.x


On Sunday, 23 April 2017 12:01:14 UTC+1, Psycho Punch wrote:
However, there's one thing I don't quite understand: AsyncResCF. Is that supposed to be a class that implements both CompletionStage and Handler?


Yes, I included the code for that on the first gist above. 

Psycho Punch

unread,
Apr 23, 2017, 11:51:24 AM4/23/17
to vert.x
Oh, yeah. Sorry I missed that.

Psycho Punch

unread,
Apr 24, 2017, 12:19:40 AM4/24/17
to vert.x
If my BookRepository uses the blocking MongoDB client, would it be better if I instead use Vert.X's own MongoDB client API, or is the difference between that and wrapping the blocking call in executeBlocking almost negligible?

Tim Fox

unread,
Apr 24, 2017, 4:05:49 AM4/24/17
to vert.x
I think that would depend on the implementation of the MongoDB client driver.

Jez P

unread,
Apr 24, 2017, 4:54:21 AM4/24/17
to vert.x
Personally I'd suggest first port of call is what you can deliver quickest (which is presumably executeBlocking since you can reuse your existing code).

Then I'd branch and create a version using the vert.x mongo db client api, and run performance tests both under load and under limited load to see how things compare.

Psycho Punch

unread,
Apr 24, 2017, 11:31:33 AM4/24/17
to vert.x
Tim, the repositories I have currently are implemented using Spring Data library. As far as I know, it's using MongoDB's Java client under the hood.

Jez, yup, that's actually what I'm thinking about doing: the path of least resistance. I was just wondering if there are immediate concerns I need to know of.

Jez P

unread,
Apr 24, 2017, 11:35:35 AM4/24/17
to vert.x
I doubt there are major immediate concerns. Do you know the likely throughput? I think you have a limited worker pool on executeBlocking (but you can set it via config). Also you can use ordered false in your executeBlocking call if you're not messing with shared state and order of execution doesn't matter (often it doesn't). 

Tim Fox

unread,
Apr 24, 2017, 2:42:20 PM4/24/17
to vert.x
I've collapsed it into a single method in this gist: https://gist.github.com/purplefox/7f464be396dfb04275ee075468eba823

So basically TLDR; There's no need to create a new type of CompletableFuture class to solve this kind of use case, a standard CompletableFuture does the job fine.


On Sunday, 23 April 2017 12:01:14 UTC+1, Psycho Punch wrote:
Reply all
Reply to author
Forward
0 new messages