Consume queue with priority over other queues (consumer priority not working)

976 views
Skip to first unread message

Apodo Lias

unread,
Jun 1, 2018, 5:10:36 AM6/1/18
to rabbitmq-users
Hi,

I tried to use the consumer priority extension (https://www.rabbitmq.com/consumer-priority.html) but failed to get it working in my test.

What I'm trying to achieve is this:
I have tasks of different workloads (small = S, medium = M, large = L). These tasks are published as messages to a direct exchange with several queues. Each queue represents tasks of a specific workload size, so in this example there would be queues S, M and L. I have different consumers that are capable of handling different task sizes. A consumer should be able to process messages of its own size category or lower (but never higher), and it should prefer larger tasks over smaller tasks (so basically consume tasks of its own size with priority and only if there is nothing to do in this category check for smaller tasks, so that the consumer does not idle while there is still anything do to).

So I thought I could declare three queues QS, QM and QL and let the exchange route to these by corresponding routing key of the message.
I then have a consumer CL which subscribes to all three queues, but sets the x-priority argument to e.g. 10 when consuming QL, to 5 for QM and to 0 for QS.
Another consumer CM would then subscribe only to QM (with prio 10) and to QS (with prio 0), but not to QL.
I hoped that in this setup the CL would only read from QL as long as there are message in there and only consume from QM or QS if QL is empty. But that didn't work.

I wrote a simple test with only two queues and one consumer subscribed to both these queues:

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties, final byte[] body) throws IOException {
System.out.println("In " + consumerTag + ": Received " + envelope + " -> " + new String(body, "UTF-8"));
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};

channel.basicQos(1, false);

channel.basicConsume("S", false, "smallConsumer", consumer);
Map<String, Object> options = new HashMap<>();
options.put("x-priority", 10);
channel.basicConsume("L", false, "bigConsumer", false, false, options, consumer);

As you can see autoAck = false and QoS = 1.
If I now publish 3 messages for each of the corresponding queues before starting the consumer, I would expect that the resulting order of consumption would be: L, L, L, S, S, S. But it seems to just use a simple round-robin mode: S, L, S, L, S, L.

What am I doing wrong? Thanks for your help!

Kind regards,
Apodo

Michael Klishin

unread,
Jun 1, 2018, 7:14:05 AM6/1/18
to rabbitm...@googlegroups.com
Consumer priorities are effective at the queue level (or channel/queue combination, depending on who you ask).
In other words consumers on queue A are prioritised between each other, not
consumers on queue B. There is no way for a consumer on A to "slow down" a consumer on B.

Priority queues is what you are looking for.
While using a single queue should be closer to what you want, it is not a topology RabbitMQ was designed for [2],
so finding a reasonable way for your consumers to work in parallel is a much better idea.


--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
MK

Staff Software Engineer, Pivotal/RabbitMQ

Apodo Lias

unread,
Jun 1, 2018, 8:37:26 AM6/1/18
to rabbitmq-users
Thanks for your reply!

On Friday, 1 June 2018 13:14:05 UTC+2, Michael Klishin wrote:
Consumer priorities are effective at the queue level (or channel/queue combination, depending on who you ask).
In other words consumers on queue A are prioritised between each other, not
consumers on queue B. There is no way for a consumer on A to "slow down" a consumer on B.

I don't want to slow down a consumer on B, I just want to give priority to one consumer over the other.
So if I understand you correctly, when I have two consumers (1 and 2) each subscribed to two queues (A and B), and 1 has higher prio than 2 on A, and 2 has higher prio than 1 on B, then when both queues have pending messages what would happen?
With QoS = 1 per consumer I assume if both 1 and 2 are free, 1 would grab the next message from A before 2. But then 1 is blocked for the time of processing (until ack) and 2 would grab a message from A. And the next messages would then be taken from B (so again round robin between queues), right? So it seems this feature only makes sense if multiple consumers are subscribed to one queue and the higher ranking consumer is actually faster than the lower ones (otherwise the lower ones would still regularly "steal" messages from the priority consumer while it is blocked) or the speed of publishing is slower than the consumption.
So that does not seem to solve my issue.


Priority queues is what you are looking for

