RabbitMQ 3.5's Priority Queues and C#


Tim Gumbley

Mar 26, 2015, 2:55:16 PM
to rabbitm...@googlegroups.com

RabbitMQ 3.5.0.0 now supports priority queues; however, I am unable to build a working example. I've placed my code below, along with the output I expect and the output I actually get. I'd be interested in more documentation and/or a working example in C#.

So in short: Do priority queues work in C# with Rabbit 3.5.0.0? (I found an example in Java here)

Publisher:

using System;
using RabbitMQ.Client;
using System.Text;
using System.Collections.Generic;

class Publisher
{

    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                IDictionary <String , Object> args = new Dictionary<String,Object>() ;
                args.Add(" x-max-priority ", 10);
                channel.QueueDeclare("task_queue1", true, false, true, args);

                for (int i = 1 ; i<=10; i++ )
                {
                    var message = "Message";
                    var body = Encoding.UTF8.GetBytes(message + " " + i);
                    var properties = channel.CreateBasicProperties();
                    properties.SetPersistent(true);
                    properties.Priority = Convert.ToByte(i);
                    channel.BasicPublish("", "task_queue1", properties, body);
                }
            }
        }
    }
}

Consumer:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
using System.Collections.Generic;

namespace Consumer
{ 
    class Worker
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    IDictionary<String, Object> args = new Dictionary<String, Object>();                      
                    channel.BasicQos(0, 1, false);
                    var consumer = new QueueingBasicConsumer(channel);
                    IDictionary<string, object> consumerArgs = new Dictionary<string, object>();
                    channel.BasicConsume( "task_queue1", false, "", args, consumer);
                    Console.WriteLine(" [*] Waiting for messages. " +
                                      "To exit press CTRL+C");
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine(" [x] Received {0}", message);
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                }
            }
        }
    }
}

Actual output:

[*] Waiting for messages. To exit press CTRL+C
[x] Received Message 1
[x] Received Message 2
[x] Received Message 3
[x] Received Message 4
[x] Received Message 5
[x] Received Message 6
[x] Received Message 7
[x] Received Message 8
[x] Received Message 9
[x] Received Message 10

Expected output:

[*] Waiting for messages. To exit press CTRL+C
[x] Received Message 10
[x] Received Message 9
[x] Received Message 8
[x] Received Message 7
[x] Received Message 6
[x] Received Message 5
[x] Received Message 4
[x] Received Message 3
[x] Received Message 2
[x] Received Message 1


Michael Klishin

Mar 26, 2015, 3:03:17 PM
to rabbitm...@googlegroups.com, Tim Gumbley
 On 26 March 2015 at 21:55:19, Tim Gumbley (tgum...@gmail.com) wrote:
> RabbitMQ 3.5.0.0 now supports Priority Queues(https://www.rabbitmq.com/priority.html);
> However, I am unable to build a working example. I've placed my
> code below. It includes the output that I expect and the output
> I actually.

See "Interaction with Consumers" in the doc guide.

If messages are sent directly to consumers and never really reach the message store, they won't be prioritised.

So, for your example, you'd want to first publish a few thousand messages (to give a more descriptive example),
each with a random priority, and only then start your consumer.
--
MK

Staff Software Engineer, Pivotal/RabbitMQ
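
To illustrate the point about consumers, here is a toy in-memory sketch. It is not RabbitMQ client code, and the names ToyPriorityQueue and ToyDemo are made up for this example. It only models the idea that a priority can reorder messages solely while they are waiting in the queue: if a consumer takes each message as soon as it arrives, there is nothing to reorder.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy in-memory model of a priority queue; this is NOT RabbitMQ client code.
class ToyPriorityQueue
{
    // One FIFO bucket per priority level, kept sorted by priority.
    private readonly SortedDictionary<int, Queue<string>> buckets =
        new SortedDictionary<int, Queue<string>>();

    public void Enqueue(string message, int priority)
    {
        Queue<string> bucket;
        if (!buckets.TryGetValue(priority, out bucket))
        {
            bucket = new Queue<string>();
            buckets[priority] = bucket;
        }
        bucket.Enqueue(message);
    }

    // Highest priority first; arrival order within one priority.
    public string Dequeue()
    {
        var top = buckets.Last(); // keys enumerate in ascending order
        var message = top.Value.Dequeue();
        if (top.Value.Count == 0) buckets.Remove(top.Key);
        return message;
    }
}

static class ToyDemo
{
    // A consumer is already waiting, so each message leaves as soon as it arrives.
    public static List<string> ConsumerAlreadyRunning(int count)
    {
        var queue = new ToyPriorityQueue();
        var delivered = new List<string>();
        for (int i = 1; i <= count; i++)
        {
            queue.Enqueue("Message " + i, i); // priority = i, as in the original publisher
            delivered.Add(queue.Dequeue());
        }
        return delivered;
    }

    // All messages are queued first; the consumer starts afterwards.
    public static List<string> ConsumerStartedLater(int count)
    {
        var queue = new ToyPriorityQueue();
        for (int i = 1; i <= count; i++) queue.Enqueue("Message " + i, i);
        var delivered = new List<string>();
        for (int i = 1; i <= count; i++) delivered.Add(queue.Dequeue());
        return delivered;
    }
}
```

In this model, ToyDemo.ConsumerAlreadyRunning(3) returns Message 1, Message 2, Message 3, while ToyDemo.ConsumerStartedLater(3) returns Message 3, Message 2, Message 1.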


Tim Gumbley

Mar 26, 2015, 4:41:35 PM
to rabbitm...@googlegroups.com, tgum...@gmail.com
Hello MK, I had already come across that doc. If you publish a large number of messages to the queue and only start the consumer once they are all there, the behavior is the same: priority is still ignored.

Here's some code that puts 1,000,000 messages into the queue. Make sure to start the consumer after all 1,000,000 are there.

using System;
using RabbitMQ.Client;
using System.Text;
using System.Collections.Generic;

class Publisher
{

    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                IDictionary <String , Object> args = new Dictionary<String,Object>() ;
                args.Add(" x-max-priority ", 10);
                channel.QueueDeclare("task_queue1", true, false, true, args);


                for (int i = 1 ; i<=1000000; i++ )

                {
                    var message = "Message";
                    var body = Encoding.UTF8.GetBytes(message + " " + i);
                    var properties = channel.CreateBasicProperties();
                    properties.SetPersistent(true);

                    properties.Priority = Convert.ToByte(i%10);

                    channel.BasicPublish("", "task_queue1", properties, body);
                }
            }
        }
    }
}
Actual output:

[*] Waiting for messages. To exit press CTRL+C
[x] Received Message 1
[x] Received Message 2
[x] Received Message 3
[x] Received Message 4
[x] Received Message 5
[x] Received Message 6
[x] Received Message 7
[x] Received Message 8
[x] Received Message 9
.
.
.
[x] Received Message 1000000

Expected output:

[*] Waiting for messages. To exit press CTRL+C
[x] Received Message 9
[x] Received Message 19
[x] Received Message 29
.
.
.
[x] Received Message 999999
[x] Received Message 8
[x] Received Message 18
[x] Received Message 28
.
.
.
[x] Received Message 999998
[x] Received Message 7
[x] Received Message 17
[x] Received Message 27
.
.
.
[x] Received Message 999997
.
.
.




Am I missing something?

 
Tim

Michael Klishin

Mar 26, 2015, 5:20:19 PM
to rabbitm...@googlegroups.com, Tim Gumbley
On 26 March 2015 at 23:41:40, Tim Gumbley (tgum...@gmail.com) wrote:
> [*] Waiting for messages. To exit press CTRL+C [x] Received
> Message 1 [x] Received Message 2 [x] Received Message 3 [x] Received
> Message 4 [x] Received Message 5 [x] Received Message 6 [x] Received
> Message 7 [x] Received Message 8 [x] Received Message 9

I have

=> Message 9813 (priority: 4)
=> Message 9816 (priority: 4)
=> Message 9823 (priority: 4)
=> Message 9830 (priority: 4)
=> Message 9831 (priority: 4)
=> Message 9836 (priority: 4)
=> Message 9840 (priority: 4)
=> Message 9841 (priority: 4)
=> Message 9842 (priority: 4)
=> Message 9847 (priority: 4)
=> Message 9849 (priority: 4)
=> Message 9883 (priority: 4)
=> Message 9887 (priority: 4)
=> Message 9891 (priority: 4)
=> Message 9917 (priority: 4)
=> Message 9927 (priority: 4)
=> Message 9928 (priority: 4)
=> Message 9932 (priority: 4)
=> Message 9942 (priority: 4)
=> Message 9957 (priority: 4)
=> Message 9963 (priority: 4)
=> Message 9967 (priority: 4)
=> Message 9972 (priority: 4)
=> Message 9973 (priority: 4)
=> Message 9975 (priority: 4)
=> Message 9983 (priority: 4)
=> Message 9985 (priority: 4)
=> Message 22 (priority: 3)
=> Message 35 (priority: 3)
=> Message 37 (priority: 3)
=> Message 38 (priority: 3)
=> Message 43 (priority: 3)
=> Message 48 (priority: 3)
=> Message 52 (priority: 3)
=> Message 63 (priority: 3)
=> Message 68 (priority: 3)
=> Message 75 (priority: 3)
=> Message 115 (priority: 3)
=> Message 118 (priority: 3)
=> Message 129 (priority: 3)
=> Message 147 (priority: 3)
=> Message 201 (priority: 3)
=> Message 210 (priority: 3)
=> Message 237 (priority: 3)
=> Message 238 (priority: 3)
=> Message 246 (priority: 3)
=> Message 277 (priority: 3)
=> Message 282 (priority: 3)
=> Message 290 (priority: 3)
=> Message 301 (priority: 3)
=> Message 306 (priority: 3)
=> Message 309 (priority: 3)
=> Message 321 (priority: 3)
=> Message 323 (priority: 3)
=> Message 328 (priority: 3)
=> Message 346 (priority: 3)
=> Message 347 (priority: 3)

with multiple clients.

Trying .NET now.

Michael Klishin

Mar 26, 2015, 6:29:40 PM
to rabbitm...@googlegroups.com, Tim Gumbley
 On 27 March 2015 at 00:20:16, Michael Klishin (mkli...@pivotal.io) wrote:
> Trying .NET now.

Publishing with the following snippet (only the relevant parts are included) from F# REPL:
https://gist.github.com/michaelklishin/0beb587eae77e7d664f6

produces the same behaviour: messages are delivered according to their priority when
I start a consumer.

So I'm not sure what may be different in your environment. The code looks OK.

Are you sure you have RabbitMQ 3.5.0 running locally?

Michael Klishin

Mar 26, 2015, 8:20:24 PM
to rabbitm...@googlegroups.com, Tim Gumbley
On 27 March 2015 at 01:29:35, Michael Klishin (mkli...@pivotal.io) wrote:
> Publishing with the following snippet (only the relevant parts
> are included) from F# REPL:
> https://gist.github.com/michaelklishin/0beb587eae77e7d664f6
>
> produces the same behaviour: messages are delivered according
> to their priority when
> I start a consumer.

OK, two scripts and consumer output:
https://gist.github.com/michaelklishin/5500ef12b703e67f703b

We'll add some basic tests to the .NET client test suite just in case. 

Tim Gumbley

Mar 27, 2015, 10:11:32 AM
to rabbitm...@googlegroups.com, tgum...@gmail.com
Hey! 

I solved it, everything's fine. It was a dumb error: I had some spaces around x-max-priority.
I wrote
args.Add(" x-max-priority ", 10);
I should have written
args.Add("x-max-priority", 10);

I'm so sorry, and I feel like an idiot. I spent too much time pulling out my hair over this.

Tim
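
Since the declaration with the padded key ran without any error and simply produced a queue without priorities, a small guard can catch this kind of typo before it reaches the broker. The sketch below is only an illustration: QueueArgs.Checked is a hypothetical helper, not part of RabbitMQ.Client.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of RabbitMQ.Client.
static class QueueArgs
{
    // Returns the same dictionary if every key is free of leading/trailing
    // whitespace; otherwise throws, naming the offending key.
    public static IDictionary<string, object> Checked(IDictionary<string, object> args)
    {
        foreach (var key in args.Keys)
        {
            if (key != key.Trim())
            {
                throw new ArgumentException(
                    "Queue argument key has surrounding whitespace: '" + key + "'");
            }
        }
        return args;
    }
}
```

With the publisher above, the call would become channel.QueueDeclare("task_queue1", true, false, true, QueueArgs.Checked(args)); the padded key " x-max-priority " then fails immediately with an ArgumentException.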

Michael Klishin

Mar 27, 2015, 10:18:08 AM
to rabbitm...@googlegroups.com, Tim Gumbley
 On 27 March 2015 at 17:11:36, Tim Gumbley (tgum...@gmail.com) wrote:
> I solved it, everything's fine. It was a dumb error. I had a some
> spaces around the x-max-priority
> I wrote
> args.Add(" x-max-priority ", 10);
> I should have wrote
> args.Add("x-max-priority", 10);
>
> I'm so sorry, and I feel like an idiot. Spent to much time pulling
> out my hair over this.

This makes me wonder if we should introduce an enumeration for known headers. 
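
As a rough sketch of that idea, named constants for well-known queue arguments could look something like the following. The class name KnownQueueArguments and the ForPriorityQueue helper are made up for illustration; nothing in this thread says such a class exists in RabbitMQ.Client. The argument strings themselves are standard RabbitMQ queue arguments.

```csharp
using System.Collections.Generic;

// Hypothetical constants class for illustration only.
static class KnownQueueArguments
{
    public const string MaxPriority = "x-max-priority";
    public const string MessageTtl = "x-message-ttl";
    public const string MaxLength = "x-max-length";
    public const string DeadLetterExchange = "x-dead-letter-exchange";

    // Builds the argument dictionary for a queue with the given maximum priority.
    public static IDictionary<string, object> ForPriorityQueue(int maxPriority)
    {
        return new Dictionary<string, object> { { MaxPriority, maxPriority } };
    }
}
```

Using args.Add(KnownQueueArguments.MaxPriority, 10) instead of a hand-typed string literal leaves no room for stray spaces, and a misspelled constant name is a compile error rather than a silently ignored argument.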

Aljohn V

Mar 19, 2018, 7:23:08 AM
to rabbitmq-users
Hi Tim,
The line args.Add(" x-max-priority ", 10) runs,
but the messages are not prioritized.
The other one gives me this error:
RabbitMQ.Client.Exceptions.OperationInterruptedException: 'The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text="PRECONDITION_FAILED - inequivalent arg 'x-max-priority' for queue 'Test' in vhost 'MyHost': received the value '8' of type 'signedint' but current is none", classId=50, methodId=10, cause='
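
Reading the error text: the existing queue 'Test' has no x-max-priority ("current is none"), while the new declaration asks for 8. RabbitMQ does not allow the arguments of an existing queue to be changed by redeclaring it, so the usual way out is to delete the queue and declare it again, or to declare it under a new name. One plausible cause, though it is only a guess here, is that the queue was first created with the padded key. The toy sketch below is not RabbitMQ code and the name RedeclareCheck is made up; it only mimics the comparison that the error message describes.

```csharp
using System;
using System.Collections.Generic;

// Toy model of the redeclare check described by the error text; NOT RabbitMQ code.
static class RedeclareCheck
{
    // True when the existing queue and the new declaration agree on the given
    // argument: either both lack it, or both carry an equal value.
    public static bool IsEquivalent(
        IDictionary<string, object> existing,
        IDictionary<string, object> requested,
        string key)
    {
        object current;
        object received;
        existing.TryGetValue(key, out current);   // stays null when the key is absent
        requested.TryGetValue(key, out received);
        return object.Equals(current, received);
    }
}
```

In this model, an existing queue declared with " x-max-priority " and a new declaration with "x-max-priority" set to 8 are not equivalent for the key "x-max-priority", because the existing queue has no value under that exact key.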

Luke Bakken

Mar 19, 2018, 9:23:01 AM
to rabbitmq-users
Hello,

Please post a new message to rabbitmq-users instead of replying to a three-year-old thread. I will answer it when I see the new message.

Thanks!
Luke