Blocking code: Waiting for async code to complete

Ronald van Raaphorst

Dec 13, 2016, 8:36:45 AM
to vert.x
Hi,

I'm trying to figure out how I can write blocking code that uses non-blocking code. I just can't figure out what the pattern should look like.
(I've tried various options like http://vertx.io/docs/vertx-core/java/#blocking_code)

The outline is:

public Object findTemplateSource(final String fullName) throws IOException {
  ...

  Future future = Future.future();

  // Basically, perform some async database queries
  jdbcClient.getConnection(h -> {
    ...
    future.complete(someResult);
  });

  try {
    // Wait for async code to complete
    future.wait();
    return allReadyFuture.result();
  } catch (Exception e) {
    logger.error(e.getMessage());
  }
  return null;
}


Basically, I need to load a FreeMarker template from the database. This code is inside a derived template loader. The FreeMarker API is not asynchronous: a value must be returned.
But the value only becomes available inside the getConnection lambda.
The code above is executed in a worker thread. The allReadyFuture.wait() call throws an IllegalMonitorStateException. Apparently this code does not 'own' the monitor it tries to wait on.

Using a vertx.executeBlocking(...) is more or less the same pattern, but I need to wait for the async results before returning anything or returning execution to the caller.

Does anyone have a good recipe for this kind of issue? Am I overlooking something?

TIA
Ronald


Julien Viet

Dec 13, 2016, 11:05:48 AM
to ve...@googlegroups.com
What I do in such situations is use CompletableFuture<T>, like:

CompletableFuture<Foo> fut = new CompletableFuture<>();
doAsyncOp(ar -> {
  if (ar.succeeded()) {
    fut.complete(ar.result());
  } else {
    fut.completeExceptionally(ar.cause());
  }
});
Foo foo = fut.get(10, TimeUnit.SECONDS);

Julien
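
For reference, a self-contained sketch of this pattern using only the JDK. The callback API (`doAsyncOp`) and the class name `BlockingBridge` are hypothetical stand-ins for illustration, not Vert.x calls, and the sketch assumes the callback is delivered on a different thread than the one that blocks:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

class BlockingBridge {
    // Hypothetical callback-style API standing in for an async call:
    // it reports (result, error) to the callback from another thread.
    static void doAsyncOp(BiConsumer<String, Throwable> callback) {
        new Thread(() -> callback.accept("template-source", null)).start();
    }

    // Blocks the calling thread until the callback fires or the timeout expires.
    // Must not be called from the thread that is supposed to run the callback.
    static String awaitResult(long timeoutMillis) {
        CompletableFuture<String> fut = new CompletableFuture<>();
        doAsyncOp((result, error) -> {
            if (error == null) {
                fut.complete(result);
            } else {
                fut.completeExceptionally(error);
            }
        });
        try {
            return fut.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            // Timeout, interruption, or failure of the async operation
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitResult(5000));
    }
}
```

The timeout matters: without it, a callback that never arrives would block the caller forever.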

--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/b75d532e-05f4-4f77-90b0-cfd094c7c38b%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Ronald van Raaphorst

Dec 13, 2016, 4:14:49 PM
to vert.x
Hi Julien,

Thanks. I tried to do exactly that earlier on. The timer lambda in the example below is just never executed.
Running the code in a unit test in IntelliJ just freezes the debugger, and after 30 seconds a normal timeout exception is thrown.
I've tried to run MyVerticle as a worker verticle, but that makes no difference. Elsewhere I'm using Sync / Fibers a lot; this test just doesn't. This test is not started using the Quasar library either.
Using Vert.x 3.3.3. I can't stand that I don't understand what's wrong: everything else with Vert.x has worked absolutely fabulously so far, once you get the hang of it.

public class MyVerticle extends AbstractVerticle {

  @Override
  public void start() {
    vertx.eventBus().consumer("test", this::testTimer);
  }

  public void testTimer(Message<JsonObject> msg) {
    CompletableFuture future = new CompletableFuture();
    vertx.setTimer(1000, t -> {
      future.complete(true);
    });
    try {
      future.get();
      msg.reply(new JsonObject());
    } catch (Exception e) {
      e.printStackTrace();
      msg.fail(1, e.getMessage());
    }
  }
}

And the test (class) is:

@RunWith(VertxUnitRunner.class)
public class WtfTest {

  @Test
  public void testTest(TestContext context) throws Exception {
    Vertx vertx = Vertx.vertx();
    Async async = context.async();

    vertx.deployVerticle("MyVerticle", d -> {
      vertx.eventBus().send("test", new JsonObject(), h -> {
        async.complete();
      });
    });
  }
}


javadevmtl

Dec 13, 2016, 5:20:10 PM
to vert.x

javadevmtl

Dec 13, 2016, 5:22:25 PM
to vert.x
There's a countdown latch.
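
A minimal JDK-only sketch of the latch idea, assuming the async result is produced on some other thread (the class name `LatchDemo` and the plain `Thread` standing in for the async operation are made up for illustration):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class LatchDemo {
    // Waits for a result produced on another thread, or returns null on timeout.
    static String awaitWithLatch() {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();

        // Stand-in for an async operation that completes on a different thread
        new Thread(() -> {
            result.set("done");
            latch.countDown(); // releases the waiting thread
        }).start();

        try {
            boolean finished = latch.await(5, TimeUnit.SECONDS);
            return finished ? result.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitWithLatch());
    }
}
```

Like CompletableFuture.get(), await() blocks the calling thread, so the same restriction applies: it only helps if the result is produced on another thread.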

javadevmtl

Dec 13, 2016, 5:36:53 PM
to vert.x

Ronald van Raaphorst

Dec 13, 2016, 5:49:33 PM
to vert.x
Thanks for sharing, interesting indeed. But I'd like to know whether I'm doing something wrong or whether I've encountered a bug or something else before putting something else in its place.
As Julien pointed out, the code should just work.

Alexander Lehmann

Dec 13, 2016, 6:43:41 PM
to vert.x
I think using CompletableFuture inside the verticle code is not correct, since it blocks the event loop.

To reply to the message after some time has passed, the reply call should be in the lambda for the timer (everything is async in this case).
However, if you are doing blocking execution there, it should be in an executeBlocking block. If you have both, you can either separate the non-blocking and blocking parts and, e.g., do the non-blocking work first (if the parts don't depend on one another), or run the async operations one after another and use an executeBlocking call each time you need a blocking part of your code.

Jez P

Dec 14, 2016, 2:21:27 AM
to vert.x
Alexander is spot on - CompletableFuture::get() blocks the verticle's event loop thread, meaning the timer will be prevented from triggering, since it would be processed on this thread. Therefore the future will never complete. 
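
The effect can be reproduced without Vert.x. In the sketch below a single-threaded scheduled executor stands in for the event loop (an analogy only, not how Vert.x is implemented; the class name `EventLoopBlockDemo` is made up). The task blocks on a future that can only be completed by a timer scheduled on that same thread, so the wait times out:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class EventLoopBlockDemo {
    // Returns true if the blocking get() timed out because the "timer"
    // could not run on the thread that was busy waiting for it.
    static boolean blockingOnOwnThreadTimesOut() {
        // Single thread standing in for an event loop
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            Future<Boolean> outcome = loop.submit(() -> {
                CompletableFuture<Boolean> fut = new CompletableFuture<>();
                // The "timer" is queued on the same single thread...
                loop.schedule(() -> { fut.complete(true); }, 100, TimeUnit.MILLISECONDS);
                try {
                    // ...which is now blocked here, so the timer cannot fire.
                    fut.get(500, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    return true;
                }
            });
            return outcome.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            return false;
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(blockingOnOwnThreadTimesOut());
    }
}
```

Without the timeout on get(), the task would simply hang, which matches the frozen test described above.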

Jez P

Dec 14, 2016, 2:29:27 AM
to vert.x
If, as per your original code, you called CompletableFuture::get() in a worker thread, this should work.

I had to do something similar when providing a convenient blocking wrapper around shared data for CAS logout support in vertx-pac4j (pac4j is blocking, so I had to convert the Vert.x behaviours for finding a session to a blocking API), and the CompletableFuture route does work. I used rx to handle the async side of things, but you can see the code here


The important thing is that this is called by pac4j code (which itself is blocking), so in the vertx-pac4j implementation it is always ultimately wrapped in executeBlocking to take the CompletableFuture::get off the event loop thread.

Julien Viet

Dec 14, 2016, 7:57:11 AM
to ve...@googlegroups.com
yes my solution was intended only for non Vert.x threads, I thought it was obvious.

Julien Viet

Dec 14, 2016, 7:58:06 AM
to ve...@googlegroups.com
not all the time.

if you are sending a clustered message on the event bus, the message is effectively sent when the blocking method is terminated.




Ronald van Raaphorst

Dec 14, 2016, 8:02:07 AM
to vert.x
If I run this code in a worker thread using an executeBlocking method, like:

CompletableFuture future = new CompletableFuture();

WorkerExecutor executor = vertx.createSharedWorkerExecutor("test-executor-pool");
executor.executeBlocking(h -> {   // Worker thread
  vertx.setTimer(1000, t -> {     // test-executor-pool-thread
    h.complete(true);
  });
}, res -> {
  future.complete(res.result());
});

try {
  future.get();
  msg.reply(new JsonObject());
} catch (Exception e) {
  e.printStackTrace();
  msg.fail(1, e.getMessage());
}

Then the timer still doesn't fire. Apparently the timer always executes on the worker thread, which is blocked.

Julien Viet

Dec 14, 2016, 8:13:13 AM
to ve...@googlegroups.com
yes that is expected

executeBlocking is meant for using blocking APIs, usually ones that do IO, when you have no choice but to block.

it is not meant for waiting on an async response, especially when it comes from Vert.x (in the example below, just be reactive on the timer).

it is not meant for consuming a thread (like a thread that would periodically poll something)


Ronald van Raaphorst

Dec 14, 2016, 8:20:43 AM
to vert.x
Ok, at least this works. I guess I could also use some kind of thread pool so as not to create a new thread every time.

CompletableFuture future = new CompletableFuture();

new Thread(() ->                        // Runs on worker thread
  vertx.setTimer(2000, h -> {           // Runs on newly created thread
    future.complete(new JsonObject());  // Runs on eventloop thread
  })
).start();

try {
  Object o = future.get();              // Blocks worker thread
  msg.reply(o);
} catch (Exception e) {
  e.printStackTrace();
  msg.fail(1, e.getMessage());
}

Ronald van Raaphorst

Dec 14, 2016, 8:27:57 AM
to vert.x
What I'd like to understand is this: I imagined that, in the code below, vertx.setTimer runs on the test-executor-pool thread but is blocked by the blocked worker thread.
Does Vert.x internally switch to the worker thread (which is blocked) before setting the timer (or when the timer lambda is executed)?

Alexander Lehmann

Dec 16, 2016, 7:36:45 PM
to vert.x
I think the timer is executed in an event loop thread that is associated with the context (or a new context is created when you are on a non-Vert.x thread).

If your code fragment runs in an event loop thread that is already in Vert.x, it will block, since you are mixing blocking and async operations.
If the timer is set in a worker thread, I am not sure; it's probably the same worker thread if the context is the same as well.

Clement Escoffier

Dec 19, 2016, 3:16:29 AM
to ve...@googlegroups.com
Hi,

When you do future.get(), be sure to NOT be in the event loop because this call is blocking. Vert.x promotes another way of doing this that you can easily achieve with:

future.handle((res, err) -> {
    // Called when the future has been completed
    return null; // handle() takes a BiFunction, so the lambda must return a value
});

Clement
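
A small JDK-only sketch of the non-blocking style, assuming a plain CompletableFuture (the class name `HandleDemo` is made up for illustration). The callback is registered first and runs when the future is completed, on the thread that completes it; nothing ever waits:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class HandleDemo {
    // Registers a callback instead of blocking, then completes the future.
    static String reactWithoutBlocking() {
        CompletableFuture<String> future = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>();

        // Called once the future completes, normally or exceptionally
        future.handle((res, err) -> {
            seen.set(err == null ? res : "failed: " + err.getMessage());
            return null;
        });

        // The callback above runs here, on the completing thread
        future.complete("ok");
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(reactWithoutBlocking());
    }
}
```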

Clement Escoffier

Dec 19, 2016, 3:17:17 AM
to ve...@googlegroups.com
Forgot to say, completable future would use the current thread to execute the handle method, so be sure you are executing it in the event loop.

Clement

RA van Raaphorst

Dec 19, 2016, 3:31:06 AM
to ve...@googlegroups.com
Hi Clement,

Thanks, I got that part. But I just needed to block the (worker) thread, because I overrode a synchronous method in the FreeMarker library:
the method must return something.

Ronald


Clement Escoffier

Dec 19, 2016, 4:04:56 AM
to ve...@googlegroups.com
So, there is only one way: use future.get (blocking) in a worker thread.

Clement
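
A JDK-only sketch of that arrangement, with one executor standing in for the event loop and a separate one for the worker thread (an analogy under those assumptions, not Vert.x internals; the class name `WorkerBlockDemo` is made up). Because the blocked thread is not the one that has to complete the future, the wait succeeds:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class WorkerBlockDemo {
    // Returns true if the worker's blocking get() received the value
    // completed by the separate "event loop" thread.
    static boolean blockingOnWorkerSucceeds() {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor(); // "event loop"
        ExecutorService worker = Executors.newSingleThreadExecutor();                 // "worker thread"
        try {
            Future<Boolean> outcome = worker.submit(() -> {
                CompletableFuture<Boolean> fut = new CompletableFuture<>();
                // The async completion happens on the loop thread, which stays free
                loop.schedule(() -> { fut.complete(true); }, 100, TimeUnit.MILLISECONDS);
                // Only the worker thread blocks here
                return fut.get(5, TimeUnit.SECONDS);
            });
            return outcome.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            return false;
        } finally {
            loop.shutdownNow();
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(blockingOnWorkerSucceeds());
    }
}
```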


Ronald van Raaphorst

Mar 22, 2017, 10:11:58 AM
to vert.x
Unfortunately, switching to 3.4.1 seems to have broken the code:

  WorkerExecutor executor = vertx.createSharedWorkerExecutor("sqltemplateloader-pool");

  ...

  /**
   * Override FreeMarker's findTemplateSource method.
   * This is a synchronous method. We must return something when the method exits
   * but we need to perform asynchronous database queries.
   */
  public Object findTemplateSource(final String fullName) throws IOException {

    ...

    // Execute the query on a separate thread because this thread will be blocked by the future.get() below
    CompletableFuture<JsonObject> completableFuture = new CompletableFuture<>();
    logger.info("1: " + Thread.currentThread().getName());

    executor.executeBlocking(h -> {
      logger.info("2: " + Thread.currentThread().getName());   // <-- THIS POINT IS NEVER REACHED IN 3.4.1

      // Perform some async SQL query
      h.complete(null);
    },
    result -> {
      if (result.failed()) {
        completableFuture.completeExceptionally(result.cause());
      } else {
        completableFuture.complete((JsonObject) result.result());
      }
    });

    try {
      logger.info("3: " + Thread.currentThread().getName());
      JsonObject src = completableFuture.get(60, TimeUnit.SECONDS);
      ...
      return src;
    } catch (Exception e) {
      ...
      return null;
    }
  }

When I use the code with vertx 3.4.0.Beta, the method completes and the log shows 

1: vert.x-worker-thread-1
4: vert.x-worker-thread-1
2: sqltemplateloader-pool-0

When I use the code with vertx 3.4.0 or 3.4.1, the method does not complete and the log shows 

1: vert.x-worker-thread-1
4: vert.x-worker-thread-1

In this case the thread that should run the executeBlocking block is apparently already blocked by the future.get call.
If there's a solution out there, I'd really like to know :) For now, I see no other option than using a non-Vert.x, synchronous connection and performing synchronous database queries...