Java client reconnect error: attempt to reuse consumer tag

James Bellenger

Apr 11, 2014, 6:29:13 PM
to rabbitmq...@googlegroups.com
Hello.
I've been testing the client automatic recovery support in 3.3.0 and have had trouble getting it to work.
On my machine, recovery appears to work with fewer than about 5 consumers. With more than that, the client gets a stream of TopologyRecoveryExceptions and is never able to reconnect.

I've looked at the client source and haven't seen any obvious culprit. Has anyone else seen this?

Test Setup:
client jar: amqp-client-3.3.0.jar
server: rabbitmq-server 3.1.3-1 (from Ubuntu 13.10)
Triggering autorecovery via 'service rabbitmq-server restart'

Client code (Scala):
package rabbittest

import com.rabbitmq.client._

object RecoveryTest {
  // The consumer count matters. At about 5 or more consumers the client gets
  // put into a bad state and never reconnects. With fewer than 5, the client
  // is usually able to reconnect.
  val consumers = 10

  def main(args: Array[String]): Unit = {
    val factory = new ConnectionFactory()
    factory.setUri("amqp://guest@localhost:5672/test")
    factory.setAutomaticRecoveryEnabled(true)

    val conn = factory.newConnection()

    for (x <- 1 to consumers) {
      val chan = conn.createChannel()
      val result = chan.queueDeclare()
      val tag = chan.basicConsume(result.getQueue, new DefaultConsumer(chan))
      println(s"channel=${chan.getChannelNumber} queue=${result.getQueue} consumer=$tag")
    }
  }
}

Good Logs
channel=1 queue=amq.gen-KvCBOoSjkyhpqle8GMGB0Q consumer=amq.ctag-4WBZRhTLhp_mnQDCXs1NGg
channel=2 queue=amq.gen-bNUrlmqi2nHZiunZ9qy-UA consumer=amq.ctag-2_GR91qniV39a-qTtK9Beg
channel=3 queue=amq.gen-li-v7cR25J7kmQ-YfcqbCQ consumer=amq.ctag-llfj8Zf1yFCnIGxpU0EI8A

Bad Logs (topology exceptions repeat on every reconnect attempt)
channel=1 queue=amq.gen-TIierQJcDNd93QzX8gYuww consumer=amq.ctag-grxPT0_DsPA7WccDmbNBrg
channel=2 queue=amq.gen-JaXYQy218NUlWGQCubjjhg consumer=amq.ctag-Toh8E8Ogvmb3UNkKcJszxg
channel=3 queue=amq.gen-VzRS5YVgjP7Lz3UfhxYBgA consumer=amq.ctag-LOiutBNDPk8Dl4gUWxJgdg
channel=4 queue=amq.gen-JNJAT8cGvVPRePX43hSPtg consumer=amq.ctag-0nyXfLeLl9lsApz3XsT0Iw
channel=5 queue=amq.gen-Biake-GSvzyuCFe1b-4FVA consumer=amq.ctag-xtZbH08UoJnJKhi52vn_1g
channel=6 queue=amq.gen-apsJKNLSgN3Dv0VBqPLqSQ consumer=amq.ctag-OwrpVcvqzqB3aQ42PdQY2g
channel=7 queue=amq.gen-KxXs5Hkd1W2I1WPxIydpMQ consumer=amq.ctag-KPZzHfmW2f7Cni5u4gQCvA
channel=8 queue=amq.gen-ZeLhlueGGsAXW_GhuOVS5w consumer=amq.ctag-54Raw5XNJp2W1-uQUcP2aw
channel=9 queue=amq.gen-UZW1yR4QG2Sz90RM9CulQA consumer=amq.ctag-kDDpMuN3vaP-MLeH4Fk7oA
channel=10 queue=amq.gen-Ejgc9jZrg6q1UH3qN2VXuA consumer=amq.ctag-3OxICESMijMUMgZOURXNmw
Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-LOiutBNDPk8Dl4gUWxJgdg
com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering consumer amq.ctag-LOiutBNDPk8Dl4gUWxJgdg
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:488)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:365)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:47)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:344)
at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:995)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:312)
at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:45)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:481)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - attempt to reuse consumer tag 'amq.ctag-LOiutBNDPk8Dl4gUWxJgdg', class-id=60, method-id=20)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:993)
... 9 more
Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-54Raw5XNJp2W1-uQUcP2aw
com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering consumer amq.ctag-54Raw5XNJp2W1-uQUcP2aw
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:488)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:365)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:47)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:344)
at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
at java.lang.Thread.run(Thread.java:744)
Caused by: com.rabbitmq.client.AlreadyClosedException: connectionconnection error; reason: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - attempt to reuse consumer tag 'amq.ctag-LOiutBNDPk8Dl4gUWxJgdg', class-id=60, method-id=20)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:981)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:312)
at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:45)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:481)
... 6 more
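
One way to narrow this down might be to rerun the same test with client-chosen consumer tags instead of the server-generated amq.ctag-* ones, and to log why the connection shuts down each time. The sketch below is a variation on the test above, not a known fix. The object name RecoveryTestExplicitTags, the consumerTag helper and its naming scheme are made up for illustration, and whether explicit tags change the recovery behaviour is an assumption that has not been verified against this setup.

```scala
import com.rabbitmq.client._

object RecoveryTestExplicitTags {
  val consumers = 10

  // Hypothetical naming scheme: one distinct, client-chosen tag per consumer.
  def consumerTag(i: Int): String = s"recovery-test-consumer-$i"

  def main(args: Array[String]): Unit = {
    val factory = new ConnectionFactory()
    factory.setUri("amqp://guest@localhost:5672/test")
    factory.setAutomaticRecoveryEnabled(true)

    val conn = factory.newConnection()

    // Print the reason whenever this listener is notified of a connection shutdown.
    conn.addShutdownListener(new ShutdownListener {
      def shutdownCompleted(cause: ShutdownSignalException): Unit =
        println(s"connection shutdown: ${cause.getMessage}")
    })

    for (x <- 1 to consumers) {
      val chan = conn.createChannel()
      val result = chan.queueDeclare()
      // autoAck = false matches the two-argument basicConsume used in the test above.
      val tag = chan.basicConsume(result.getQueue, false, consumerTag(x), new DefaultConsumer(chan))
      println(s"channel=${chan.getChannelNumber} queue=${result.getQueue} consumer=$tag")
    }
  }
}
```

If recovery still fails with "attempt to reuse consumer tag", the tag named in the error can be matched against the channel that printed it at startup, which would show whether a consumer is being re-registered on the wrong channel or more than once on the same one.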