Hi,
What I'm trying to achieve is this:
I have tasks of different workloads (small = S, medium = M, large = L). These tasks are published as messages to a direct exchange with several queues. Each queue holds tasks of one workload size, so in this example there would be queues S, M and L. I have different consumers that are capable of handling different task sizes. A consumer should be able to process messages of its own size category or lower (but never higher), and it should prefer larger tasks over smaller ones. In other words, it should consume tasks of its own size first, and only check for smaller tasks when there is nothing to do in its own category, so that the consumer does not idle while there is still anything to do.
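To make the intended behaviour concrete, here is a minimal in-memory model of the rule I'm after. It is plain Java with no RabbitMQ involved: the class PreferLargestModel and its methods publish, next and drain are names made up purely for this illustration, and the three deques only stand in for the real queues.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class PreferLargestModel {
    // Size categories in ascending order; ordinal() is used to compare them.
    enum Size { S, M, L }

    // Hypothetical stand-in for the three queues (not a RabbitMQ API).
    private final Map<Size, Deque<String>> queues = new EnumMap<>(Size.class);

    PreferLargestModel() {
        for (Size s : Size.values()) {
            queues.put(s, new ArrayDeque<>());
        }
    }

    void publish(Size size, String task) {
        queues.get(size).addLast(task);
    }

    // Next task for a consumer of the given capacity: try its own size first,
    // then each smaller size; never look at larger sizes. Null if all are empty.
    String next(Size capacity) {
        for (int i = capacity.ordinal(); i >= 0; i--) {
            String task = queues.get(Size.values()[i]).pollFirst();
            if (task != null) {
                return task;
            }
        }
        return null;
    }

    // Consume until nothing is left that this consumer is allowed to handle.
    List<String> drain(Size capacity) {
        List<String> order = new ArrayList<>();
        for (String task = next(capacity); task != null; task = next(capacity)) {
            order.add(task);
        }
        return order;
    }

    public static void main(String[] args) {
        PreferLargestModel model = new PreferLargestModel();
        for (int i = 1; i <= 2; i++) {
            model.publish(Size.S, "S" + i);
            model.publish(Size.M, "M" + i);
            model.publish(Size.L, "L" + i);
        }
        System.out.println(model.drain(Size.M)); // [M1, M2, S1, S2]
        System.out.println(model.drain(Size.L)); // [L1, L2]
    }
}
```

The medium consumer never touches the L tasks and only falls back to S once M is empty. That is the ordering I would like to get from the broker.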
So I thought I could declare three queues QS, QM and QL and let the exchange route to these by corresponding routing key of the message.
I then have a consumer CL which subscribes to all three queues, but sets the x-priority argument to e.g. 10 when consuming QL, to 5 for QM and to 0 for QS.
Another consumer CM would then subscribe only to QM (with priority 10) and to QS (with priority 0), but not to QL.
I hoped that in this setup CL would only read from QL as long as there are messages in it, and only consume from QM or QS when QL is empty. But that didn't work.
I wrote a simple test with only two queues and one consumer subscribed to both these queues:
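    // One consumer instance, registered on both queues below.
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties, final byte[] body) throws IOException {
            System.out.println("In " + consumerTag + ": Received " + envelope + " -> " + new String(body, "UTF-8"));
            try {
                // Simulate some work so the delivery order is easy to follow.
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Acknowledge this single message (multiple = false).
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    };
    // Prefetch count 1, applied per consumer (global = false).
    channel.basicQos(1, false);
    // Queue "S": manual ack, no consumer arguments (default priority).
    channel.basicConsume("S", false, "smallConsumer", consumer);
    // Queue "L": manual ack, x-priority = 10 passed as a consumer argument.
    Map<String, Object> options = new HashMap<>();
    options.put("x-priority", 10);
    channel.basicConsume("L", false, "bigConsumer", false, false, options, consumer);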
As you can see, autoAck is false and the prefetch count (QoS) is 1.
If I publish three messages to each of the two queues before starting the consumer, I would expect the consumption order to be L, L, L, S, S, S. Instead it seems to use plain round-robin: S, L, S, L, S, L.
What am I doing wrong? Thanks for your help!
Kind regards,
Apodo