Balancing the single active consumers of super stream partitions

Milan Milanov

Jun 19, 2023, 3:57:21 AM
to rabbitmq-users
Reading up on https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-super-streams/, it's unclear to me how the different partitions of a super stream are split between the single active consumers.
In the example, the invoices super stream has three partitions. If one service (consumer) starts, it will be the single active consumer on all three. What will happen if two other consumers join? Will the partitions be distributed amongst them (relatively equally), so that in this case each consumer gets one partition (as in the diagram)?

The root of my question is a problem that we have with our current topology. I've outlined it in https://groups.google.com/g/rabbitmq-users/c/v5BsdWd9AlQ, but roughly: each service has a dedicated exchange of type x-consistent-hash. Each of these exchanges is bound to 20 statically defined queues (partitions), which ensures that each queue maintains the order of the dependent messages in it. Consumers are set as single active. The setup is very close to the super streams/partitions use case. However, it has the drawback that the first consumer to start becomes the single active one on all queues, leaving the others starving. If super streams do balance automatically, would there be any way to replicate that behaviour in the consistent hash exchange/single active consumer setup? Otherwise we'll be forced to use super streams with a short data retention, but that defeats the benefits of persistent, append-only streams.

kjnilsson

Jun 19, 2023, 4:23:52 AM
to rabbitmq-users
There is no such balancing for queue consumers.

There are a few things you could try.

To handle the app startup issue, each of your consuming apps can be assigned a "home" queue and introduce a delay before setting up consumers on any non-home queues. This should give the other apps a chance to consume from their home queues before anyone else.
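A minimal sketch of the startup-delay idea, written as a pure Python simulation rather than client code. It assumes the usual SAC rule for classic and quorum queues, where the first consumer to register becomes the active one; the app names, queue names, start times and delay values are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class App:
    name: str
    home: str     # hypothetical "home" queue assigned to this app
    start: float  # hypothetical start time, in seconds


def registration_time(app: App, queue: str, delay: float) -> float:
    # Subscribe to the home queue at once and to every other queue after `delay`.
    return app.start if queue == app.home else app.start + delay


def active_consumers(apps: list[App], queues: list[str], delay: float) -> dict[str, str]:
    # Assumed SAC rule: the earliest registered consumer on a queue is the active one
    # (ties broken by app name only to keep the sketch deterministic).
    return {
        q: min((registration_time(a, q, delay), a.name) for a in apps)[1]
        for q in queues
    }


queues = ["q0", "q1", "q2"]
apps = [App("a", "q0", 0.0), App("b", "q1", 1.0), App("c", "q2", 2.0)]
print(active_consumers(apps, queues, delay=0.0))  # {'q0': 'a', 'q1': 'a', 'q2': 'a'}
print(active_consumers(apps, queues, delay=5.0))  # {'q0': 'a', 'q1': 'b', 'q2': 'c'}
```

The delay only helps if it is longer than the spread of app start times: an app that starts more than `delay` seconds after another will find its home queue already taken.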

You can also add logic to the apps that periodically unsubscribes from and resubscribes to any queue that isn't the home queue. This puts them at the back of the SAC queue again.
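The bouncing approach can be modelled with a toy SAC queue, again in Python and purely as an illustration: the `SacQueue` class and `bounce_non_home` helper are hypothetical, and the model assumes waiting consumers are promoted in registration order. In a real client the bounce would be a consumer cancel followed by a new subscription.

```python
from __future__ import annotations

from collections import deque


class SacQueue:
    """Toy single-active-consumer queue: consumers wait in registration order
    and the one at the front is active (a modelling assumption, not broker code)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.consumers: deque[str] = deque()

    def subscribe(self, consumer: str) -> None:
        self.consumers.append(consumer)

    def cancel(self, consumer: str) -> None:
        self.consumers.remove(consumer)

    @property
    def active(self) -> str | None:
        return self.consumers[0] if self.consumers else None


def bounce_non_home(app: str, home: str, queues: list[SacQueue]) -> None:
    # Cancel and re-subscribe on every non-home queue the app is subscribed to,
    # which moves the app to the back of that queue's waiting line.
    for q in queues:
        if q.name != home and app in q.consumers:
            q.cancel(app)
            q.subscribe(app)


queues = [SacQueue(n) for n in ("q0", "q1", "q2")]
homes = {"a": "q0", "b": "q1", "c": "q2"}
for app in ("a", "b", "c"):  # "a" starts first and becomes active everywhere
    for q in queues:
        q.subscribe(app)
print([q.active for q in queues])  # ['a', 'a', 'a']

for app in ("a", "b", "c"):  # one round of periodic bounces
    bounce_non_home(app, homes[app], queues)
print([q.active for q in queues])  # ['a', 'b', 'c']
```

In this run one round of bounces is enough to spread the three queues across the three apps, but the outcome depends on the order and timing of the bounces, so in practice the periodic loop keeps nudging the assignment rather than guaranteeing it.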

Alternatively, use an external coordinator to assign consumers to queues.

Cheers
Karl

Milan Milanov

Jun 19, 2023, 5:02:42 AM
to rabbitmq-users
"There is no such balancing for queue consumers" - this applies to both super stream consumers and SAC/consistent-hash-exchange, I assume? If so, two follow-up questions:
- Is this functionality discussed/planned? It seems it would be useful in the context of super streams and their goal of scaling out by partitioning a large stream into smaller streams.
- Alternatively, how feasible would it be for a plugin to balance the single active consumers? The RabbitMQ server knows the state of the consumers, so it could perhaps cancel some of the active consumers to make the SAC fallback kick in. Ideally, if the server can choose which passive consumer becomes the active one, then even a roughly equal distribution should be achievable. Even if imperfectly implemented, the SAC behaviour ensures that no partition goes unread.

kjnilsson

Jun 19, 2023, 6:26:05 AM
to rabbitmq-users
By queue consumers I meant consumers using AMQP to consume from any of the queue types. With super streams, using a stream client, the broker will rebalance the consumers roughly equally.

Implementing the same for queue consumers isn't feasible, at least in the near term. A stream consumer works very differently from an AMQP consumer, and AMQP has no protocol support for this behaviour like the stream protocol does.

That said, one idea we have is to make SAC consumers honour consumer priorities. You could then use the approach I outlined before, but instead of bouncing consumers you would just assign a higher consumer priority to the "home" queue consumer, and the queue itself would make sure that the consumer with the highest priority becomes the active one.

Milan Milanov

Jun 19, 2023, 8:27:33 AM
to rabbitmq-users
Thanks for the quick and insightful responses.
One problem with the "home" queue approach you've outlined is that consumers need to be "different" from one another, e.g. they need to be stateful, and a new one can't simply be spun up: it has to have custom home queues/priorities set. Then, upon redeploys, we need to make sure that the home queue/priority invariant still holds.

What about the other two approaches:
- a custom plugin to try and balance the consumers in the RabbitMQ server
- instead of using SAC/consistent-hash-exchange, use super streams with a low retention period, i.e. use streams as pass-through queues without the persistence feature

Would either of these have significant drawbacks? Otherwise we can always use an external coordinator, but that introduces a lot of complexity that would otherwise be handled by the broker itself.

kjnilsson

Jun 19, 2023, 9:29:32 AM
to rabbitmq-users
If independent configuration isn't possible (surely that's a solved problem?), then you could fall back to a random or hash-based approach. Take the list of queues you want to subscribe to, rotate it a random number of times, then assign priorities in descending order. Yes, you may get collisions, but it would be better than what you have.
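A minimal sketch of the rotate-then-prioritize idea, assuming (as in the consumer-priority proposal earlier in this thread) that a queue makes its highest-priority consumer the active one. Whether a given broker version does this for SAC is an assumption to check; the function names, queue names and seed are hypothetical.

```python
from __future__ import annotations

import random


def rotated_priorities(queues: list[str], rng: random.Random) -> dict[str, int]:
    # Rotate the shared queue list by a random offset, then hand out priorities
    # in descending order, so the first queue after the rotation gets the highest.
    k = rng.randrange(len(queues))
    rotated = queues[k:] + queues[:k]
    n = len(rotated)
    return {q: n - i for i, q in enumerate(rotated)}


def active_by_priority(prios: dict[str, dict[str, int]], queues: list[str]) -> dict[str, str]:
    # Assumes each queue makes its highest-priority consumer the active one;
    # ties (collisions) are broken by app name only to keep the sketch deterministic.
    return {q: max(prios, key=lambda app: (prios[app][q], app)) for q in queues}


queues = [f"q{i}" for i in range(6)]
rng = random.Random(7)  # hypothetical seed; each app would normally use its own randomness
prios = {app: rotated_priorities(queues, rng) for app in ("a", "b", "c")}
print(active_by_priority(prios, queues))
```

Each app wins every queue from its own starting offset up to the next app's offset (cyclically), so the split is only as even as the random offsets happen to be; two apps drawing the same offset is the collision case.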

kjnilsson

Jun 19, 2023, 9:31:08 AM
to rabbitmq-users
You are of course welcome to develop a custom plugin for anything you need. Bear in mind that we don't guarantee not to break internal APIs between versions.

Milan Milanov

Jun 19, 2023, 11:11:41 AM
to rabbitmq-users
Regarding the plugin solution, my question was whether it is actually feasible/practical. At a high level, it would need APIs to get all consumers for a queue, get the channel of a consumer (for grouping by service), and either make a consumer the active SAC or remove the active SAC.
In any case, there appear to be options, and I also got my initial question (balancing of SACs) answered. Thanks again.

Nicolas Piguet

Jul 3, 2023, 6:03:29 AM
to rabbitmq-users
I stumbled on this conversation while searching for the answer to the exact same question as Milan.

I just wanted to add my voice to this discussion, because it is directly relevant to us: the fact that the RMQ server does not balance SACs is the only thing preventing us from migrating to super-streams.

In my organization, we started implementing an architecture that uses an application-level partitioned stream from the start (since RMQ 3.9, I think, so before super-streams existed). The approach we used is basically the same as super streams: clients decide which partition they publish to using a hashed routing key.
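On the publishing side, the hashed-routing-key approach can be sketched in a few lines of Python. CRC32 (from the standard `zlib` module) and the partition names are illustrative assumptions, not necessarily what a RabbitMQ stream client or this application uses; the point is only that a deterministic hash of the key always selects the same partition, which preserves per-key ordering.

```python
from __future__ import annotations

import zlib


def partition_for(routing_key: str, partitions: list[str]) -> str:
    # Hash the routing key and reduce it modulo the partition count, so every
    # message with the same key goes to the same partition and keeps its order there.
    # CRC32 is an illustrative choice, not necessarily the hash a RabbitMQ client uses.
    h = zlib.crc32(routing_key.encode("utf-8"))
    return partitions[h % len(partitions)]


partitions = ["invoices-0", "invoices-1", "invoices-2"]  # hypothetical partition names
for key in ("customer-17", "customer-42", "customer-17"):
    print(key, "->", partition_for(key, partitions))
```

With plain modulo hashing, changing the number of partitions remaps most keys, which is one reason the partition count is usually fixed up front.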

Our application is deployed on a k8s cluster. For many reasons, it is *much* easier to have the exact same configuration on all nodes/pods of the application. This means that we've had to implement SAC on the application side. We rely on Akka Cluster's ShardedDaemonProcess mechanism to do the load balancing by assigning "shards" to each application instance and reassigning them as nodes go down and come back up. While that solves our problem, it is somewhat complex and we'd be happier if we could do without it.

In my understanding, the only reason to use a super-stream together with SAC is to balance load between multiple consumers. Having one consumer handle all the load while the others are idle renders the whole mechanism completely pointless. You mention that this could be solved by having a different configuration on each consumer, but that is not really a good solution because:

1. It is not generally desirable to manage a different configuration for each node (not saying it can't be done, just that it's needlessly complex).
2. It requires applications to know the topology of the super-stream (how many partitions, what their names are, etc.) to do the configuration properly, which is *precisely* what super-streams are supposed to hide.
3. When nodes start going up or down (for example during rolling application upgrades), we are likely to end up with unbalanced consumers.
4. It still requires all sorts of mechanisms to make sure that the balancing is robust against minor disruptions like random disconnections.

As a result, in our case, there is no reason to switch to super-streams. It doesn't make our implementation any easier on the consumer side, and it's only marginally easier on the producer side.

So while I understand that broker-side balanced partition assignment is not a feature that is available yet, its absence also greatly reduces the usefulness of the feature.

Nicolas Piguet

Karl Nilsson

Jul 3, 2023, 6:24:16 AM
to rabbitm...@googlegroups.com
Nicolas,

Did you read my reply, which reasonably clearly stated that if you use the stream protocol with super streams the broker _will_ rebalance the consumers over the partitions (when using a super stream compatible stream client library)? No independent configuration or third-party coordinators needed.

So in your case you can probably remove the use of Akka's ShardedDaemonProcess (whatever that is) by using super streams instead.

It is only when you use another protocol (such as AMQP) _or_ standard queues (classic, quorum), where there is no "consumer group" concept, that the broker doesn't automatically rebalance consumers. That is partly because there is no group concept and partly because doing such rebalancing well kind of needs protocol support.

Why don't you give super streams a go on a local dev box to see how it works when consumers come and go?

Cheers
Karl

Nicolas Piguet

Jul 3, 2023, 10:34:21 AM
to rabbitmq-users
My bad, I completely missed that in your post. I will definitely try it out then.

kjnilsson

Jul 3, 2023, 10:36:34 AM
to rabbitmq-users
Excellent! Let us know how you get on.

Ryan Riley

Mar 15, 2025, 11:54:42 AM
to rabbitmq-users
I see this is an old thread, but is there any documentation about how the broker balances super stream consumers when they use a stream client? Is it connection/channel based, or traffic based?

An example:

Say I have many partitions (like 40), and the distribution of messages is not perfectly uniform. I have 8 application instances (so 5 active consumers each). Is it possible that a single application gets assigned the 5 highest-traffic partitions, and so runs hotter than the other application instances? Or does the broker balance super stream consumers by traffic/workload as well?

Arnaud Cogoluègnes

Mar 17, 2025, 10:12:01 AM
to rabbitmq-users
The broker balances the consumers every time a consumer joins or leaves the group.

The decision to pick the active consumer for a partition is not based on traffic/workload; it is based on the number of consumers: active consumer index = partition index % consumer count. For example, for a 3-partition super stream with 3 consumer application instances, the active consumer index for the second partition (partition index = 1) is 1 % 3 = 1, that is, the consumer of the application that started second (consumer/app index = 1).

Note that the balancing mechanism is per partition: the broker does not do any synchronization between the consumers of different partitions.
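The rule Arnaud describes can be written out as a small Python sketch. It assumes the consumers of every partition are ordered the same way (by the order in which the application instances joined), which the broker does not enforce since balancing is per partition; `assign` and the app names are hypothetical.

```python
from __future__ import annotations

from collections import Counter


def active_consumer_index(partition_index: int, consumer_count: int) -> int:
    # active consumer index = partition index % consumer count
    return partition_index % consumer_count


def assign(partition_count: int, apps: list[str]) -> dict[int, str]:
    # `apps` is assumed to be ordered by consumer index, i.e. the order in which
    # the application instances joined, and that order is the same for every partition.
    return {p: apps[active_consumer_index(p, len(apps))] for p in range(partition_count)}


# Arnaud's example: 3 partitions and 3 application instances.
print(assign(3, ["app-0", "app-1", "app-2"])[1])  # app-1

# The 40-partition / 8-instance case from the question.
plan = assign(40, [f"app-{i}" for i in range(8)])
print(Counter(plan.values())["app-0"])  # 5
print([p for p, a in plan.items() if a == "app-0"])  # [0, 8, 16, 24, 32]
```

Consumer counts come out even (5 partitions per instance in the 40/8 case), but traffic is not considered: if partitions 0, 8, 16, 24 and 32 happened to be the busiest, `app-0` would carry all of them.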