Efficient way to start and stop vert.x RabbitMq client

589 views
Skip to first unread message

wanderer

unread,
Jun 10, 2018, 12:15:04 PM6/10/18
to vert.x
I am developing an event driven application where RabbitMq is used to publish and Consume message periodically.

Is it a good idea to stop RabbitMQ Client for Vert.x every-time after publishing a message? In my case every after 5 minutes I am producing 900 messages to be push in a queue.
Starting and stopping the client every time after pushing a message may effect on performance, So I developed a mechanism which will stop the connection if it is idle for a certain period of time. 

Which works well for pushing a message because of the event driven nature(the message I am generating are incoming event based). Am I doing it right?

However it  does not work same for message consumption(Setup the link between RabbitMQ consumer and event bus address), once the connection is close everything stops and the only option I have restart the consumer verticle to start the process again.
What are my options here for Consume messages from a queue continuously other than keeping the connection alive forever? or this is the only way? I need help.

wanderer

unread,
Jun 13, 2018, 1:56:16 PM6/13/18
to vert.x
Anyone please?

Pavel Drankov

unread,
Jun 14, 2018, 8:06:47 AM6/14/18
to vert.x
Hi,

In order to figure out what is wrong with the "consuming via event bus API" we need more details. if you have a time for preparing a reproducer and opening an issue, that would be great.

Actually, this way of consuming messages going to be deprecated in the 3.6.0 release and replaced with the new streaming-based version. 

And I think that the API can be an option for you, so you can pause/resume the stream and specify different options, for instance, you can specify how many messages can be stored while a stream is paused, or how we should handle backpressure. Can you try the 3.6.0-SNAPSHOT and share your feedback?

Thanks,
Pavel

Pavel Drankov

unread,
Jun 14, 2018, 8:13:37 AM6/14/18
to vert.x
And one more thing. Here is the article on Vert.x blog about the API: https://vertx.io/blog/eclipse-vert-x-rabbitmq-client-gets-a-new-consumer-api/ . You may also found it useful.

wanderer

unread,
Jun 17, 2018, 1:42:08 PM6/17/18
to vert.x
Thanks for the link. I am definitely try it and let you know. What my point was I wanted to do something like this

    private void push(Handler<AsyncResult<JsonObject>> response,
                           
JsonObject message, RabbitMQClient client) {
        client
.start(handler -> {   //Starting the client or place a logic to check if it is already close then only start it
           
if (handler.succeeded()) {
                client
.basicPublish("", "some.route", message, pubResult -> {
                   
if (pubResult.succeeded()) {
                       
JsonObject successMessage = new JsonObject("{'success':'true'}");
                        response
.handle(Future.succeededFuture(successMessage));
                   
} else {
                        pubResult
.cause();
                   
}
                    client
.stop(Future.future());  //Stopping
               
});
           
}
       
});
   
}

Suppose I am iterating over a collection which has 1k records, creating event from each iteration and then calling the push function which is starting and stopping the connection for 1k times, I don't think that would be healthy. So I came up with an idea of creating a  custome RabbitMQClient connection pool. and close it periodically, which is working perfectly But I want to be sure I am doing it right, Is it necessary to close the client manually or the library will take care of that job?

And I think the same situation is applicable for consuming a message.

Pavel Drankov

unread,
Jun 17, 2018, 1:59:56 PM6/17/18
to vert.x
If you'd like to close the client, you always should do it manually. But I did not get what is the point of closing and opening connection every time you'd like to send a message? I think it is better, simpler and more efficient to keep it, even if it stays idle for some time.

wanderer

unread,
Jun 17, 2018, 2:44:27 PM6/17/18
to vert.x
I understand your point. Previously I was writing it in that way (did not close the connection and keep it open), but if I keep the same connection open for longer than 4 or 5 hours there was some weird connectivity exception, (I forgot what exception was that exactly) and due to what application could not establish further connectivity with rabbitmq  and I had to restart the application every time. But periodically closing it ended that problem, I will re-produce the same situation and post the stack trace here. 

