New User Question on Calling non-blocking Java APIs from Vert.x?

292 views
Skip to first unread message

Clayton Wohl

unread,
Jan 9, 2018, 1:13:37 AM1/9/18
to vert.x
What is the proper way to write a simple Vert.x http service that calls another non-blocking API that returns a Java Future?

Here is my first attempt that is clearly improper.

I'm using the RxJava2 variant `io.vertx.reactivex.core.Vertx` if that matters:

public static void main(String[] args) throws Exception {
    logger
.info("starting");

   
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

   
System.setProperty("vertx.logger-delegate-factory-class-name", "io.vertx.core.logging.SLF4JLogDelegateFactory");
   
Vertx vertx = Vertx.vertx();

   
HttpServer server = vertx.createHttpServer();

    server
.requestHandler(request -> {
        logger
.info("requestHandler start");

       
// Here I am simulating an operation waits on something and returns a java Future.
       
// I'm setting an artificial time to complete of five seconds.
       
ScheduledFuture<String> future = scheduler.schedule(() -> {
            logger
.info("Scheduled Event");
           
return "some return value";
       
}, 5, TimeUnit.SECONDS);

       
Single<String> single = Single.fromFuture(future);

        logger
.info("launched single");

        single
.subscribe((value) -> {
            logger
.info("single subscribe");

           
HttpServerResponse response = request.response();
            response
.putHeader("content-type", "text/plain");
            response
.end("Hello World!");

            logger
.info("sent response");
       
});

        logger
.info("exiting requestHandler");
   
});

    server
.listen(8080);
    logger
.info("server listening on 8080");
}


The above code produces the following log output. The number on the beginning of each line is milliseconds since application startup. The `single.subscribe` call clearly acts as a blocking call which ruins the whole point of non-blocking APIs. How do I fix the above?

1098    [main] INFO  verthello.Main - server listening on 8080
4224    [vert.x-eventloop-thread-1] INFO  verthello.Main - requestHandler start
4274    [vert.x-eventloop-thread-1] INFO  verthello.Main - launched single
6390    [vertx-blocked-thread-checker] WARN  i.v.core.impl.BlockedThreadChecker - Thread Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 2171 ms, time limit is 2000
7391    [vertx-blocked-thread-checker] WARN  i.v.core.impl.BlockedThreadChecker - Thread Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 3172 ms, time limit is 2000
8393    [vertx-blocked-thread-checker] WARN  i.v.core.impl.BlockedThreadChecker - Thread Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 4175 ms, time limit is 2000
9226    [pool-1-thread-1] INFO  verthello.Main - Scheduled Event
9226    [vert.x-eventloop-thread-1] INFO  verthello.Main - single subscribe
9234    [vert.x-eventloop-thread-1] INFO  verthello.Main - sent response
9234    [vert.x-eventloop-thread-1] INFO  verthello.Main - exiting requestHandler


Clement Escoffier

unread,
Jan 9, 2018, 2:05:48 AM1/9/18
to ve...@googlegroups.com
Hi,

