Hello,
I am using the RabbitMQ Java client, version 3.4.4.
I am trying to use automatic recovery with one registered consumer.
When I restart my RabbitMQ server, everything reconnects except the consumer, which fails with this error message:
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1088)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:367)
at MessageReceiverThread.run(MessageReceiverThread.java:123)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - attempt to reuse consumer tag 'myConsumer', class-id=60, method-id=20)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1086)
... 2 more
This error reappears at the interval that I specified for networkRecoveryInterval (5 seconds).
My code:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(config.queueServerHost);
factory.setPort(config.queueServerPort);
factory.setAutomaticRecoveryEnabled(true);
factory.setTopologyRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setUsername(config.queueServerUser);
factory.setPassword(config.queueServerPasswd);
factory.setVirtualHost(config.queueServerVirtualHost);
connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.addReturnListener(new MyReturnListener());
// ... declare queues & exchanges ...
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("myQueue", true, "myConsumer", true, true, null, consumer);
boolean running = true;
while (running)
{
    while (running)
    {
        try
        {
            Delivery delivery = consumer.nextDelivery();
            // ... process delivery
        }
        catch (ShutdownSignalException e)
        {
            e.printStackTrace();
            break;
        }
        catch (IOException e)
        {
            e.printStackTrace();
            break;
        }
    }
    System.out.println("Trying to reconnect ...");
    Thread.sleep(1000);
}
I start my program and keep sending messages every second (using a second channel).
So everything is running. My receiver is ready to receive (and works when something arrives).
Now my test is: I restart the RabbitMQ server.
After a short moment the sender part resumes sending without any problem.
But the consumer part (code above) does not recover and keeps failing with the above exception.
I dug deeper with the debugger and found the underlying exception:
com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - attempt to reuse consumer tag 'myConsumer', class-id=60, method-id=20)
at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717)
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707)
at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:662)
at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:617)
at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:107)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:542)
at java.lang.Thread.run(Thread.java:745)
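For context, the reply text points at a protocol rule: in AMQP 0-9-1 a consumer tag has to be unique among the consumers on a channel, and a basic.consume that names a tag which is still registered is rejected with NOT_ALLOWED (530). Below is a minimal toy model of that rule. It assumes that the basicConsume call at MessageReceiverThread.java:123 runs after topology recovery has already restored 'myConsumer' on the recovered channel; that is only a guess, since the code above does not show that call inside the loop. ToyChannel and ToyChannelDemo are hypothetical names and are not part of the RabbitMQ Java client.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Toy stand-in for the consumer bookkeeping of ONE channel.
// Hypothetical class; it is not part of the RabbitMQ Java client.
class ToyChannel {
    private final Set<String> consumerTags = new HashSet<>();

    // Mimics basicConsume: an empty tag means "let the server pick one".
    String consume(String requestedTag) {
        String tag = requestedTag.isEmpty()
                ? "gen-" + UUID.randomUUID()
                : requestedTag;
        // Set.add returns false when the tag is already registered.
        if (!consumerTags.add(tag)) {
            throw new IllegalStateException(
                "NOT_ALLOWED - attempt to reuse consumer tag '" + tag + "'");
        }
        return tag;
    }

    // Mimics basicCancel: frees the tag so it can be used again.
    void cancel(String tag) {
        consumerTags.remove(tag);
    }
}

class ToyChannelDemo {
    public static void main(String[] args) {
        ToyChannel channel = new ToyChannel();
        // Assumed: topology recovery has restored the consumer already.
        channel.consume("myConsumer");
        try {
            // Assumed: the application registers the same tag again.
            channel.consume("myConsumer");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        // After a cancel the tag is free and can be registered again.
        channel.cancel("myConsumer");
        channel.consume("myConsumer");
    }
}
```

If that reading is right, two things that might avoid the clash are registering the consumer only once and leaving the re-registration to automatic recovery, or calling basicConsume without an explicit tag so that the server generates one.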
Greetings, Fred