Problem with topology auto recovery and exclusive queue (Bug?)


Frederic Leitenberger

Feb 20, 2015, 12:00:02 PM
to rabbitm...@googlegroups.com
Hello,

RabbitMQ Java Client version 3.4.4.

I am trying to use connection & topology auto-recovery with an exclusive queue.

My test-case is:
- I disconnect the physical network connection to my RabbitMQ server and reconnect it after a moment (a minute or so).
- Then I check what my software does and whether it recovers. It does not.

I think the problem (or bug) is that the recovery code was not designed to work with exclusive queues.
Maybe it fails when the client creates a new connection (after a hard disconnect) while the server still holds the old connection.


When I physically disconnect the cable for about a minute, nothing visible happens at first, and then I get this:

Caught an exception during connection recovery!
java.net.NoRouteToHostException: No route to host: connect
    at java.net.DualStackPlainSocketImpl.connect0(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:79)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at com.rabbitmq.client.impl.FrameHandlerFactory.create(FrameHandlerFactory.java:32)
    at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:34)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConnection(AutorecoveringConnection.java:434)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:406)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:52)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:351)
    at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:574)
    at java.lang.Thread.run(Thread.java:745)

After reconnecting the cable a moment later, I get these exceptions:

Caught an exception when recovering topology Caught an exception while recovering queue MyQueueName: null
com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering queue MyQueueName: null
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverQueues(AutorecoveringConnection.java:516)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverEntities(AutorecoveringConnection.java:468)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:411)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:52)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:351)
    at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:574)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:833)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:259)
    at com.rabbitmq.client.impl.recovery.RecordedQueue.recover(RecordedQueue.java:38)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverQueues(AutorecoveringConnection.java:494)
    ... 7 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'MyQueueName' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:221)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 11 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'MyQueueName' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:552)
    ... 1 more
Caught an exception when recovering topology Caught an exception while recovering binding between MyExchangeName and MyQueueName: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'MyQueueName' in vhost '/', class-id=50, method-id=10)
com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering binding between MyExchangeName and MyQueueName: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'MyQueueName' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverBindings(AutorecoveringConnection.java:529)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverEntities(AutorecoveringConnection.java:469)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:411)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:52)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:351)
    at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:574)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'MyQueueName' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:228)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:214)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:917)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:61)
    at com.rabbitmq.client.impl.recovery.RecordedQueueBinding.recover(RecordedQueueBinding.java:14)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverBindings(AutorecoveringConnection.java:525)
    ... 7 more
Caught an exception when recovering topology Caught an exception while recovering binding between MyQueueName and MyQueueName: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'MyQueueName' in vhost '/', class-id=50, method-id=10)
com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering binding between MyQueueName and MyQueueName: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'MyQueueName' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverBindings(AutorecoveringConnection.java:529)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverEntities(AutorecoveringConnection.java:469)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:411)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:52)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:351)
    at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:574)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'MyQueueName' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:228)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:214)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:917)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:61)
    at com.rabbitmq.client.impl.recovery.RecordedQueueBinding.recover(RecordedQueueBinding.java:14)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverBindings(AutorecoveringConnection.java:525)
    ... 7 more
Caught an exception when recovering topology Caught an exception while recovering consumer MyConsumer: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'MyQueueName' in vhost '/', class-id=50, method-id=10)
com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering consumer MyConsumer: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'MyQueueName' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:554)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:412)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:52)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:351)
    at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:574)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'MyQueueName' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:228)
    at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1075)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:367)
    at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:45)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:542)
    ... 6 more

My code:

import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(config.queueServerHost);
factory.setPort(config.queueServerPort);
factory.setAutomaticRecoveryEnabled(true);
factory.setTopologyRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000); // wait 5 s before each recovery attempt
factory.setUsername(config.queueServerUser);
factory.setPassword(config.queueServerPasswd);
factory.setVirtualHost(config.queueServerVirtualHost);
connection = factory.newConnection();

Channel channel = connection.createChannel();
channel.addReturnListener(new MyReturnListener());

// set up my queue
channel.queueDeclare("MyQueueName", false, true, true, null); // durable=false, exclusive=true, autoDelete=true

// set up my exchange and bind the queue to it
channel.exchangeDeclare("MyQueueName", "fanout", false, true, null); // durable=false, autoDelete=true
channel.queueBind("MyQueueName", "MyQueueName", "");

QueueingConsumer consumer = new QueueingConsumer(channel);
// autoAck=true, consumerTag="MyConsumer", noLocal=true, exclusive=true
channel.basicConsume("MyQueueName", true, "MyConsumer", true, true, null, consumer);

boolean running = true;
while (running)
{
    while (running)
    {
        try
        {
            Delivery delivery = consumer.nextDelivery();
            // ... process delivery
        }
        catch (ShutdownSignalException e)
        {
            e.printStackTrace();
            break;
        }
        catch (IOException e)
        {
            e.printStackTrace();
            break;
        }
    }
    System.out.println("Trying to reconnect ...");
    Thread.sleep(1000);
}


Greetings Fred;

Michael Klishin

Feb 20, 2015, 12:31:50 PM
to Frederic Leitenberger, rabbitm...@googlegroups.com
It takes a while to detect TCP connection loss: the TCP keep-alive timeout is 9 * 75 seconds on Linux by default. Use a reasonable heartbeat timeout if you test by pulling the Ethernet cord.

MK

Frederic Leitenberger

Feb 23, 2015, 4:24:11 AM
to rabbitm...@googlegroups.com, fredl...@yahoo.de
That is not the problem. The problem is that the automatic recovery & reconnect does not work.

Frederic Leitenberger

Feb 23, 2015, 4:39:45 AM
to rabbitm...@googlegroups.com, fredl...@yahoo.de
Hello Michael,

Actually, it might be connected to the problem.
The RabbitMQ server is running on a Linux system.
The client (in my test scenario) is on a Windows machine.

So Windows has a shorter TCP/IP timeout than the server: only the client notices the disconnect, the server does not.
Thus the server still holds the exclusive lock on the queue, which the client now tries to obtain a second time.

I think the auto-recovery should account for that and make it work after a reconnect.
What use is an auto-recovery that does not work properly?
Can you (or whoever is in charge of this feature) please take a look at it? Thanks.

Greetings Fred;
...

Michael Klishin

Feb 23, 2015, 4:44:04 AM
to Frederic Leitenberger, rabbitm...@googlegroups.com
On 23 February 2015 at 12:24:13, 'Frederic Leitenberger' via rabbitmq-users (rabbitm...@googlegroups.com) wrote:
> That is not the problem. The problem is that the automatic recovery
> & reconnect does not work.

Frederic,

Yes, this can be the root cause with exclusive queues.

You say you use exclusive queues. The problem with exclusive queues during recovery is that
if another connection is still open, then queue.declare performed by connection recovery will fail.

It takes time to detect TCP connection failure. TCP has a mechanism called keep-alive that ensures
connections are not considered to be dead prematurely. On Linux, for a connection to time out,
9 keep-alive probes sent to the peer have to go unanswered, with 75 seconds between probes.

You do the math. Contrary to the popular belief among some software professionals, pulling out
the Ethernet cord does not make the peer immediately notice this. Welcome to the world of distributed
systems, where the only realistic way of knowing the peer is alive is to receive "notifications" from it
every so often.
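To make "you do the math" concrete, here is a small sketch of the arithmetic, assuming the Linux defaults quoted above (net.ipv4.tcp_keepalive_probes = 9, net.ipv4.tcp_keepalive_intvl = 75 s). It counts only the probing phase: by default Linux also waits net.ipv4.tcp_keepalive_time (7200 s) of idleness before sending the first probe, and none of this applies unless keep-alive is enabled on the socket at all. The class and method names are illustrative only.

```java
public class KeepaliveMath {
    // Assumed Linux defaults: net.ipv4.tcp_keepalive_probes = 9,
    // net.ipv4.tcp_keepalive_intvl = 75 (seconds between probes).
    static final int KEEPALIVE_PROBES = 9;
    static final int KEEPALIVE_INTERVAL_SECONDS = 75;

    // Seconds spent sending unanswered probes before the kernel gives up on the peer.
    static int probePhaseSeconds(int probes, int intervalSeconds) {
        return probes * intervalSeconds;
    }

    public static void main(String[] args) {
        int seconds = probePhaseSeconds(KEEPALIVE_PROBES, KEEPALIVE_INTERVAL_SECONDS);
        // Prints "675 s = 11.25 min"
        System.out.println(seconds + " s = " + (seconds / 60.0) + " min");
    }
}
```

Compared with the 5 second network recovery interval in the original code, the client comes back and re-declares its topology long before the server side can have given up on the old connection.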

I hope it's easy to see why this can trip up automatic recovery: the Java client waits 5 seconds (the network recovery
interval set in your code) before attempting to recover. When it does, it attempts to re-declare the queue while the
RabbitMQ server still thinks the old connection is around.

The protocol RabbitMQ speaks (AMQP 0-9-1) has a feature called heartbeats that makes TCP connection failure detection
quicker:
http://www.rabbitmq.com/heartbeats.html
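A minimal sketch of enabling heartbeats in the Java client, reusing the recovery settings from the original post. ConnectionFactory.setRequestedHeartbeat(int) takes a value in seconds; the 10 s value is an assumption picked from the 6-10 second range suggested below, and the helper name newFactory is hypothetical. The timeout actually used is negotiated with the server when the connection is opened.

```java
import com.rabbitmq.client.ConnectionFactory;

public class HeartbeatConfig {
    // Hypothetical helper: the factory settings from the original post,
    // plus a requested heartbeat timeout (in seconds).
    static ConnectionFactory newFactory(String host, int heartbeatSeconds) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setAutomaticRecoveryEnabled(true);
        factory.setTopologyRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // Requested value only; client and server agree on the final
        // timeout during connection negotiation.
        factory.setRequestedHeartbeat(heartbeatSeconds);
        return factory;
    }

    public static void main(String[] args) {
        ConnectionFactory factory = newFactory("localhost", 10);
        System.out.println("requested heartbeat: " + factory.getRequestedHeartbeat() + " s");
        // factory.newConnection() would then open a connection with heartbeats enabled.
    }
}
```

With heartbeats enabled on both ends, the server notices the missing peer after roughly the heartbeat timeout instead of waiting on TCP keep-alive, which shrinks the window in which the exclusive queue is still attributed to the dead connection.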

You may be confused by

"Caught an exception during connection recovery! java.net.NoRouteToHostException: No route to host: connect".

The Java client should retry recovery after any exception it encounters. It passes the exception to the exception
handler, which by default logs it to stderr.
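A sketch of plugging in a custom handler, assuming the Java client's com.rabbitmq.client.impl.DefaultExceptionHandler and ConnectionFactory.setExceptionHandler. CountingExceptionHandler is a hypothetical name. It overrides only handleTopologyRecoveryException, so failures such as the RESOURCE_LOCKED queue re-declaration in the traces above are counted and reported, while connection recovery errors keep the default stderr logging.

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.TopologyRecoveryException;
import com.rabbitmq.client.impl.DefaultExceptionHandler;

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical handler: counts topology recovery failures so the application
// can notice them and react, e.g. by re-declaring its queue itself later.
public class CountingExceptionHandler extends DefaultExceptionHandler {
    private final AtomicInteger topologyFailures = new AtomicInteger();

    @Override
    public void handleTopologyRecoveryException(Connection conn, Channel ch,
                                                TopologyRecoveryException exception) {
        topologyFailures.incrementAndGet();
        System.err.println("Topology recovery failed: " + exception.getMessage());
    }

    public int topologyFailures() {
        return topologyFailures.get();
    }
}
```

Register it with factory.setExceptionHandler(new CountingExceptionHandler()) before calling factory.newConnection(). The application can then check topologyFailures() and decide how to rebuild its topology once the server has dropped the old connection.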

You have two options:

 * Use a reasonably low heartbeat timeout (e.g. 6-10 seconds; lower values may produce false positives)
 * Don't use exclusive queues (server-named queues alone can be sufficient to ensure only one consumer ever uses them).
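A sketch of the second option, with hypothetical helper names: declare the queue with an empty name so the broker generates one, and leave exclusive=false. The Java client's topology recovery is expected to re-declare such server-named queues under a fresh generated name and re-apply their bindings and consumers, so recovery should not collide with a queue the server still attributes to the old connection; this is worth verifying for the client version in use. The exchange arguments mirror the original post.

```java
import com.rabbitmq.client.Channel;

import java.io.IOException;

public class ServerNamedQueueSetup {
    // Hypothetical helper: declares the fanout exchange from the original post
    // and binds a server-named queue to it. Only this code learns the generated
    // name, so in practice only this consumer uses the queue.
    static String declareAndBind(Channel channel, String exchange) throws IOException {
        channel.exchangeDeclare(exchange, "fanout", false, true, null); // durable=false, autoDelete=true
        // Empty name: the broker generates one.
        // durable=false, exclusive=false, autoDelete=true
        String queue = channel.queueDeclare("", false, false, true, null).getQueue();
        channel.queueBind(queue, exchange, "");
        return queue;
    }
}
```

The returned name is what basicConsume should then be called with. Messages routed to the old queue while the server still considers the old connection alive stay in that queue and are not delivered to the new one.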
--
MK

Staff Software Engineer, Pivotal/RabbitMQ