Eventbus replies not reaching sender..

321 views
Skip to first unread message

Ranganath Samudrala

unread,
Apr 13, 2020, 3:22:17 PM4/13/20
to vert.x
Hello
I am bummed by this problem a for few days and not sure where I am making the mistake. I send a request, the receiver replies, but the sender never receives the reply... 

14:46:31.355 [vert.x-eventloop-thread-0] DEBUG c.b.t.a.c.a.Adapter - Submitting request: {

  "query" : "SELECT table_name FROM system_schema.tables where keyspace_name = ?",

  "update" : false,

  "returnColDefs" : false,

  "bindVariables" : {

    "bindVariables" : [ {

      "value" : "system_schema",

      "data-type" : "TEXT"

    } ]

  },

  "options" : {

    "storeBlobInFile" : false,

    "blobDownloadDirectory" : "/var/folders/b7/t6wfbskn7n1ggq47rd638q55cz8vjs/T/",

    "blobFilePrefix" : "BLOB-FILE",

    "returnColDefs" : false

  }

} TO CONSUMER: PERSISTENCE; WITH OPTIONS: {

  "timeout" : 300000000,

  "headers" : {

    "jobId" : "aecce6fc3980498b:1912992f:17174dd4878:-80001-1586803591288"

  },

  "localOnly" : true

}

14:46:31.369 [vert.x-eventloop-thread-4] DEBUG c.b.t.p.c.api.ClientAPI - Received query JSON object: {"query":"SELECT table_name FROM system_schema.tables where keyspace_name = ?","update":false,"returnColDefs":false,"bindVariables":{"bindVariables":[{"value":"system_schema","data-type":"TEXT"}]},"options":{"storeBlobInFile":false,"blobDownloadDirectory":"/var/folders/b7/t6wfbskn7n1ggq47rd638q55cz8vjs/T/","blobFilePrefix":"BLOB-FILE","returnColDefs":false}}: from: PERSISTENCE:__vertx.reply.1:jobId: aecce6fc3980498b:1912992f:17174dd4878:-80001-1586803591288

.

.

14:46:33.245 [vert.x-eventloop-thread-4] DEBUG c.b.t.p.c.api.ClientAPI - Sending?: [true]; result to PERSISTENCE:__vertx.reply.1

14:47:03.257 [vert.x-eventloop-thread-4] ERROR c.b.t.p.c.api.ClientAPI - Reply failed: Timed out after waiting 30000(ms) for a reply. address: __vertx.reply.2, repliedAddress: __vertx.reply.1

io.vertx.core.eventbus.ReplyException: Timed out after waiting 30000(ms) for a reply. address: __vertx.reply.2, repliedAddress: __vertx.reply.1

at io.vertx.core.eventbus.impl.HandlerRegistration.lambda$new$0(HandlerRegistration.java:78) [vertx-core-3.8.5.jar:3.8.5]

at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:923) ~[vertx-core-3.8.5.jar:3.8.5]

at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:887) ~[vertx-core-3.8.5.jar:3.8.5]

at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:369) ~[vertx-core-3.8.5.jar:3.8.5]

at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43) ~[vertx-core-3.8.5.jar:3.8.5]

at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:232) ~[vertx-core-3.8.5.jar:3.8.5]

at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:224) ~[vertx-core-3.8.5.jar:3.8.5]

at io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:913) ~[vertx-core-3.8.5.jar:3.8.5]

at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38) ~[netty-common-4.1.42.Final.jar:4.1.42.Final]

at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:139) ~[netty-common-4.1.42.Final.jar:4.1.42.Final]

at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.42.Final.jar:4.1.42.Final]

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:510) ~[netty-common-4.1.42.Final.jar:4.1.42.Final]

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:518) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]

at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044) ~[netty-common-4.1.42.Final.jar:4.1.42.Final]

at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.42.Final.jar:4.1.42.Final]

at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.42.Final.jar:4.1.42.Final]

