Consumer async It does not work asynchronous .netCore

623 views
Skip to first unread message

radu...@connatix.com

unread,
Sep 24, 2018, 2:01:28 PM9/24/18
to rabbitmq-users

Rabbitmq server version rabbitmq-server-3.7.8
Erlang: OTP 21.0
Os: Windows 10 Pro

Let's say I have a channel that it accepts maximum 10 items. Then I want to have work done in the received to be run asynchronous.
My first sample of code was:
`var factory = new ConnectionFactory() { HostName = "localhost"};
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello");

            var consumer = new  EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                channel.BasicAck(ea.DeliveryTag, false);
                Console.WriteLine(" [x] Received {0}", message);
                Thread.Sleep(1000); //simulate delay
            };
            channel.BasicQos(0, 10, false);
            channel.BasicConsume("hello", false, consumer);
        }`

I have seen that it takes it in sequence waiting 1 second.

After looking on the web I have upgrade the solution to
`var factory = new ConnectionFactory() { HostName = "localhost", DispatchConsumersAsync = true};
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello");

            var consumer = new  AsyncEventingBasicConsumer(channel);
            consumer.Received += async (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                channel.BasicAck(ea.DeliveryTag, false);
                Console.WriteLine(" [x] Received {0}", message);
                await Task.Delay(1000); // simulate delay
            };
            channel.BasicQos(0, 10, false);
            channel.BasicConsume("hello", false, consumer);
        }`

I have expected to see let's say 10 message pop up on the console, but instead I have got the same behavior as in the first synchronous approach, waiting 1 second between messages.

When I put a break point inside the delegate I could see that it wait to the delegate to finish before another message is process.

Is this he desired behavior? If yes, what is the differences between sync and async versions?

Karl Nilsson

unread,
Sep 24, 2018, 3:12:33 PM9/24/18
to rabbitm...@googlegroups.com
Hi,

I think you are confusing asynchrony with concurrency. In practise all the async consumer provides you with is an async context to perform further async (i.e non-blocking) actions within. The dispatcher will still dispatch messages one by one. If you want concurrency you need to hop of the dispatching thread in some way and not await the result within the event handle body. 

Cheers
Karl

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Michael Klishin

unread,
Sep 24, 2018, 8:01:04 PM9/24/18
to rabbitm...@googlegroups.com
Also relevant here: concurrency can be intentionally controlled/limited for consumers that use
manual acknowledgement mode [1]. That's a separate concern from consumer operation dispatch
concurrency or concurrency that different Consumer interface implementation might apply
but still affect "effective concurrency" for the lack of a better term.

--
MK

Staff Software Engineer, Pivotal/RabbitMQ
Reply all
Reply to author
Forward
0 new messages