AMQP consumer fail with TopologyRecoveryException


Narendra Sharma

Jul 2, 2019, 6:00:42 PM
to rabbitmq-users
RabbitMQ: 3.7.15
3 node cluster

A consumer connected to one of the nodes fails with the following exception:
Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag--q9AloWxo_frHgn-iHaSog: null
com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering consumer amq.ctag--q9AloWxo_frHgn-iHaSog: null
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:635)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:478)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:70)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:409)
at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:74)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1118)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:397)
at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:60)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:623)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=541, reply-text=INTERNAL_ERROR, class-id=0, method-id=0)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:360)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1116)
... 9 more

Around the same time I see the following error in the broker logs:
2019-07-02 14:56:52.542 [error] <0.15096.754> Error on AMQP connection <0.15096.754> (<AMQPCLIENTIP>:42960 -> <RABBITMQBROKERIP>:5672, vhost: '/', user: 'guest', state: running), channel 1:
 method_field_shortstr_overflow
2019-07-02 14:56:52.542 [warning] <0.15096.754> Non-AMQP exit reason 'method_field_shortstr_overflow'

Any idea what could cause this?

The broker was running fine. The other broker nodes, and the consumers connected to queues on those nodes, were also running fine.



Arnaud Cogoluègnes

Jul 3, 2019, 4:40:17 AM
to rabbitm...@googlegroups.com
Which version of the Java client are you using?

Could you be more specific about the topology of your application? Did
you manage to reproduce the error?

More server logs could be useful.
> --
> You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
> To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
> To post to this group, send email to rabbitm...@googlegroups.com.
> To view this discussion on the web, visit https://groups.google.com/d/msgid/rabbitmq-users/bc95cf40-048e-419b-9464-16eeea461abc%40googlegroups.com.
> For more options, visit https://groups.google.com/d/optout.

Narendra Sharma

Jul 3, 2019, 11:12:53 AM
to rabbitmq-users
Java: 1.8
AMQP Java client version: 3.6.5

Topology: 3 node rabbitmq cluster (16 core, 128 GB each)
AMQP Consumers: 3 servers (different from rabbitmq nodes), 6 consumers (2 on each node). 2 of the 6 consumers run into this issue. The other 4 consumers consume from a different queue than the 2 having this issue.
MQTT Publishers: 100K+


The AMQP consumers (the 2 having this issue) consume from a queue bound to the amq.topic exchange with a wildcard pattern. The following is the current state of the queue:

Details

Features:                     durable: true
Policy:
Operator policy:
Effective policy definition:
Node:                         rabbit@ip-RABBITMQSERVERIP
Mirrors:
State:                        running
Consumers:                    0
Consumer utilisation:         0%

                    Total      Ready      Unacked  In memory  Persistent  Transient, Paged Out
Messages            1,949,459  1,949,459  0        19,397     0           1,930,062
Message body bytes  1.9GiB     1.9GiB     0iB      19MiB      0iB         1.9GiB
Process memory      31MiB

I restarted the consumers multiple times. Once this error is hit, nothing works. I created another queue with a different name and the same binding, and that worked. This allowed me to get new messages, but the old messages are just sitting in the original queue on the server.

There are no additional logs on the server (I checked all the log files in /var/log/rabbitmq/) except for one error from a publisher every now and then:
2019-07-03 07:45:57.218 [error] <0.4306.474> MQTT cannot parse frame for connection '<MQTTCLIENTIP>:49333 -> <RABBITMQSERVERIP>:1883', unparseable payload: <<175,8,0,32,47,65,77,80,80,65,67,69,49,48,48,48,49,51,57,50,52,56,47,114,111,47,111,112,101,114,97,116,105,111,110,51,49,53,54,50,49,51,57,57,53,55,49,53,53,79,87,81,79,69,70,87,84,79,84,68,85,90,71,66,71,84,68,77,65,82,87,68,77,83,89,83,82,85,84,66,75,81,87,79,69,86,79,67,72,75,87,76,70,66,81,82,70,85,67,81,89,83,65,73,90,67,73,88,73,84,84,85,71,80,79,83,86,76,77,88,73,66,67,81,81,77,78,78,71,89,90,77,83,72,86,85,76,82,71,88,77,86,66,84,65,87,68,80,73,66,87,80,83,66,82,65,69,88,88,75,84,78,65,65,66,71,78,70,68,84,73,82,66,73,82,79,72,83,79,79,69,82,68,87,71,83,65,84,71,77,69,76,71,74,72,89,73,85,77,88,83,77,89,90,67,67,77,73,79,75,66,80,81,73,65,70,71,73,80,80,65,68,77,74,77,87,69,82,68,87,90,70,68,66,77,73,78,89,77,68,80,78,85,82,82,77,77,79,77,75,69,75,67,88,68,73,70,73,65,83,65,77,77,75,81,70,77,74,87,65,77,90,69,76,84,90,81,74,90,72,78,72,74,73,81,75,75,88,83,84,77,67,84,76,85,83,70,86,75,76,82,77,66,86,88,90,88,80,85,90,85,71,70,88,69,82,76,88,82,74,65,82,77,70,73,67,82,78,68,89,78,79,78,80,79,66,79,79,82,75,73,89,88,75,69,87,72,85,81,71,76,66,68,70,67,84,89,83,83,70,85,88,87,72,89,89,81,90,70,73,80,72,79,89,86,73,67,89,65,73,73,72,66,72,77,76,66,66,73,84,75,80,86,88,82,70,68,84,85,86,68,86,70,76,71,70,86,70,82,75,89,83,81,75,75,69,86,71,65,81,72,81,76,73,75,78,66,77,...>>, error: {{badmatch,3}, [{rabbit_mqtt_frame,parse_frame,3,[{file,"src/rabbit_mqtt_frame.erl"},{line,108}]},{rabbit_mqtt_reader,parse,2,[{file,"src/rabbit_mqtt_reader.erl"},{line,327}]},{rabbit_mqtt_reader,process_received_bytes,2,[{file,"src/rabbit_mqtt_reader.erl"},{line,276}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1056}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,259}]}]}


I am running some load tests. This error shows up randomly. Initially it showed up 450 seconds (~8 minutes) after the start of the test. Once I declared a new queue, it showed up on the new queue after running for 5920 seconds (~1 hr 40 min).

Additionally, the MQTT clients were publishing at 125 messages per second. There are other queues and consumers on the same cluster but none of the other consumers run into this issue.

List of plugins enabled:
Listing plugins with pattern ".*" ...
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@ip
 |/
[  ] rabbitmq_amqp1_0                  3.7.15
[  ] rabbitmq_auth_backend_cache       3.7.15
[  ] rabbitmq_auth_backend_http        3.7.15
[  ] rabbitmq_auth_backend_ldap        3.7.15
[  ] rabbitmq_auth_mechanism_ssl       3.7.15
[  ] rabbitmq_consistent_hash_exchange 3.7.15
[  ] rabbitmq_event_exchange           3.7.15
[  ] rabbitmq_federation               3.7.15
[  ] rabbitmq_federation_management    3.7.15
[  ] rabbitmq_jms_topic_exchange       3.7.15
[E*] rabbitmq_management               3.7.15
[e*] rabbitmq_management_agent         3.7.15
[E*] rabbitmq_mqtt                     3.7.15
[  ] rabbitmq_peer_discovery_aws       3.7.15
[  ] rabbitmq_peer_discovery_common    3.7.15
[  ] rabbitmq_peer_discovery_consul    3.7.15
[  ] rabbitmq_peer_discovery_etcd      3.7.15
[  ] rabbitmq_peer_discovery_k8s       3.7.15
[  ] rabbitmq_random_exchange          3.7.15
[  ] rabbitmq_recent_history_exchange  3.7.15
[  ] rabbitmq_sharding                 3.7.15
[  ] rabbitmq_shovel                   3.7.15
[  ] rabbitmq_shovel_management        3.7.15
[  ] rabbitmq_stomp                    3.7.15
[  ] rabbitmq_top                      3.7.15
[  ] rabbitmq_tracing                  3.7.15
[  ] rabbitmq_trust_store              3.7.15
[e*] rabbitmq_web_dispatch             3.7.15
[  ] rabbitmq_web_mqtt                 3.7.15
[  ] rabbitmq_web_mqtt_examples        3.7.15
[  ] rabbitmq_web_stomp                3.7.15
[  ] rabbitmq_web_stomp_examples       3.7.15


Arnaud Cogoluègnes

Jul 3, 2019, 11:40:06 AM
to rabbitm...@googlegroups.com
Java client 3.6.5 is 3 years old; you should upgrade to 5.7.2 if you can use Java 8. Note that upgrading to 5.x involves some (minor) breaking changes [1].

If you're stuck with Java 6 or Java 7, please upgrade to Java client 4.11.2.

There have been several connection recovery fixes and improvements in the last 3 years.



Narendra Sharma

Jul 3, 2019, 9:11:39 PM
to rabbitmq-users
I tried the new version (5.7.2) of the RabbitMQ Java client and it didn't help.

Found the root cause. In this setup, MQTT test clients publish messages to the broker over MQTT, while the server consumes them from the broker using AMQP. MQTT allows topics up to 65,535 bytes long, but an AMQP routing key is limited to 255 bytes. One of the MQTT messages had a topic longer than 255 bytes, which is consistent with the method_field_shortstr_overflow error in the broker log. This choked the AMQP queue and prevented any consumer from consuming from it.
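
For anyone hitting the same thing, here is a minimal sketch of the length check a test script could run before publishing. It assumes that translating the MQTT topic to an AMQP routing key does not change its byte length. The class and method names are hypothetical and not part of any RabbitMQ API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of the RabbitMQ client.
final class RoutingKeyCheck {
    // AMQP 0-9-1 encodes a routing key as a "shortstr", whose length
    // is stored in a single octet, so it can hold at most 255 bytes.
    static final int MAX_SHORTSTR_BYTES = 255;

    // Length of the topic in bytes once encoded as UTF-8.
    // This can be larger than the number of characters.
    static int utf8Length(String topic) {
        return topic.getBytes(StandardCharsets.UTF_8).length;
    }

    // True if the topic would fit into an AMQP routing key
    // (assumption: translation to a routing key keeps the byte length).
    static boolean fitsRoutingKey(String topic) {
        return utf8Length(topic) <= MAX_SHORTSTR_BYTES;
    }

    // Java 8 compatible helper to build long test topics.
    static String repeat(char c, int count) {
        StringBuilder sb = new StringBuilder(count);
        for (int i = 0; i < count; i++) {
            sb.append(c);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String shortTopic = "devices/42/telemetry";      // made-up topic
        String longTopic = "devices/" + repeat('x', 300); // well over the limit
        System.out.println(shortTopic + " fits: " + fitsRoutingKey(shortTopic));
        System.out.println("long topic (" + utf8Length(longTopic)
                + " bytes) fits: " + fitsRoutingKey(longTopic));
    }
}
```

The check counts UTF-8 bytes rather than characters, since a topic with fewer than 255 characters can still exceed 255 bytes if it contains non-ASCII characters.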

The mistake I made was letting the test scripts use the guest user to publish MQTT messages, and somehow a message with a very long topic got in. I plan to address the issue by setting strict topic permissions so that the test scripts cannot publish to arbitrarily long topics.
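
As a rough illustration of that idea, the sketch below shows how a bounded, anchored regular expression rejects overly long routing keys. It assumes the write permission is a regular expression searched for within the routing key (hence the explicit ^ and $ anchors). The pattern and the names are made up for this example; the real permission would be configured on the broker, for example with rabbitmqctl set_topic_permissions, not in client code.

```java
import java.util.regex.Pattern;

// Hypothetical sketch; this only mimics how a bounded pattern behaves.
final class TopicWritePattern {
    // Made-up write pattern: "devices.<id of 1 to 32 alphanumerics>.telemetry".
    // The bounded repetition {1,32} caps the total length of a matching key.
    static final Pattern WRITE =
            Pattern.compile("^devices\\.[A-Za-z0-9]{1,32}\\.telemetry$");

    // Assumption: the pattern is searched for in the routing key,
    // so find() is used and the pattern is anchored explicitly.
    static boolean allowed(String routingKey) {
        return WRITE.matcher(routingKey).find();
    }

    public static void main(String[] args) {
        StringBuilder longId = new StringBuilder();
        for (int i = 0; i < 300; i++) {
            longId.append('x');
        }
        System.out.println(allowed("devices.abc123.telemetry"));
        System.out.println(allowed("devices." + longId + ".telemetry"));
    }
}
```

With a pattern like this, the longest key that can match is well under 255 bytes, so a misbehaving test script could not publish a topic that overflows the AMQP routing key.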

Arnaud Cogoluègnes

Jul 4, 2019, 3:38:20 AM
to rabbitm...@googlegroups.com
Glad you found the issue, thanks for reporting back to the list.