at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_242]





            // SENDER CODE

      private Future<JsonObject> sendAndReceive(@NonNull final String destination,

            @NonNull final JsonObject json,

            final DeliveryOptions dOpts) {

         Promise<JsonObject> promise = Promise.promise();


         LOGGER.debug(String.format(

                     "Submitting request: %s TO CONSUMER: %s; WITH OPTIONS: %s",

                     json.encodePrettily(), destination, dOpts.toJson().encodePrettily()));

 

         this.adapter.getVertx().eventBus().request(destination, json, dOpts, h -> {

            if (h2.succeeded()) {

               Message<Object> reply = h.result();

               if (reply.body() instanceof JsonObject) {

                  JsonObject replyJson = (JsonObject) reply.body();

                  promise.tryComplete(replyJson);

               } else {

                   .. error handling ...

                  promise.tryComplete(..somejson..);

              }

            } else {

                   .. error handling ...

               promise.tryComplete(..somejson..);

            }

         });

         return promise.future();

      }


            //CONSUMER CODE
     final String consumerAddress = this.config().getString(addressConfigKey) == null

                   ? this.config().getString("consumer-address")

                  : Constants.EB_CONSUME_FROM_ADDRESS;


     vertx.eventBus().consumer(consumerAddress, this);

     .

     .

     @Override

     public void handle(@NonNull final Message<JsonObject> msg) {


        .

        .

        vertx.executeBlocking(future -> {

                            .
                            .

        }, result -> {

           if (result.succeeded()) {

                LOGGER.debug(String.format("Sending?: [%s]; result to %s:%s", msg.isSend(), msg.address(),

                            msg.replyAddress()));


                OperationResult or = (OperationResult) result.result();

                msg.replyAndRequest(or.toJson(), dOpts, replyToReplyHandler -> {


                        // THIS WAS ADDED TO SEE IF REPLY WAS ACTUALLY SENT..

                    if (replyToReplyHandler.succeeded()) {


                        LOGGER.debug("Reply succeeded!!!!!!!!!!!!!!!!!!");

                     } else {

                        LOGGER.error("Reply failed: " + replyToReplyHandler.cause().getMessage(),

                                    replyToReplyHandler.cause());

                    }

                });

           } else {

                         .. error handling ...

           }


});

     }

 

thanks in advance..
Ranga

Ranganath Samudrala

unread,
Apr 20, 2020, 11:10:12 AM4/20/20
to vert.x
Ok, I think the problem is not really with the event bus, but it is with the VertxCompletableFuture I am trying to use to return the results of an action back to a legacy piece of code.

         io.vertx.core.Future<Response> future = performActionHandler.call();


         me.escoffier.vertx.completablefuture.VertxCompletableFuture<Response> completablePerformActionResponseme.escoffier.vertx.completablefuture.VertxCompletableFuture.from(this.getVertx(), future);


         // wait synchronously for the response.

         Response response0 = completablePerformActionResponse.get();


         return response0;


In the above code, the "get()" method never receives the completion of the Future. So, the code hangs.


If I have a logic like below without VertxCompletableFuture, it works perfectly fine, but how to return "response0" ?


         future.setHandler(h0 -> { 

            Response response0 = null;

            if (h0.succeeded()) {

               response0 = h0.result();

            } else {

               response0 = createResponse(startTime, null, "failure", h0.cause().getMessage());

            } 

         });

Ranganath Samudrala

unread,
Apr 20, 2020, 11:11:30 AM4/20/20
to vert.x

Jez P

unread,
Apr 21, 2020, 5:26:34 AM4/21/20
to vert.x
Are you calling get() on an event loop thread? If so, by blocking the event loop, you are blocking message processing on that thread so you won't ever complete. 

Thomas SEGISMONT

unread,
Apr 21, 2020, 6:06:51 AM4/21/20
to vert.x
Which version of Vert.x do you use?

--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/9dffe4c7-221c-44ef-b4e1-7734b1e28e44%40googlegroups.com.

Ranganath Samudrala

unread,
Apr 28, 2020, 2:28:59 PM4/28/20
to vert.x
If you see the log, the submission to event bus is in a worker thread:

13:07:55.533 [vert.x-worker-thread-1] DEBUG c.b.t.a.c.a.Adapter - Submitting request: { "query" : "SELECT keyspace_name FROM system_schema.keyspaces where keyspace_name = 'system_schema'"}
13:07:55.533 [vert.x-worker-thread-1] DEBUG c.b.t.a.c.a.Adapter - Sending request : { "query" : "SELECT keyspace_name FROM system_schema.keyspaces where keyspace_name = 'system_schema'"}
13:07:55.539 [vert.x-worker-thread-1] DEBUG c.b.t.a.c.a.Adapter - OutboundInterceptor:: Sending outbound event: [send=true][address=PERSISTENCE][reply-address=__vertx.reply.1]
13:07:55.541 [vert.x-eventloop-thread-2] DEBUG c.b.t.a.c.a.Adapter - InboundInterceptor:: Received inbound event: [send=true][address=PERSISTENCE][reply-address=__vertx.reply.1]
13:07:55.542 [vert.x-eventloop-thread-2] DEBUG c.b.t.p.c.api.ClientAPI - Received query JSON object: {"query":"SELECT keyspace_name FROM system_schema.keyspaces where keyspace_name = 'system_schema'"}: from: PERSISTENCE:__vertx.reply.1:jobId: PERSISTENCE

