Invoking async methods synchronously

3,221 views
Skip to first unread message

Brian Lalor

unread,
Sep 17, 2012, 11:01:25 AM9/17/12
to ve...@googlegroups.com
Seems like there are quite a few of these types of questions popping up. Maybe we can settle this once and for all. :-)

I'm using Vert.x in an embedded scenario. I need to do something like invoke EventBus.send(), wait for a reply, and then return the the result of the reply. So:

class Adapter {
String getResult(String payload) {
String someResult = null;

eventBus.send(
"address",
new JsonObject().putString("payload", payload),
new Handler<Message<JsonObject>>() {
public void handle(Message<JsonObject> msg) {
someResult = msg.body.getString("result");
}
}
);

waitForResult();

return someResult;
}
}

Some "legacy" code is going to invoke Adapter.getResult(). How can I implement waitForResult() without blocking the event loop?

T.J. Crowder

unread,
Sep 17, 2012, 11:17:48 AM9/17/12
to ve...@googlegroups.com
Hi,
Caveat: I'm quite new to Vert.x. But not to async programming.

But I believe the answer to your question is: If Adapter#getResult() is called from an event loop thread, you *can't* implement waitForResult without blocking the event loop, which isn't allowed. Instead, your options are:

1. Don't call Adapter#getResult() from an event loop thread. Instead, call it from a worker verticle. Even then, implementing waitForResult is going to be messy.

2. Modify the "legacy" code so that it passes a handler into getResult that gets called with the result, rather than expecting it as a return value. This is routine in async programming, using callbacks rather than return values. Your Adapter becomes (roughly, not tested):

class Adapter {
   
void getResult(String payload, Handler<String> callback) {
       
        eventBus
.send(
           
"address",
           
new JsonObject().putString("payload", payload),
           
new Handler<Message<JsonObject>>() {
               
public void handle(Message<JsonObject> msg) {
                    callback
.handle(msg.body.getString("result"));
               
}
           
}
       
);
   
}
}

...and the legacy code using it changes from:

String result;

doSomethingBefore
();
result
= adapterInstance.getResult("payload data here");
doThis
(result);
doThat
(result);
doTheOther
(result);

to (roughly, not tested):

doSomethingBefore();
adapterInstance
.getResult("payload data here", new Handler<String>() {
   
public void handle(String result) {
        doThis
(result);
        doThat
(result);
        doTheOther
(result);
   
}
});

Apologies if this is sort of a "you can't do that" non-answer. :-)

-- T.J.

Brian Lalor

unread,
Sep 17, 2012, 11:22:02 AM9/17/12
to ve...@googlegroups.com
On Sep 17, 2012, at 11:17 AM, "T.J. Crowder" <t...@crowdersoftware.com> wrote:

But I believe the answer to your question is: If Adapter#getResult() is called from an event loop thread, you *can't* implementwaitForResult without blocking the event loop, which isn't allowed. Instead, your options are:

Adapter#getResult() will actually be called from code that knows nothing of Vert.x and is not an event-based environment.  So I need to synchronously invoke EventBus#send() and return the result to the caller of Adapter#getResult().  The Adapter class is in fact "adapting" the legacy code to the Vert.x environment.  I want to do something similar to, say, Vert.x's FileSystem#openSync(), but BlockingActions are explicitly off-limits to anything outside of the Vert.x internals.

T.J. Crowder