But if I understand priority queues (that's basically message priority, right?) correctly, then this would just change the order in which messages are consumed. So in my case if I assign all the L messages a higher prio than M, then all my Ls will get consumed before M (if QoS is small enough, I have explicit ack...). That's again not what I try to achieve. I still want the messages to be consumed in the order in which they were published.
And I assume priority queues again affect only the priority of message within one queue. That means if I have a consumer subscribed to two queues, and in one of these all messages have prio 10 and in the other all have prio 0, then the consumer would still consumer messages from both queues in turn (round robin), right?

I could just have three queues and one consumer for each queue, but the disadvantage (that I want to avoid) is that when one queue is empty, the respective consumer will idle. I'd rather have this consumer then take messages from another queue until there is again pressure on its own queue. So I basically look for a way to change the round robin approach of consuming messages, if a single consumer is subscribed to multiple queues.

  
so finding a reasonable way for your consumers to work in parallel is a much better idea.

What do you mean by that? I want my consumers to work in parallel, but I want them to work on different things. In the long run I want to achieve some kind of fairness.
The real world example would be the check-in at an airport: there are three check-in counters, one for economy, one for business and one for first class. Each counter servers only their respective customers, but when for example the first class queue is empty (because there are a lot less first class seats on a plane than economy) then the clerk will ask the head of the business (or if also empty the economy) line to come over and do the check in there. If after that a first class customer has arrived, they will be next, else another businss or economy customer is served. And all this while the clerk of the economy counter handles the economy queue in parallel.

Michael Klishin

unread,
Jun 1, 2018, 8:47:32 AM6/1/18
to rabbitm...@googlegroups.com
Since queues A and B perform deliveries independently, they are not aware of each other. Reusing a consumer across two queues with different priorities will make things harder to reason about. And if you also reuse a channel then there will also be a conflict of two priority definitions. I haven’t worked in that area for a while but I’m pretty sure consumer priorities are a part of channel state, like most things related to consumers.
--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.

Apodo Lias

unread,
Jun 1, 2018, 10:40:38 AM6/1/18
to rabbitmq-users
I had assumed that there is some kind of interdependency between queues, at least if consumed over the same channel. As you describe it, it sounds as if every queue just pushes messages to its consumers without knowledge of other queues. But my observations seem to differ:
I declared two queues (A and B) and consume them with the same single consumer over the same channel. When I publish messages in the order A, A, A, A, A, B, B, B, B and the queues were independent of each other, then I would expect the consumer (again QoS=1) to receive these messages in that same order. But in reality the order in which they get consumed is A, B, A, B, A, B, A... For me that looks like there is something (the channel?) that evenly distributes messages across multiple queues (disregarding their publishing time) and thus is aware of the different queues. For that reason I had assumed that setting a consumer priority actually affects the consumption order of this consumer across all subscribed queues (at least within the same channel). At least that's what would make this feature really useful (and it would still preserve its existing behaviour if only one queue is consumed)...

Michael Klishin

unread,
Jun 1, 2018, 11:07:39 AM6/1/18
to rabbitm...@googlegroups.com
> it sounds as if every queue just pushes messages to its consumers without knowledge of other queues

that's the case. QoS can somewhat affect that since it can be per channel or per consumer, and channels can be reused,
and consumers also can be reused. But otherwise that's exactly how things work. Each queue "runs delivery loop" (in RabbitMQ internals speak)
independently from others.

Making some consumer priorities affecting deliveries to other consumers would be a dangerous feature and a complete hell to debug.


On Fri, Jun 1, 2018 at 5:40 PM, 'Apodo Lias' via rabbitmq-users <rabbitm...@googlegroups.com> wrote:
I had assumed that there is some kind of interdependency between queues, at least if consumed over the same channel. As you describe it, it sounds as if every queue just pushes messages to its consumers without knowledge of other queues. But my observations seem to differ:
I declared two queues (A and B) and consume them with the same single consumer over the same channel. When I publish messages in the order A, A, A, A, A, B, B, B, B and the queues were independent of each other, then I would expect the consumer (again QoS=1) to receive these messages in that same order. But in reality the order in which they get consumed is A, B, A, B, A, B, A... For me that looks like there is something (the channel?) that evenly distributes messages across multiple queues (disregarding their publishing time) and thus is aware of the different queues. For that reason I had assumed that setting a consumer priority actually affects the consumption order of this consumer across all subscribed queues (at least within the same channel). At least that's what would make this feature really useful (and it would still preserve its existing behaviour if only one queue is consumed)...

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.
Reply all
Reply to author
Forward
0 new messages