13:07:55.542 [vert.x-eventloop-thread-2] DEBUG c.b.t.p.c.api.ClientAPI - Executing query: Query [query=SELECT keyspace_name FROM system_schema.keyspaces where keyspace_name = 'system_schema']
13:07:55.549 [vert.x-worker-thread-2] DEBUG c.b.t.p.c.api.ClientAPI - Invoking query operation... 
13:07:55.550 [vert.x-worker-thread-2] DEBUG c.b.t.p.c.api.operation.Operation - Invoking query: SELECT keyspace_name FROM system_schema.keyspaces where keyspace_name = 'system_schema' with bind value length[0]
13:08:00.770 [vert.x-eventloop-thread-2] DEBUG c.b.t.p.c.api.ClientAPI - Result: {
  "rows" : [ {
    "keyspace_name" : "system_schema"
  } ],
  "success" : true,
  "exception" : null
}
13:08:00.770 [vert.x-eventloop-thread-2] DEBUG c.b.t.p.c.api.ClientAPI - Sending?: [true]; result to PERSISTENCE:__vertx.reply.1
13:08:00.770 [vert.x-eventloop-thread-2] DEBUG c.b.t.a.c.a.Adapter - OutboundInterceptor:: Sending outbound event: [send=true][address=__vertx.reply.1][reply-address=__vertx.reply.2]

Ranganath Samudrala

unread,
Apr 28, 2020, 2:31:17 PM4/28/20
to vert.x
I am using v3.8.5

thanks
To unsubscribe from this group and stop receiving emails from it, send an email to ve...@googlegroups.com.

Ranganath Samudrala

unread,
Apr 28, 2020, 3:14:12 PM4/28/20
to vert.x
I guess what I am trying to do is:

Invoke Vertx based logic from legacy code. Essentially, the logic looks as below:

Adapter: 
   1. Send message via Event Bus and wait for response 
Client:
   2.  Verticle Receives the event, performs actions, replies with response
Adapter
   3. Receive and process response. 

So, the Adapter is invoked in a JVM thread.

Is this something feasible? Isn't this one of the uses from VertxCompletableFuture ?

thanks

Ranganath Samudrala

unread,
Apr 28, 2020, 4:54:58 PM4/28/20
to vert.x
Elaborating on the logs above:

==== Interceptor (outbound and inbound) shows the address being sent to is PERSISTENCE and the reply to address is __vertx.reply.1 =======
13:07:55.539 [vert.x-worker-thread-1] DEBUG c.b.t.a.c.a.Adapter - OutboundInterceptor:: Sending outbound event: [send=true][address=PERSISTENCE][reply-address=__vertx.reply.1]
13:07:55.541 [vert.x-eventloop-thread-2] DEBUG c.b.t.a.c.a.Adapter - InboundInterceptor:: Received inbound event: [send=true][address=PERSISTENCE][reply-address=__vertx.reply.1]
.
.
.
===== Client verticle then replies to __vertx.reply.1 from address __vertx.reply.2 ============
13:08:00.770 [vert.x-eventloop-thread-2] DEBUG c.b.t.p.c.api.ClientAPI - Sending?: [true]; result to PERSISTENCE:__vertx.reply.1
13:08:00.770 [vert.x-eventloop-thread-2] DEBUG c.b.t.a.c.a.Adapter - OutboundInterceptor:: Sending outbound event: [send=true][address=__vertx.reply.1][reply-address=__vertx.reply.2]
======== Note here, inbound interceptor is never invoked.. the response is being replied to the correct address, but the handler never receives the reply=========
======== Is there any bearing with the fact that reply is from "eventloop-thread" =============

manjun...@gmail.com

unread,
Feb 16, 2021, 5:17:19 PM2/16/21
to vert.x
Hi, 

I have the same issue, response does not reach the sender and the entire application hangs with cpu usage more 100%. 

Where you able to find solution to this problem?

Thanks,
Manjunath

Reply all
Reply to author
Forward
0 new messages