Be careful with Single.fromFuture, it’s blocking. Basically, on subscription it does a future.get() which is blocking. (ref: http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Single.html#fromFuture-java.util.concurrent.Future-). There is a variant of fromfuture taking a scheduler as parameter. In this case one of the thread of this scheduler is doing the _waiting_ (so is blocked). 

Clement


--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/a6fe23bf-bd39-4ef4-8ace-030fbaae9023%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Clayton Wohl

unread,
Jan 9, 2018, 11:38:57 AM1/9/18
to vert.x
I already know I'm doing it the wrong way. I pointed this out in the original post and I had found this out the hard way.

I'm looking for how to do this the right way.

I'm reading docs like http://vertx.io/docs/vertx-core/java/ and I don't see how I am supposed to basic things:

- What is the best practices way to work with a Java Future?
- How do I turn a Java Future into a Vert.x Future? What is the purpose/benefit of this?
- How would I have known that Single.fromFuture ultimately calls get() and stealthily converts non-blocking APIs into blocking APIs.

Clement Escoffier

unread,
Jan 9, 2018, 12:01:10 PM1/9/18
to ve...@googlegroups.com

On 9 Jan 2018, at 17:38, Clayton Wohl <clayt...@gmail.com> wrote:

I already know I'm doing it the wrong way. I pointed this out in the original post and I had found this out the hard way.

Sorry, I misread your mail.


I'm looking for how to do this the right way.

I'm reading docs like http://vertx.io/docs/vertx-core/java/ and I don't see how I am supposed to basic things:

- What is the best practices way to work with a Java Future?

Either use a different Scheduler such as the blockingScheduler, or use CompletableFuture with this project https://github.com/cescoffier/vertx-completable-future.

- How do I turn a Java Future into a Vert.x Future? What is the purpose/benefit of this?

Java Future are blocking, Vert.x Future are not. Vert.x Future are notified when the result has been computed / retrieved. In a regular (non Completable) Future, the get() method blocks until you get a result.

- How would I have known that Single.fromFuture ultimately calls get() and stealthily converts non-blocking APIs into blocking APIs.

If you try to call get() in the event loop, Vert.x is going to warn you (as you reported). Except that it’s hard to now. There is a nice tool named reactive-audit that detect blocking code at runtime (https://github.com/octo-online/reactive-audit).

Clement

Grey Seal

unread,
Jan 9, 2018, 12:04:24 PM1/9/18
to vert.x

Clayton Wohl

unread,
Jan 9, 2018, 12:25:39 PM1/9/18
to vert.x
"Java Future are blocking, Vert.x Future are not."

The whole point of any Future is non-blocking operation.

Can you suggest a small code change to the self-contained program that I pasted?

Jez P

unread,
Jan 9, 2018, 12:41:49 PM1/9/18
to vert.x
Jave CompletableFutures enable non-blocking stuff (they are like promises, they enable composition and they also handlers to be attached so that when they complete, the result can be consumed). They can of course also be used in a blocking fashion via get or join.

Java Futures on the other hand do not. This is why they only expose a get method. They do not offer a mechanism for asynchronous consumption. 

The way you could work around it is to use a vert.x setperiodic call to check the isDone status of the future and once it's done then cancel the periodic checking and use the result (which you can then obtain via get). Not sure how you cancel the periodic checking though. Maybe some variant of cancelTimer applies to setPeriodic?

However, it's not really honest for an API to claim to be asynchronous and then expose java Futures on which you have to call get() (which is inherently a blocking call). An asynchronous API should expose the more promise-like CompletableFuture or something which behaves in a similar manner. Otherwise what it means is "Here's an API which you have to consume synchronously via blocking, but which can do some work off-thread).

Thomas SEGISMONT

unread,
Jan 10, 2018, 3:45:24 AM1/10/18
to ve...@googlegroups.com
Can you suggest a small code change to the self-contained program that I pasted?

vertx.<String>executeBlocking(fut ->  {
  try {
    fut.complete(future.get()));
  } catch(Exception e) {
    fut.fail(e);
  }
}, false, ar -> {
  if (ar.succeeded()) {
    // Do the rest of your stuff here
  } else {
    // boom
  }
});

In English: as Clément said, Java's Future is blocking (you can only call get and it will block the caller until the task completes or fails). With Vert.x you must not block the event loop but there are tools to invoke blocking code


You can create an utility which takes a Java Future and returns a Vert.x Future if that simplifies your code.

public static <T> io.vertx.core.Future<T> toVertxFuture(Future<T> future) {
  io.vertx.core.Future<T> result = io.vertx.core.Future.future();
  vertx.<T>executeBlocking(fut ->  {
    try {
      fut.complete(future.get()));
    } catch(Exception e) {
      fut.fail(e);
    }
  }, false, result);
  return result;
}
Reply all
Reply to author
Forward
0 new messages