unread,
Sep 17, 2012, 11:33:52 AM9/17/12
to ve...@googlegroups.com
Hi,
How is the legacy code being called? If it's not event-based and knows nothing of Vert.x, then presumably it's not being called on a Vert.x event loop thread....? If it is, that's your problem: I believe you'll want to move it to a worker module instead (my #1), since it (the code) expects to block. "Use a worker verticle" seems to be the Vert.x answer to questions about blocking operations. :-)

-- T.J.

Brian Lalor

unread,
Sep 17, 2012, 11:55:27 AM9/17/12
to ve...@googlegroups.com
I want the legacy code to be able to be able to consume data from the Vert.x world.  Let's say the legacy code is something you can't easily replace with Vert.x, like an EJB or a servlet.  In that case, when the application starts, it spins up a clustered, embedded Vertx instance.  When a request is handled — say via HttpServlet#doGet() — I want to invoke Adapter#getResult() and return said result in my HttpServletResponse.  

I guess the adapter could spawn a new thread, invoke the EventBus#send() and then wait() on an object…

Object waitOnMe = "waiter";
new Thread(new Runnable() {
    void run() {
        vertx.eventBus().send("addr", new JsonObject(…), handler {
            waitOnMe.notify();
        }
    }
}).start();
waitOnMe.wait();

bytor99999

unread,
Sep 17, 2012, 3:17:10 PM9/17/12
to ve...@googlegroups.com
Of course I want to add my 2 cents. Because as you stated in the first post. A good number of us need this functionality of synchronicity without blocking the event loop thread. And it seems to be the only way is to wrap the synchronicity feel inside an async model. I think we can think of it like Messaging, JMS where the request has a "replyTo" so that the reply message can be lined up with the request whenever it arrives, which could be anytime.

So what I was going to do was to have AMQP do the work. Have a worker module send an AMQP message. The reply comes back on AMQP, where I have a separate handler/listener looking for that reply message, then send it to the original sender. In which I keep that information in say a SharedSet or SharedMap to send the response back to the client? Using sockets.

Mark

Joy

unread,
Sep 17, 2012, 4:52:59 PM9/17/12
to ve...@googlegroups.com
To avoid "callbacks hell" like other callback programing styles,
vert.x  need some kind of "non-blocking wait" or deffered calls:
deffered (js)
non-blocking futures (akka)
promises (js).

bytor99999

unread,
Sep 17, 2012, 7:58:38 PM9/17/12
to ve...@googlegroups.com
Wouldn't non-blocking wait or deferred calls still be implemented the same way with callbacks. passing closures as listeners that get called back when the response comes back? All it would be doing is hiding that fact from us? I could be wrong, as I am often.

Mark

Joy

unread,
Sep 17, 2012, 8:43:42 PM9/17/12
to ve...@googlegroups.com
Of course it's not a straight bridge between vert.x & not vert.x worlds, but a way to write more linear and readable code.

Joy

unread,
Sep 17, 2012, 11:00:58 PM9/17/12
to ve...@googlegroups.com
@blablor maybe it will be more good:
final Queue q=new BlockingQueue();


vertx
.eventBus().send("addr", new JsonObject(…),
handler {
q.put(data);
}
);

result
=q.take();

Brian Lalor

unread,
Sep 17, 2012, 11:12:08 PM9/17/12
to ve...@googlegroups.com
On Sep 17, 2012, at 11:00 PM, Joy <a.pec...@gmail.com> wrote:

@blablor maybe it will be more good:
final Queue q=new BlockingQueue();

vertx.eventBus().send("addr", new JsonObject(…), 
handler {
q.put(data);
}
);

result=q.take();


Much better, Joy.  I was just trying to whack something together from memory, but that's much more elegant.  Still not "vertxian", but probably workable. :-)

Pid *

unread,
Sep 18, 2012, 3:00:35 AM9/18/12
to ve...@googlegroups.com
That just blocks on q.take().


p



--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To post to this group, send an email to ve...@googlegroups.com.
To unsubscribe from this group, send email to vertx+un...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/vertx?hl=en-GB.

Tim Fox

unread,
Sep 18, 2012, 4:14:58 AM9/18/12
to ve...@googlegroups.com
Ideally I would push the async back through your entire application
(it's async all the way down).

However if your 'legacy' code really needs to get a result
synchronously, and if you're running embedded then you're not on an
event loop (you're on one of your own threads) when you call
eventBus.send, so blocking on waitForResult() is not an issue - you're
not blocking any event loop here.

waitForResult could just be a countdownlatch of object.wait or one of
the myriad other ways Java lets you transfer values between threads.

Tim Fox

unread,
Sep 18, 2012, 4:15:59 AM9/18/12
to ve...@googlegroups.com
On 18/09/2012 08:00, Pid * wrote:


On 18 Sep 2012, at 04:12, Brian Lalor <bla...@bravo5.org> wrote:

On Sep 17, 2012, at 11:00 PM, Joy <a.pec...@gmail.com> wrote:

@blablor maybe it will be more good:
final�Queue�q=new�BlockingQueue();

vertx.eventBus().send("addr",�new�JsonObject(�),�
handler�{
q.put(data);
}
);

result=q.take();


Much better, Joy. �I was just trying to whack something together from memory, but that's much more elegant. �Still not "vertxian", but probably workable. :-)

That just blocks on q.take().

That should be fine since the Brian is not on an event loop when he calls eventBus.send (he's running embedded).

Joy

unread,
Sep 19, 2012, 1:41:04 AM9/19/12
to ve...@googlegroups.com
I think it's better to hide suspend mechanism behind future implementation, in verticle we can write:

   // Parallel execution
Future<String> fs=call_async_service("service1",data);
Future<String> fs2=call_async_service("service2",data);
 
// serialise next service call on futures
Future<Integer> res=call_async_service("service3",fs.get(),fs2.get());

do something with res.get()

possible future implementation :

class FutureImpl<T> implements Future<T>{
   
...

   
@Override
   
public T get() throws InterruptedException, ExecutionException {
     
return (T) suspend();
   
}

   
...
}

Suspend may be coroutine suspend/yeld, for example  https://code.google.com/p/coroutines/
or JavaFlow

Joy

unread,
Sep 19, 2012, 1:47:24 AM9/19/12
to ve...@googlegroups.com
Future<String> fs=call_async_service("service1",data);
Future<String> fs2=call_async_service("service2",data);

not good example :(, better this:


Future<String> fs=eventbus.send("service1",data);
Future<String> fs2=eventbus.send("service2",data);
   // serialise next service call on futures
Future<Integer> res=eventbus.send("service3",fs.get(),fs2.get());

Tim Fox

unread,
Sep 19, 2012, 4:00:54 AM9/19/12
to ve...@googlegroups.com
+1

I've thought about this quite a lot in the past. (I believe there is already a task for implementing a direct style API over the async API using coroutines. https://github.com/vert-x/vert.x/wiki/Task-list )

I was looking at Kilim previously to implement this, but unfortunately that project seems dead/inactive now :(

JavaFlow is another possibility.

If you're interested in picking up this task, you'd be welcome to it :)
--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To view this discussion on the web, visit https://groups.google.com/d/msg/vertx/-/YkYVbMuVqkQJ.
Message has been deleted

Joy

unread,
Oct 26, 2012, 8:07:38 PM10/26/12
to ve...@googlegroups.com
Hi Tim, I found a greate java implementation of  Promises here.
I made some concepts & examples of using promises in verticles.
Extended Eventbus sample for support promises in send & pub call.

EventBusPromisified.java
PromisesConceptVerticle.java
PromisesUtils.java

Buddha Shrestha

unread,
Dec 16, 2015, 9:38:12 AM12/16/15
to vert.x
Hi everyone, I'm new at vertx, and I have a problem where I have two async methods and both of them are a sql query (one is select and the other is update). The later is dependent on the former, but while executing in vertx, both run occur concurrent (obviously :) ) and it seems the later's result is irrespective of the former's. 

I applied the method applied by Joy, using Blocking queue, but it throws exception. Also how would you push the async back through your entire application, as mentioned by Tim Fox?

Clement Escoffier

unread,
Dec 16, 2015, 9:43:50 AM12/16/15
to ve...@googlegroups.com
Hello,

So in your case you just need to execute your second query in the callback of the first. You can see an example here: http://vertx.io/blog/using-the-asynchronous-sql-client/:


connection.execute(
  "CREATE TABLE IF NOT EXISTS Whisky (id INTEGER IDENTITY, name varchar(100), origin varchar(100))",
  ar -> {
    if (ar.failed()) {
      fut.fail(ar.cause());
      connection.close();
      return;
    }
    connection.query("SELECT * FROM Whisky", select -> {
     // ....
    });
  }
);

Clement


--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/6ee7678e-1ca4-4c74-9c19-ae8c76886ef2%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Buddha Shrestha

unread,
Dec 16, 2015, 10:19:48 AM12/16/15
to vert.x
Hi Clement, thanks for the reply.

Sorry I couldn't properly articulate my problem previously. I have been using the queries in the call backs as you mentioned. But my problem is:
1.In the first method I select * from a table and in the second query, if there does not exists I put the INSERT INTO statement else I put UPDATE statement.
2.In the second method also, I do the same -- which means there should exists only 1 record in my table (if there exists, I just update the record).
But, as it turns out, both the methods run asynchronously and if there exists no records while starting, the methods end up both executing the INSERT INTO statement -- which actually should have been - first the INSERT INTO followed by Update.

Could you please suggest what should I be doing ?

Thanks,

Farzad Pezeshkpour

unread,
Dec 17, 2015, 4:39:48 AM12/17/15
to vert.x
FWIW
Sme of our use cases need both clean handling of async (w/o the pyramid of callbacks) and more advanced composition.

e.g in rather loose pseudo code ...

do(a)
.thenInParallel(b, c)
.then(d)

Where a, b, c, d are all async operations, with the results of one stage being passed to the next.

The point here being that the execution flow is a graph and not classically linear.

This type of composition is more common than one would at first imagine. It even forms the basis of some libraries (e.g. Finagle).

To implement it over vert.x's constructs (Handler, AsyncResult) some libraries that perhaps could help (in addition to those mentioned on the thread):

RXJava (but then everything is a stream..)
JDefered
JDK's CompletableFuture

regards
Fuzz

Julien Viet

unread,
Dec 17, 2015, 5:45:34 AM12/17/15
to ve...@googlegroups.com, Farzad Pezeshkpour
I believe in Vert.x we should provide something based around the existing Future object and allow basic composition.

It would not provide as much as things as RxJava, etc… but it could provide the basic operations that are very useful like join and chain and it would be polyglot, available in all languages.

-- 
Julien Viet
www.julienviet.com

--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.

Farzad Pezeshkpour

unread,
Dec 17, 2015, 6:17:53 AM12/17/15
to vert.x, farzad.pe...@gmail.com
That would be neat.

Buddha Shrestha

unread,
Dec 17, 2015, 6:21:17 AM12/17/15
to vert.x, farzad.pe...@gmail.com
Hi Julien,

So as of now, are there no other alternative using only vertx? Should I be planning on implementing my application on RXjava?

Julien Viet

unread,
Dec 17, 2015, 6:29:11 AM12/17/15
to ve...@googlegroups.com, Buddha Shrestha, farzad.pe...@gmail.com
there is RxJava support, check on the web site.

Ian Andrews

unread,
Dec 19, 2015, 9:35:43 AM12/19/15
to vert.x, farzad.pe...@gmail.com
You certainly could implement this pattern with just vertx, but you will have to do more work.  This snippet demonstrates how to execute your example flow:


Vertx vertx = Vertx.vertx();

Handler<Long> third = l -> {
System.out.println("Last operation");
};

AtomicInteger counter = new AtomicInteger(0);
Handler<Long> second = l -> {
System.out.println("In parallel operations");
if (counter.incrementAndGet() == 2) {
vertx.setTimer(1, third);
System.out.println("Parallel operations complete");
}
};

Handler<Long> first = l -> {
vertx.setTimer(1, second);
vertx.setTimer(1, second);
System.out.println("First operation complete");
};

vertx.setTimer(1, first);


If this sort of implementation doesn't bother you than you don't need RxJava.  With that said, the whole point of RxJava was to support that type of composable workflow.

Asher Tarnopolski

unread,
Dec 20, 2015, 3:43:48 AM12/20/15
to vert.x, farzad.pe...@gmail.com
did anyone try to use https://github.com/cyngn/vertx-util for this matter?

Farzad Pezeshkpour

unread,
Dec 20, 2015, 3:51:34 AM12/20/15
to vert.x
That looks thanks!

Farzad Pezeshkpour

unread,
Dec 21, 2015, 2:56:13 PM12/21/15
to vert.x
I was looking for something that would also work nicely with the EventBus ...

Sketched up this API / implementation to show the intention ...

https://github.com/dazraf/vertx-async-compose

The test:
https://github.com/dazraf/vertx-async-compose/blob/master/src/test/java/io/dazraf/vertx/async/PromiseTest.java

Julien Viet

unread,
Jan 14, 2016, 3:36:11 AM1/14/16
to ve...@googlegroups.com, Farzad Pezeshkpour
Hi,

I started to contribute improvements on the existing Future object to allow basic composition (in branch called composite-future).

It does not aim to be an RxJava / CompletableFuture / Promise like but its goal is to solve coordination of async that people often have (for example waiting for 2 async results, etc...) , for instance:

Future<HttpServer> f1 = Future.future();
Future<NetServer> f2 = Future.future();
vertx.createHttpServer()....listen(f1.handler());
vertx.createNetServer()....listen(f2.handler());
CompositeFuture.and(f1, f2).setHandler(ar -> {
  if (ar.succeeded()) {
    // both have started
  } else {
    // at least one failed
  }
});

The feature is not a big engineering effort so it should be in 3.2.1

Reply all
Reply to author
Forward
0 new messages