Pavel Drankov

unread,
Jun 17, 2018, 4:05:43 PM6/17/18
to vert.x
Yeah, that would be really awesome if you provide the stacktrace
Message has been deleted

wanderer

unread,
Jun 18, 2018, 2:58:52 AM6/18/18
to vert.x
Please see the stack trace below, this started after sending 35847 messages successfully  to queue (I did not close any connection manually.)

21:40:14.499 [vert.x-eventloop-thread-0] DEBUG com.experiment.hpsmconnect.queue.MessagePublisherImpl - Message published !
21:40:22.502 [vert.x-eventloop-thread-0] DEBUG com.experiment.hpsmconnect.queue.MessagePublisherImpl - Message published !
21:40:30.500 [vert.x-eventloop-thread-0] DEBUG com.experiment.hpsmconnect.queue.MessagePublisherImpl - Message published !
21:40:38.500 [vert.x-eventloop-thread-0] DEBUG com.experiment.hpsmconnect.queue.MessagePublisherImpl - Message published !
Jun 02, 2018 9:40:41 PM io.vertx.rabbitmq.impl.RabbitMQClientImpl
INFO: RabbitMQ connection shutdown! The client will attempt to reconnect automatically
com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:742)
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:732)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:94)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:138)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:541)
... 1 more
Jun 02, 2018 9:40:41 PM io.vertx.rabbitmq.impl.RabbitMQClientImpl
INFO: RabbitMQ connection shutdown! The client will attempt to reconnect automatically
com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:742)
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:732)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:94)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:138)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:541)
... 1 more
Jun 02, 2018 9:40:46 PM io.vertx.core.eventbus.impl.HandlerRegistration
SEVERE: Failed to handleMessage. address: message-publisher-service-queue
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.EOFException
at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:170)
at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:503)
at io.vertx.rabbitmq.impl.RabbitMQClientImpl.forChannel(RabbitMQClientImpl.java:417)
at io.vertx.rabbitmq.impl.RabbitMQClientImpl.basicPublish(RabbitMQClientImpl.java:171)
at com.experiment.hpsmconnect.queue.MessagePublisherImpl.sendMessageToQueue(MessagePublisherImpl.java:73)
at com.experiment.hpsmconnect.queue.MessagePublisherImpl.pushMessageToAChannel(MessagePublisherImpl.java:46)
at com.experiment.hpsmconnect.queue.MessagePublisherVertxProxyHandler.handle(MessagePublisherVertxProxyHandler.java:131)
at com.experiment.hpsmconnect.queue.MessagePublisherVertxProxyHandler.handle(MessagePublisherVertxProxyHandler.java:53)
at io.vertx.core.eventbus.impl.HandlerRegistration.deliver(HandlerRegistration.java:223)
at io.vertx.core.eventbus.impl.HandlerRegistration.handle(HandlerRegistration.java:200)
at io.vertx.core.eventbus.impl.EventBusImpl.lambda$deliverToHandler$3(EventBusImpl.java:533)
at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:339)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Jun 02, 2018 9:40:46 PM io.vertx.core.impl.ContextImpl
SEVERE: Unhandled exception
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.EOFException
at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:170)
at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:503)
at io.vertx.rabbitmq.impl.RabbitMQClientImpl.forChannel(RabbitMQClientImpl.java:417)
at io.vertx.rabbitmq.impl.RabbitMQClientImpl.basicPublish(RabbitMQClientImpl.java:171)
at com.experiment.hpsmconnect.queue.MessagePublisherImpl.sendMessageToQueue(MessagePublisherImpl.java:73)
at com.experiment.hpsmconnect.queue.MessagePublisherImpl.pushMessageToAChannel(MessagePublisherImpl.java:46)
at com.experiment.hpsmconnect.queue.MessagePublisherVertxProxyHandler.handle(MessagePublisherVertxProxyHandler.java:131)
at com.experiment.hpsmconnect.queue.MessagePublisherVertxProxyHandler.handle(MessagePublisherVertxProxyHandler.java:53)
at io.vertx.core.eventbus.impl.HandlerRegistration.deliver(HandlerRegistration.java:223)
at io.vertx.core.eventbus.impl.HandlerRegistration.handle(HandlerRegistration.java:200)
at io.vertx.core.eventbus.impl.EventBusImpl.lambda$deliverToHandler$3(EventBusImpl.java:533)
at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:339)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
21:40:46.519 [vert.x-eventloop-thread-0] ERROR com.experiment.hpsmconnect.verticles.PeriodicTasksVerticle - unable to publish alive status
io.vertx.serviceproxy.ServiceException: connection is already closed due to connection error; cause: java.io.EOFException
at com.experiment.hpsmconnect.queue.MessagePublisherVertxProxyHandler.handle(MessagePublisherVertxProxyHandler.java:139)
at com.experiment.hpsmconnect.queue.MessagePublisherVertxProxyHandler.handle(MessagePublisherVertxProxyHandler.java:53)
at io.vertx.core.eventbus.impl.HandlerRegistration.deliver(HandlerRegistration.java:223)
at io.vertx.core.eventbus.impl.HandlerRegistration.handle(HandlerRegistration.java:200)
at io.vertx.core.eventbus.impl.EventBusImpl.lambda$deliverToHandler$3(EventBusImpl.java:533)
at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:339)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Jun 02, 2018 9:40:54 PM io.vertx.core.eventbus.impl.HandlerRegistration
SEVERE: Failed to handleMessage. address: message-publisher-service-queue
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.EOFException
at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:170)
at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:503)
at io.vertx.rabbitmq.impl.RabbitMQClientImpl.forChannel(RabbitMQClientImpl.java:417)
at io.vertx.rabbitmq.impl.RabbitMQClientImpl.basicPublish(RabbitMQClientImpl.java:171)
at com.experiment.hpsmconnect.queue.MessagePublisherImpl.sendMessageToQueue(MessagePublisherImpl.java:73)
at com.experiment.hpsmconnect.queue.MessagePublisherImpl.pushMessageToAChannel(MessagePublisherImpl.java:46)
at com.experiment.hpsmconnect.queue.MessagePublisherVertxProxyHandler.handle(MessagePublisherVertxProxyHandler.java:131)
at com.experiment.hpsmconnect.queue.MessagePublisherVertxProxyHandler.handle(MessagePublisherVertxProxyHandler.java:53)
at io.vertx.core.eventbus.impl.HandlerRegistration.deliver(HandlerRegistration.java:223)
at io.vertx.core.eventbus.impl.HandlerRegistration.handle(HandlerRegistration.java:200)
at io.vertx.core.eventbus.impl.EventBusImpl.lambda$deliverToHandler$3(EventBusImpl.java:533)
at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:339)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Jun 02, 2018 9:40:54 PM io.vertx.core.impl.ContextImpl
SEVERE: Unhandled exception
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.EOFException
at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:170)
at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:503)
at io.vertx.rabbitmq.impl.RabbitMQClientImpl.forChannel(RabbitMQClientImpl.java:417)
at io.vertx.rabbitmq.impl.RabbitMQClientImpl.basicPublish(RabbitMQClientImpl.java:171)
at com.experiment.hpsmconnect.queue.MessagePublisherImpl.sendMessageToQueue(MessagePublisherImpl.java:73)
at com.experiment.hpsmconnect.queue.MessagePublisherImpl.pushMessageToAChannel(MessagePublisherImpl.java:46)
at com.experiment.hpsmconnect.queue.MessagePublisherVertxProxyHandler.handle(MessagePublisherVertxProxyHandler.java:131)
at com.experiment.hpsmconnect.queue.MessagePublisherVertxProxyHandler.handle(MessagePublisherVertxProxyHandler.java:53)
at io.vertx.core.eventbus.impl.HandlerRegistration.deliver(HandlerRegistration.java:223)
at io.vertx.core.eventbus.impl.HandlerRegistration.handle(HandlerRegistration.java:200)
at io.vertx.core.eventbus.impl.EventBusImpl.lambda$deliverToHandler$3(EventBusImpl.java:533)
at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:339)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
21:40:54.499 [vert.x-eventloop-thread-0] ERROR com.experiment.hpsmconnect.verticles.PeriodicTasksVerticle - unable to publish alive status
io.vertx.serviceproxy.ServiceException: connection is already closed due to connection error; cause: java.io.EOFException
at com.experiment.hpsmconnect.queue.MessagePublisherVertxProxyHandler.handle(MessagePublisherVertxProxyHandler.java:139)
at com.experiment.hpsmconnect.queue.MessagePublisherVertxProxyHandler.handle(MessagePublisherVertxProxyHandler.java:53)
at io.vertx.core.eventbus.impl.HandlerRegistration.deliver(HandlerRegistration.java:223)
at io.vertx.core.eventbus.impl.HandlerRegistration.handle(HandlerRegistration.java:200)
at io.vertx.core.eventbus.impl.EventBusImpl.lambda$deliverToHandler$3(EventBusImpl.java:533)
at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:339)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Jun 02, 2018 9:41:02 PM io.vertx.core.eventbus.impl.HandlerRegistration
SEVERE: Failed to handleMessage. address: message-publisher-service-queue
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.EOFException
at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:170)
at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:503)
at io.vertx.rabbitmq.impl.RabbitMQClientImpl.forChannel(RabbitMQClientImpl.java:417)
at io.vertx.rabbitmq.impl.RabbitMQClientImpl.basicPublish(RabbitMQClientImpl.java:171)
at com.experiment.hpsmconnect.queue.MessagePublisherImpl.sendMessageToQueue(MessagePublisherImpl.java:73)
at com.experiment.hpsmconnect.queue.MessagePublisherImpl.pushMessageToAChannel(MessagePublisherImpl.java:46)
at com.experiment.hpsmconnect.queue.MessagePublisherVertxProxyHandler.handle(MessagePublisherVertxProxyHandler.java:131)
at com.experiment.hpsmconnect.queue.MessagePublisherVertxProxyHandler.handle(MessagePublisherVertxProxyHandler.java:53)
at io.vertx.core.eventbus.impl.HandlerRegistration.deliver(HandlerRegistration.java:223)
at io.vertx.core.eventbus.impl.HandlerRegistration.handle(HandlerRegistration.java:200)
at io.vertx.core.eventbus.impl.EventBusImpl.lambda$deliverToHandler$3(EventBusImpl.java:533)
at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:339)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Jun 02, 2018 9:41:02 PM io.vertx.core.impl.ContextImpl
SEVERE: Unhandled exception

wanderer

unread,
Jun 18, 2018, 10:49:22 PM6/18/18
to vert.x
is this some kind of bug or I am missing something?

Arjya

unread,
Jun 19, 2018, 10:57:26 PM6/19/18
to vert.x
I have had similar problem. possibly a bug.

Pavel Drankov

unread,
Jun 21, 2018, 5:37:47 PM6/21/18
to vert.x
I think the error came from the java rabbitmq client[1]. The vertx-rabbitmq-client is built on top of it.

In the 3.6.0 release, we have upgraded the version of the internal client from 3.6.5 to 5.2.0 and, maybe, the issue is gone. Can you try to reproduce the problem with 3.6.0-SNAPSHOT?

This also may be connected with some network problems on the hardware level, so you need to be ready to reinitialize the client.

There is also another option, you increase some properties in order to handle such kind of problems, so take a look at RabbitMQOptions#setNetworkRecoveryInterval, ...#setConnectionTimeout, ...#setConnectionRetries. Keep in mind, that by default there are no connection retries.

Reply all
Reply to author
Forward
0 new messages