Consistent-hash exchange, single active consumers and Spring AMQP


Milan Milanov

Oct 28, 2022, 4:55:39 AM
to rabbitmq-users
We're using RabbitMQ 3.10 with roughly the following topology: each service has a dedicated exchange of type x-consistent-hash, and each of these exchanges is bound to 20 statically defined queues. This ensures that each queue preserves the order of the dependent messages routed to it.
On the consumer side we have multiple instances of each service. To avoid consuming messages from the same queue in parallel, we've set up these 20 queues per service to have a "single active consumer". Technically, this is done using a DirectRabbitListenerContainerFactory with consumersPerQueue set to 1.
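
To make the ordering argument concrete, here is a minimal sketch of why routing by hash keeps all messages for one routing key in one queue. It is a toy hash ring built on CRC32, not the actual algorithm used by the x-consistent-hash exchange, and the queue names ("orders.q0" to "orders.q19") and routing keys are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

/** Toy hash ring showing key-to-queue affinity; NOT the exchange plugin's real algorithm. */
final class HashRingSketch {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    HashRingSketch(List<String> queues, int pointsPerQueue) {
        for (String queue : queues) {
            for (int i = 0; i < pointsPerQueue; i++) {
                ring.put(hash(queue + "#" + i), queue); // several ring points per queue
            }
        }
    }

    /** The queue owning the first ring point at or after the key's hash (wrapping around). */
    String queueFor(String routingKey) {
        Map.Entry<Long, String> owner = ring.ceilingEntry(hash(routingKey));
        return (owner != null ? owner : ring.firstEntry()).getValue();
    }

    /** Hypothetical queue names "prefix0" .. "prefix(count-1)". */
    static List<String> queueNames(String prefix, int count) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            names.add(prefix + i);
        }
        return names;
    }

    private static long hash(String s) {
        CRC32 crc = new CRC32();
        crc.update(s.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    public static void main(String[] args) {
        HashRingSketch ring = new HashRingSketch(queueNames("orders.q", 20), 100);
        // The first and third lines printed name the same queue: same key, same queue.
        for (String key : new String[] {"customer-1", "customer-2", "customer-1"}) {
            System.out.println(key + " -> " + ring.queueFor(key));
        }
    }
}
```

Because the mapping depends only on the routing key and the fixed set of queues, messages that share a key are never spread across queues, which is what makes one consumer per queue sufficient for in-order processing.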

This setup processes messages in their partial order while still allowing multiple consumers. However, there is a problem. When a service starts, the first instance that boots starts consuming all 20 queues for that service. The other instances are only there as backup (because of the single active consumer setting). This limits the usefulness of the multiple instances and blocks horizontal scalability.
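
The startup behaviour described above can be reproduced with a small in-memory model. This is only a sketch of the single active consumer rule (the first consumer registered on a queue is active, later ones wait), not broker code; the instance names and queue names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of single-active-consumer queues: first registered consumer is active. */
final class SingleActiveConsumerModel {
    private final Map<String, Deque<String>> consumersByQueue = new LinkedHashMap<>();

    SingleActiveConsumerModel(List<String> queues) {
        for (String queue : queues) {
            consumersByQueue.put(queue, new ArrayDeque<>());
        }
    }

    /** Registers an instance on a queue; it is active only if nobody registered before it. */
    void register(String queue, String instance) {
        consumersByQueue.get(queue).addLast(instance);
    }

    /** Counts, per instance, the queues on which it is the active consumer. */
    Map<String, Integer> activeQueueCounts() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Deque<String> consumers : consumersByQueue.values()) {
            String active = consumers.peekFirst();
            if (active != null) {
                counts.merge(active, 1, Integer::sum);
            }
        }
        return counts;
    }

    /** Boots the given instances one after another, each registering on every queue. */
    static Map<String, Integer> bootSequentially(int queueCount, List<String> instances) {
        List<String> queues = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            queues.add("q" + i);
        }
        SingleActiveConsumerModel model = new SingleActiveConsumerModel(queues);
        for (String instance : instances) {
            for (String queue : queues) {
                model.register(queue, instance);
            }
        }
        return model.activeQueueCounts();
    }

    public static void main(String[] args) {
        // prints {instance-A=20}: instance-B is registered everywhere but active nowhere
        System.out.println(bootSequentially(20, List.of("instance-A", "instance-B")));
    }
}
```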

Any advice on how to achieve multi-instance in-order message processing would be appreciated. I was naively thinking of each new instance creating some number of queues dynamically and binding them to the consistent-hash exchange, but this means that if the service stops working for some reason these queues will become orphaned (not consumed by anyone) or could self-delete, and losing messages is not acceptable.

Gary Russell

Oct 31, 2022, 9:12:31 AM
to rabbitm...@googlegroups.com
Please ask questions about Spring on Stack Overflow, not here.

The consumeDelay property might help, but it will slow down initialization because the delay is applied unconditionally between consuming from each queue. For your last instance, this means it will be delayed by 20x the delay.

/**
 * Set the consumeDelay - a time to wait before consuming in ms. This is useful when
 * using the sharding plugin with {@code concurrency > 1}, to avoid uneven distribution of
 * consumers across the shards. See the plugin README for more information.
 * @param consumeDelay the consume delay.
 * @since 2.3
 */
public void setConsumeDelay(long consumeDelay) {
    this.consumeDelay = consumeDelay;
}


We could consider improving the logic to not apply the delay if the previous attempt to consume fails.

Please open a new feature issue on GitHub [1] and we'll take a look.





Gary Russell

Oct 31, 2022, 1:51:29 PM
to rabbitm...@googlegroups.com
On second thought, that won't help because the delay is applied before the first consume too, so the second instance won't get to the second queue before the first. I'll see if I can come up with a solution.

 

Gary Russell

Oct 31, 2022, 2:28:35 PM
to rabbitm...@googlegroups.com
Unfortunately, there is no indication (over AMQP) of whether a consumer is active or waiting on a single active consumer queue. The only solution I have come up with is to add a consumeDelay and give each instance the queue names in a different order (each one having a different queue as the first), then make sure the instances are all started within the consumeDelay.

Not ideal, but it should work.
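
A rough sketch of the idea, under stated assumptions: each instance gets the same queue list rotated to a different starting point, and an idealised timeline (all instances start together, the delay is exact and is applied before every consume, including the first) decides which instance reaches each queue first. The class, method names, queue names and offsets are hypothetical, and how the rotated list is handed to the listener container is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Idealised model of "different queue order per instance + consumeDelay". */
final class StaggeredConsumeSketch {

    /** Returns a copy of the queue list rotated so that it begins at index {@code offset}. */
    static List<String> rotatedFrom(List<String> queues, int offset) {
        List<String> copy = new ArrayList<>(queues);
        Collections.rotate(copy, -offset);
        return copy;
    }

    /**
     * Assumes every instance starts at time 0 and waits consumeDelayMs before each consume.
     * The instance that reaches a queue first is treated as its active consumer.
     * Returns the number of queues won by each instance, indexed like {@code offsets}.
     */
    static int[] activeQueueCounts(List<String> queues, int[] offsets, long consumeDelayMs) {
        Map<String, Long> earliest = new HashMap<>();
        Map<String, Integer> winner = new HashMap<>();
        for (int instance = 0; instance < offsets.length; instance++) {
            List<String> order = rotatedFrom(queues, offsets[instance]);
            for (int k = 0; k < order.size(); k++) {
                long consumeAt = (k + 1) * consumeDelayMs;
                String queue = order.get(k);
                if (consumeAt < earliest.getOrDefault(queue, Long.MAX_VALUE)) {
                    earliest.put(queue, consumeAt);
                    winner.put(queue, instance);
                }
            }
        }
        int[] counts = new int[offsets.length];
        for (int instance : winner.values()) {
            counts[instance]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> queues = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            queues.add("q" + i);
        }
        // Evenly spaced starting queues: prints [5, 5, 5, 5]
        System.out.println(Arrays.toString(
                activeQueueCounts(queues, new int[] {0, 5, 10, 15}, 1000L)));
        // Adjacent starting queues: prints [1, 1, 1, 17]
        System.out.println(Arrays.toString(
                activeQueueCounts(queues, new int[] {0, 1, 2, 3}, 1000L)));
    }
}
```

In this model, spreading the starting queues evenly gives each of four instances five queues, while adjacent starting queues leave the last instance with 17 of the 20. A real deployment will not match the model exactly, since start times and connection latency vary, but it suggests that the choice of starting queue matters, not only that the starting queues differ.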


Milan Milanov

Nov 1, 2022, 4:21:22 AM
to rabbitmq-users
My question is more about the general-purpose topology that we use, and whether it can be tweaked to tackle this particular problem. The fact that we use Spring is (or should be) a detail. I've found the following resources which seem relevant:
https://helix.apache.org/1.0.2-docs/recipes/rabbitmq_consumer_group.html (explains how Helix + ZooKeeper can be used to perform "rebalancing")
https://jack-vanlightly.com/blog/2018/11/14/why-i-am-not-a-fan-of-the-rabbitmq-sharding-plugin (explanation of the consistent-hash exchange + single active consumer shortcomings, plus an idea for a rebalancing plugin)
https://github.com/dradoaica/rebalancer-net (a general-purpose rebalancing library written in .NET)
What I'm wondering is whether this problem is common and known in the RabbitMQ community, and whether there are efforts to solve it on the broker side (as Kafka did, having previously used ZooKeeper as well), or whether an external solution exists.

As for the consumeDelay suggestion: it seems promising for our current situation, so thank you. Even though the queue distribution will ultimately still be nondeterministic (and a single service instance might still take most or all queues, depending on the startup timings), it should at least give us time to sort out a more solid solution.

Milan Milanov

Nov 6, 2022, 8:58:29 AM
to rabbitmq-users
One solution I'm thinking about relies on cancelling active consumers, i.e. demoting a consumer to a passive one. However, I can't find much detail on https://www.rabbitmq.com/consumers.html#single-active-consumer besides the statement that a consumer "can be cancelled". Moreover, judging by the quote that "there is no indication (over AMQP) that a consumer is active or waiting on a single active consumer queue", I assume that this behaviour is not built in. Do you think it can be achieved using a RabbitMQ plugin? The RabbitMQ server knows the statuses of the consumers, so it should be able to balance them across several consuming services.

David Ansari

Mar 5, 2025, 12:51:37 PM
to rabbitmq-users