Bug RabbitMQ Java-Client: 530, NOT_ALLOWED - attempt to reuse consumer tag (on auto-recovery)

4,783 views
Skip to first unread message

Frederic Leitenberger

unread,
Feb 20, 2015, 10:36:53 AM2/20/15
to rabbitm...@googlegroups.com
Hello,

Java Client version 3.4.4.

I am trying to use auto-recovery with one registered consumer.

But when i restart my RabbitMQ server everything reconnects, except the consumer fails with this error-message:

java.io.IOException

 at com
.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
 at com
.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
 at com
.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1088)
 at com
.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:367)
 at
MessageReceiverThread.run(MessageReceiverThread.java:123)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - attempt to reuse consumer tag 'myConsumer', class-id=60, method-id=20)
 at com
.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
 at com
.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
 at com
.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348)
 at com
.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1086)
 
... 2 more


This error message reappears in the interval that i specified for the networkRecoveryInterval (5sec).


My Code:
ConnectionFactory factory = new ConnectionFactory();
factory
.setHost(config.queueServerHost);
factory
.setPort(config.queueServerPort);
factory
.setAutomaticRecoveryEnabled(true);
factory
.setTopologyRecoveryEnabled(true);
factory
.setNetworkRecoveryInterval(5000);
factory
.setUsername(config.queueServerUser);
factory
.setPassword(config.queueServerPasswd);
factory
.setVirtualHost(config.queueServerVirtualHost);
connection
= factory.newConnection();


Channel channel = connection.createChannel();
channel
.addReturnListener(new MyReturnListener());

// ... declare queues & exchanges ...

QueueingConsumer consumer = new QueueingConsumer(channel);
channel
.basicConsume("myQueue", true, "myConsumer", true, true, null, consumer);

boolean running = true;
while (running)
{
 
while (running)
 
{
   
try
   
{
     
Delivery delivery = consumer.nextDelivery();
     
// ... process delivery
   
}
   
catch (ShutdownSignalException e)
   
{
      e
.printStackTrace();
     
break;
   
}
   
catch (IOException e)
   
{
      e
.printStackTrace();
     
break;
   
}
 
}
 
System.out.println("Trying to reconnect ...");
 
Thread.sleep(1000);
}



I start my program and send keep sending stuff every second (using a second channel).

So everything is running. My receiver is ready to receive (and does work if somethign is coming).

Now my test is: I restart the RabbitMQ server.

After a short moment the sender part resumes sending without any problem.

But the consumer part (code above) does not recover and keeps failing with the above exception.

I went deeper into the debugger and found the underlying exception:

com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - attempt to reuse consumer tag 'myConsumer', class-id=60, method-id=20)
 at com
.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717)
 at com
.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707)
 at com
.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:662)
 at com
.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:617)
 at com
.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:107)
 at com
.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
 at com
.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
 at com
.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:542)
 at java
.lang.Thread.run(Thread.java:745)


Greetings Fred;

Michael Klishin

unread,
Feb 20, 2015, 12:24:08 PM2/20/15
to Frederic Leitenberger, rabbitm...@googlegroups.com
What is MessageReceiverThread? It seems to try to re-add a consumer that automatic recovery already added. This is not a Java client class as far as I know.

MK
--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Frederic Leitenberger

unread,
Feb 23, 2015, 4:26:00 AM2/23/15
to rabbitm...@googlegroups.com, fredl...@yahoo.de
It is my class that does the work. Essentially it contains my posted code.

Michael Klishin

unread,
Feb 23, 2015, 4:49:13 AM2/23/15
to Frederic Leitenberger, rabbitm...@googlegroups.com
On 23 February 2015 at 12:26:02, 'Frederic Leitenberger' via rabbitmq-users (rabbitm...@googlegroups.com) wrote:
> It is my class that does the work. Essentially it contains my
> posted code.

I see a few possible reasons for that getting NOT_ALLOWED when adding a consumer back:

 * What is described on https://groups.google.com/d/msg/rabbitmq-users/p-CDo06KYwc/IZgG22KYf7gJ
 * Your code also attempts to recover
 * You use a client-provided consumer tag

The latter should work unless your consumer is added as exclusive. Java client recovery test suite
only has cases with server-generated consumer tags but other clients certainly have the client-provided
tag covered.

So, consider

 * Not using exclusive consumers
 * Letting RabbitMQ generate consumer tag (it will be returned by Channel#basicConsume)
--
MK

Staff Software Engineer, Pivotal/RabbitMQ

Frederic Leitenberger

unread,
Feb 23, 2015, 6:18:42 AM2/23/15
to rabbitm...@googlegroups.com, fredl...@yahoo.de
Hello again Michael,

thanks for the reply. The server-generated consumer tag was a good hint. I did not know it existed. I will try it.

About the exclusive consumers. I am using exclusive queues and consumers to make sure that the same server (configuration) never runs twice simultaneously.
We have one queue for each server to receive messages. And there should never be two consumers or connections be bound to the same queue.
If that happens it would be that half the messages go to one server-instance while the other half goes to the other. This would upset our system dramatically.

Greetings Fred;

Michael Klishin

unread,
Feb 23, 2015, 9:18:01 AM2/23/15
to Frederic Leitenberger, rabbitm...@googlegroups.com
 On 23 February 2015 at 14:18:43, 'Frederic Leitenberger' via rabbitmq-users (rabbitm...@googlegroups.com) wrote:
> About the exclusive consumers. I am using exclusive queues
> and consumers to make sure that the same server (configuration)
> never runs twice simultaneously.
> We have one queue for each server to receive messages. And there
> should never be two consumers or connections be bound to the same
> queue.
> If that happens it would be that half the messages go to one server-instance
> while the other half goes to the other. This would upset our system
> dramatically.

Fred,

This is reasonable. I think that having an exclusive queue should be sufficient,
so try that. Note that exclusive queues are typically server-named. With server-named queues,
automatic connection recovery makes sure that a new name is generated by the server,
this should solve the problem you are observing even without adjusting heartbeats (which I'd still
recommend, by the way).

Let me know if something isn't clear.

Cheers.
Reply all
Reply to author
Forward
0 new messages