Options for Horizontally Scaling a Single Queue


Phill Tomlinson

Mar 23, 2015, 4:35:59 AM3/23/15
to rabbitm...@googlegroups.com
Hi,

I have been investigating various message queue technologies and wondered how to scale a single queue in RabbitMQ. As a live queue in RabbitMQ can only reside on a single node, I've found there are the following options:

Automatically shard the queue, using the sharding plugin.

However, when testing this with 20 shards it didn't appear to distribute the data well, unlike Kafka for example.

Manually shard the queue. So if I had a queue named PERSIST, I could create multiple queues: PERSIST_1, PERSIST_2, ..., PERSIST_N. My application code would then need to use the queues evenly. Each queue could be put on a different broker.
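The even use of the manually sharded queues could be as simple as a round-robin counter. A minimal sketch, assuming the PERSIST_n naming from above (the class and method names are made up for illustration; nothing here is RabbitMQ-specific):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: spread publishes evenly over PERSIST_1..PERSIST_N
// by cycling a shared counter. Thread-safe via AtomicLong.
class ManualShardRouter {
    private final int shardCount;
    private final AtomicLong counter = new AtomicLong();

    ManualShardRouter(int shardCount) {
        this.shardCount = shardCount;
    }

    // Returns the next queue name in round-robin order.
    String nextQueue() {
        long n = counter.getAndIncrement() % shardCount;
        return "PERSIST_" + (n + 1);
    }
}
```

Each producer would then publish to `nextQueue()` instead of a fixed name; consumers need the matching list of queue names.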


Those are the only options I have seen.

Does anyone else have any other suggestions?

Regards,
Phill





Alvaro Videla

Mar 23, 2015, 4:37:33 AM3/23/15
to Phill Tomlinson, rabbitm...@googlegroups.com
Hi,

On Mon, Mar 23, 2015 at 9:36 AM Phill Tomlinson <p.g.to...@gmail.com> wrote:
However when testing this with 20 shards it didn't appear to distribute the data well, unlike kafka for example.


What exchange type did you use here? Did you use consistent hashing or modulo hashing?

Regards,

Alvaro

Phill Tomlinson

Mar 23, 2015, 4:44:53 AM3/23/15
to rabbitm...@googlegroups.com, p.g.to...@gmail.com
It would be the consistent hashing exchange, the default one that comes with RabbitMQ.

I tried with only 2 shards and it seemed to put messages on both shards, but when I increased the count it seemed to write to only one. I tried 50 producer threads at one point. I presume that if I use the random exchange it should distribute among the shards better?

Thanks,
Phill

Alvaro Videla

Mar 23, 2015, 4:54:08 AM3/23/15
to Phill Tomlinson, rabbitm...@googlegroups.com
I think to get optimal data partitioning with the consistent hashing exchange you need to choose the proper routing key and binding keys.

I think it's better, or at least easier, to try this with the modulo hashing exchange provided with the sharding plugin.

AFAIK, consistent hashing is good if you need to perform re-balancing, for example to avoid cache misses in something like Memcached when adding/removing servers, which is something this plugin doesn't do.
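To illustrate the contrast, here is a toy sketch of both schemes. Java's `String.hashCode` stands in for the plugin's real hash function (Erlang's `phash2`), so the shard assignments won't match RabbitMQ's; the point is the partitioning behaviour, not the values:

```java
import java.util.TreeMap;

class HashingSketch {
    // Modulo hashing: shard = hash(key) mod N. Distinct keys spread across
    // shards, but changing N remaps almost every key.
    static int moduloShard(String routingKey, int shards) {
        return Math.floorMod(routingKey.hashCode(), shards);
    }

    // Toy consistent hashing: shards sit at points on a ring, and a key
    // belongs to the first shard point at or after its own hash (wrapping
    // around). Adding/removing a shard only moves the keys nearest that
    // point. A real implementation uses many virtual points per shard.
    static String ringOwner(String routingKey, String... shardNames) {
        TreeMap<Integer, String> ring = new TreeMap<>();
        for (String s : shardNames) ring.put(s.hashCode(), s);
        Integer h = ring.ceilingKey(routingKey.hashCode());
        return ring.get(h != null ? h : ring.firstKey());
    }
}
```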

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Phill Tomlinson

Mar 23, 2015, 5:44:42 AM3/23/15
to rabbitm...@googlegroups.com, p.g.to...@gmail.com
Thanks for the information. I've just rechecked my cluster and I am using the "x-modulus-hash" exchange, which I presume is what is mentioned above. Looking at the shards, all 136K messages are on a single shard, with the other 20 empty.

All messages are very similar but contain a counter that makes them unique, so I'm not sure why they are not being routed to the other shards.

Alvaro Videla

Mar 23, 2015, 6:43:14 AM3/23/15
to Phill Tomlinson, rabbitm...@googlegroups.com
Are you sending different routing keys with each message? The hashing logic depends on you sending different routing keys: https://github.com/rabbitmq/rabbitmq-sharding/blob/master/src/rabbit_sharding_exchange_type_modulus_hash.erl#L64

From the README: "The "x-modulus-hash" exchange will hash the routing key used to publish the message and then it will apply a Hash mod N to pick the queue where to route the message, where N is the number of queues bound to the exchange."
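The README's rule explains the symptom directly: if every message is published with the same routing key, hash mod N is the same number every time, so every message lands on one shard. A minimal sketch (again with Java's `hashCode` standing in for the plugin's hash):

```java
class ModulusHashDemo {
    // shard = hash(routing key) mod N, as in the README quote above.
    static int shardFor(String routingKey, int boundQueues) {
        return Math.floorMod(routingKey.hashCode(), boundQueues);
    }

    // Count how many distinct shards a set of routing keys hits.
    static long distinctShards(int boundQueues, String... keys) {
        return java.util.Arrays.stream(keys)
                .map(k -> shardFor(k, boundQueues))
                .distinct()
                .count();
    }
}
```

A constant routing key always hits exactly one shard, no matter how many producer threads publish; varied keys spread out.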

Can you share a list of bindings to your sharded exchange?

Also could you share the code used for publishing?

Phill Tomlinson

Mar 23, 2015, 7:08:30 AM3/23/15
to rabbitm...@googlegroups.com, p.g.to...@gmail.com
Thanks.

I have the following policy on the exchange:

Pattern:    ^ccs-exchange-shards.*
Apply to:   all
Definition: routing-key: rabbit-test-shard-queue
            shards-per-node: 20
Priority:   0

It's currently in a cluster, but all the bindings have the following format:

To:          sharding: ccs-exchange-shards - rabbit@rabbit1_1 - 0
Routing key: rabbit-test-shard-queue

To:          sharding: ccs-exchange-shards - rabbit@rabbit1_1 - 1
Routing key: rabbit-test-shard-queue

To:          sharding: ccs-exchange-shards - rabbit@rabbit1_1 - 10
Routing key: rabbit-test-shard-queue

Where the routing key is always "rabbit-test-shard-queue".

To publish I call:

channel.basicPublish(CCS_EXCHANGE, routingKey, props,
                        message.getBytes(MESSAGE_ENCODING));

Where routing key is "rabbit-test-shard-queue"


Do I need to create multiple policies to shard the routing key differently? I still want a single queue that producers and consumers can use, without having to worry about the sharded queue names.

Phill

Alvaro Videla

Mar 23, 2015, 7:13:02 AM3/23/15
to Phill Tomlinson, rabbitm...@googlegroups.com
Hi,

On Mon, Mar 23, 2015 at 12:08 PM, Phill Tomlinson <p.g.to...@gmail.com> wrote:
channel.basicPublish(CCS_EXCHANGE, routingKey, props,
                        message.getBytes(MESSAGE_ENCODING));

Where routing key is "rabbit-test-shard-queue"

As explained in the previous email, you need to use a different routing key per message, probably some random ID or similar.
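A sketch of that fix, reusing the names from the earlier post (`CCS_EXCHANGE`, `props`, `message`, `MESSAGE_ENCODING` and `channel` are the poster's own; the publish line is shown as a comment because it needs a live `com.rabbitmq.client.Channel`):

```java
import java.util.UUID;

class RandomRoutingKey {
    // A fresh routing key per message, so x-modulus-hash hashes each
    // message to a (generally) different shard.
    static String next() {
        return UUID.randomUUID().toString();
    }

    // Hypothetical usage with the post's publish call:
    // channel.basicPublish(CCS_EXCHANGE, RandomRoutingKey.next(), props,
    //                      message.getBytes(MESSAGE_ENCODING));
}
```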

Phill Tomlinson

Mar 23, 2015, 7:34:19 AM3/23/15
to rabbitm...@googlegroups.com, p.g.to...@gmail.com
Thanks for your help - I used a UUID and that works now.

A couple more questions if you don't mind.

  1. This only scales the consumers? The producers are still writing to a single queue, and the exchange then shards among some queues.
  2. There needs to be only a single queue per exchange, as if the routing key is random a message could end up in any queue on the exchange?
Thanks once more.

Phill

Phill Tomlinson

Mar 23, 2015, 8:03:58 AM3/23/15
to rabbitm...@googlegroups.com, p.g.to...@gmail.com
Got another slight issue now: the messages are all on the queues, but I'm having issues getting them off with the consumers. I've tried more consumer threads than there are queues, and it doesn't appear to read in a consistent manner.

channel.basicConsume(queue, autoAck, consumer);

where "queue" is the name of my exchange.

The queues are sharded across multiple nodes, as I have a cluster. It originally read from just one node, but now when I restart the consumers and put more messages on the queues, it no longer reads from any of them on any node (or sometimes reads from only a single shard).

Phill

Alvaro Videla

Mar 23, 2015, 8:40:18 AM3/23/15
to Phill Tomlinson, rabbitm...@googlegroups.com
Hi,

1. Publishers in AMQP send messages to exchanges. The "sharding" exchange here acts as a message partitioner, as explained with the graphic in the plugin README. When using the sharding exchange there's no concept of a "single queue". Also, you don't need to declare any queues; they are declared automatically by the plugin.
2. I don't understand what the question is here.

About the problem of consumers not picking up all the messages: you probably need to let RabbitMQ and the plugin catch up with the several basicConsume commands from your threads. The plugin tries to subscribe each consumer to the shard with the fewest consumers, but if the basicConsume commands arrive concurrently it might be too fast, and there won't be enough time internally for the queues to "know" about the new consumers, so they might return stale stats to the plugin.

About different nodes: the plugin only subscribes consumers to local queues, so you might need to load-balance consumers across nodes, either in your app code or using a load balancer.
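The app-side balancing could be as simple as round-robining consumer connections over the broker addresses. A minimal sketch (the class name and node addresses are placeholders, not from the plugin):

```java
import java.util.List;

class ConsumerPlacement {
    // Assign consumer i to node i mod N, so every broker gets roughly the
    // same number of consumers and the local shards on each node are
    // covered by someone.
    static String nodeFor(int consumerIndex, List<String> nodeAddresses) {
        return nodeAddresses.get(consumerIndex % nodeAddresses.size());
    }
}
```

Each consumer thread would open its connection against `nodeFor(i, nodes)` instead of a single shared address.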

Regards,

Alvaro


Phill Tomlinson

Mar 23, 2015, 9:46:10 AM3/23/15
to rabbitm...@googlegroups.com, p.g.to...@gmail.com
Thanks. All correct again. The consumers must have been starting up too fast; I put a sleep in there to test this, and it seems to round-robin among the shards now.

I am using a load balancer, but I would need the application code to connect the correct number of consumers to the relevant brokers.

Thanks,
Phill