Hello.
I've been testing the client's automatic recovery support in 3.3.0 and have had trouble getting it to work.
On my machine, recovery usually works with fewer than about 5 consumers. With more than that, the client logs a stream of TopologyRecoveryExceptions and never manages to reconnect.
I've looked at the client source and haven't found an obvious culprit. Has anyone else seen this? The test program and its output follow.
package rabbittest

import com.rabbitmq.client._

object RecoveryTest {
  // The consumer count matters. At around 5 or more consumers the client gets
  // into a bad state and never reconnects; with fewer it usually recovers.
  val consumers = 10

  def main(args: Array[String]): Unit = {
    val factory = new ConnectionFactory()
    factory.setUri("amqp://guest@localhost:5672/test")
    factory.setAutomaticRecoveryEnabled(true)
    val conn = factory.newConnection()

    // One channel per consumer, each on its own server-named queue with a
    // server-generated consumer tag.
    for (x <- 1 to consumers) {
      val chan = conn.createChannel()
      val result = chan.queueDeclare()
      val tag = chan.basicConsume(result.getQueue, new DefaultConsumer(chan))
      println(s"channel=${chan.getChannelNumber} queue=${result.getQueue} consumer=$tag")
    }
  }
}
channel=1 queue=amq.gen-TIierQJcDNd93QzX8gYuww consumer=amq.ctag-grxPT0_DsPA7WccDmbNBrg
channel=2 queue=amq.gen-JaXYQy218NUlWGQCubjjhg consumer=amq.ctag-Toh8E8Ogvmb3UNkKcJszxg
channel=3 queue=amq.gen-VzRS5YVgjP7Lz3UfhxYBgA consumer=amq.ctag-LOiutBNDPk8Dl4gUWxJgdg
channel=4 queue=amq.gen-JNJAT8cGvVPRePX43hSPtg consumer=amq.ctag-0nyXfLeLl9lsApz3XsT0Iw
channel=5 queue=amq.gen-Biake-GSvzyuCFe1b-4FVA consumer=amq.ctag-xtZbH08UoJnJKhi52vn_1g
channel=6 queue=amq.gen-apsJKNLSgN3Dv0VBqPLqSQ consumer=amq.ctag-OwrpVcvqzqB3aQ42PdQY2g
channel=7 queue=amq.gen-KxXs5Hkd1W2I1WPxIydpMQ consumer=amq.ctag-KPZzHfmW2f7Cni5u4gQCvA
channel=8 queue=amq.gen-ZeLhlueGGsAXW_GhuOVS5w consumer=amq.ctag-54Raw5XNJp2W1-uQUcP2aw
channel=9 queue=amq.gen-UZW1yR4QG2Sz90RM9CulQA consumer=amq.ctag-kDDpMuN3vaP-MLeH4Fk7oA
channel=10 queue=amq.gen-Ejgc9jZrg6q1UH3qN2VXuA consumer=amq.ctag-3OxICESMijMUMgZOURXNmw
Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-LOiutBNDPk8Dl4gUWxJgdg
com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering consumer amq.ctag-LOiutBNDPk8Dl4gUWxJgdg
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:488)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:365)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:47)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:344)
    at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
    at java.lang.Thread.run(Thread.java:744)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:995)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:312)
    at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:45)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:481)
    ... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - attempt to reuse consumer tag 'amq.ctag-LOiutBNDPk8Dl4gUWxJgdg', class-id=60, method-id=20)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:993)
    ... 9 more
Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-54Raw5XNJp2W1-uQUcP2aw
com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering consumer amq.ctag-54Raw5XNJp2W1-uQUcP2aw
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:488)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:365)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:47)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:344)
    at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
    at java.lang.Thread.run(Thread.java:744)
Caused by: com.rabbitmq.client.AlreadyClosedException: connectionconnection error; reason: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - attempt to reuse consumer tag 'amq.ctag-LOiutBNDPk8Dl4gUWxJgdg', class-id=60, method-id=20)
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
    at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:981)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:312)
    at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:45)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:481)
    ... 6 more