EventingBasicConsumer - Blocking threads


Tomáš Bednár

Jan 21, 2015, 7:43:29 AM
to rabbitm...@googlegroups.com
Hi,
I'm working on a C# application that uses the .NET AMQP client library. I have one topic exchange called storage-input and two queues, storage-external and storage-internal, bound with routing keys ext.img and int.img.
What I need is to consume messages from both queues asynchronously in my application. So I created one shared IConnection and two threads, where each thread creates its own IModel from the shared IConnection. Each thread then starts consuming messages using an EventingBasicConsumer and its Received event.

I thought everything was OK, but when I published a message to both queues and checked whether consuming is truly asynchronous, it was not :( When I check the thread ID in both consumers, I see that they run on the same thread, so they block each other.

What am I doing wrong, and is it even possible to consume from the queues asynchronously without the consumers blocking each other? Or do I have to create an IConnection for each thread?

Thanks for help,
Tomáš
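
The setup described above can be reproduced with a small probe like the following. This is only a sketch, not the poster's actual code: it assumes the synchronous API of RabbitMQ.Client (versions up to 6.x), a broker on localhost, and that both queues already exist and are bound to the exchange. The class name ThreadIdProbe and the helper Subscribe are made up for illustration. On the client version discussed in this thread, both handlers would be expected to report the same managed thread ID; later client versions may dispatch differently.

```csharp
using System;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Hypothetical probe: one shared connection, one channel per queue.
class ThreadIdProbe
{
    static void Main()
    {
        // Assumption: a broker is reachable on localhost with default credentials.
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var externalChannel = connection.CreateModel())
        using (var internalChannel = connection.CreateModel())
        {
            Subscribe(externalChannel, "storage-external");
            Subscribe(internalChannel, "storage-internal");

            // Keep the process alive while deliveries arrive.
            Console.ReadLine();
        }
    }

    static void Subscribe(IModel channel, string queue)
    {
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, ea) =>
        {
            // Log which thread the client library uses to run this handler.
            Console.WriteLine("{0}: handler on thread {1}",
                queue, Thread.CurrentThread.ManagedThreadId);

            // Simulate slow processing so that blocking becomes visible.
            Thread.Sleep(2000);
        };

        // Second argument enables automatic acknowledgement.
        channel.BasicConsume(queue, true, consumer);
    }
}
```

Publishing one message to each queue and comparing the printed thread IDs and timing shows whether the two handlers run concurrently.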

Tomáš Bednár

Jan 21, 2015, 9:09:17 AM
to rabbitm...@googlegroups.com
I tried creating a new IConnection for each thread, and that works asynchronously, but I don't like it because it uses one TCP connection per consumer thread. Should I consider this a bug, or is it the intended behavior?

Michael Klishin

Jan 21, 2015, 9:18:52 AM
to rabbitm...@googlegroups.com, Tomáš Bednár
On 21 January 2015 at 17:09:18, Tomáš Bednár (bednar.t...@gmail.com) wrote:
> I tried create new IConnection for each thread and it works asynchronously
> but I don't like it because of using TCP connection per consumer
> thread. Can I consider as a bug or it is right behavior?

It's most common to create a new channel (IModel) for every thread but use a single connection.

In addition, deliveries and other server-sent methods will be dispatched to a thread pool in a future version.
--
MK

Staff Software Engineer, Pivotal/RabbitMQ

Tomáš Bednár

Jan 21, 2015, 9:58:55 AM
to rabbitm...@googlegroups.com, bednar.t...@gmail.com
So for now, if I need asynchronous consuming in my application, there is no other way than to create a new IConnection for each thread?

Thanks,
Tomas
 

Michael Klishin

Jan 21, 2015, 10:02:48 AM
to rabbitm...@googlegroups.com, Tomáš Bednár
On 21 January 2015 at 17:58:56, Tomáš Bednár (bednar.t...@gmail.com) wrote:
> So there isn't other way for now if I need asynchronous consuming
> in my application other than create new IConnection for each
> thread?

Deliveries are dispatched on the I/O thread, and every connection has its own. Your consumer can dispatch
the deliveries however you want (e.g. by starting tasks or using a thread pool). Only certain consumer implementations
(e.g. the queueing one) really need a thread per connection at the moment.

In the Java client, all server-sent protocol methods are dispatched to a thread pool, although
nothing changes for queueing consumers even then. This is how the .NET client will work one day, too.
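
A consumer that dispatches its own deliveries, as suggested above, could look roughly like this. It is a hedged sketch under assumptions: the synchronous RabbitMQ.Client API (up to 6.x), a broker on localhost, existing queues, and a made-up Process method standing in for the application's work. The handler returns immediately, so the thread that delivers messages is not blocked by slow processing.

```csharp
using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Hypothetical example: hand each delivery off to the .NET thread pool.
class OffloadingConsumer
{
    static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var externalChannel = connection.CreateModel())
        using (var internalChannel = connection.CreateModel())
        {
            Subscribe(externalChannel, "storage-external");
            Subscribe(internalChannel, "storage-internal");
            Console.ReadLine();
        }
    }

    static void Subscribe(IModel channel, string queue)
    {
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, ea) =>
        {
            // Copy the payload before leaving the handler. ToArray() resolves to
            // LINQ for byte[] bodies and to ReadOnlyMemory<byte>.ToArray() in 6.x.
            byte[] payload = ea.Body.ToArray();

            // Return right away; the work runs on a thread pool thread.
            Task.Run(() => Process(queue, payload));
        };

        // Auto-ack: the broker considers a message handled once it is sent,
        // so nothing limits how many tasks pile up in the thread pool.
        channel.BasicConsume(queue, true, consumer);
    }

    // Placeholder for application work (an assumption, not part of the library).
    static void Process(string queue, byte[] payload)
    {
        Console.WriteLine("{0}: '{1}' on thread {2}", queue,
            Encoding.UTF8.GetString(payload), Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(2000);
    }
}
```

With this arrangement a single connection is enough, at the cost of losing the ordering and back-pressure that handling messages inline provides.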

Tomáš Bednár

Jan 21, 2015, 10:11:12 AM
to rabbitm...@googlegroups.com, bednar.t...@gmail.com
On Wednesday, 21 January 2015 16:02:48 UTC+1, Michael Klishin wrote:
>Deliveries are dispatched on the I/O thread, every connection has its own. Your consumer can dispatch
>the deliveries however you want (e.g. by starting tasks, using a thread pool). Only certain consumer implementations
>(e.g. the queueing one) really need a thread per connection at the moment.

>In the Java client, all server-sent protocol methods are dispatched to a thread pool, although
>nothing changes for queueing consumers even then. This is how .NET client will work one day, too.

Thanks for the explanation ;)

Tomas

Herbey Zepeda

Sep 27, 2017, 9:09:59 PM
to rabbitmq-users
Hello Michael,

Here is a code snippet of a pattern I am evaluating. Parallel.Invoke can be given any number N of actions, so that the decision as to how many threads should be created is delegated to the runtime/thread pool. I would like to issue a blocking call to service.LongTask(threadName) on each thread to take advantage of the auto-acknowledgement subsystem.

I also evaluated the pattern of having only one call to program.Consume while using a new (fire-and-forget) background thread from the thread pool to process the dequeued messages, such as Task.Run(() => service.LongTask(threadName)). In this case, if I am not mistaken, we would overwhelm the thread pool queues, thus transferring the concern of queuing to another subsystem. In addition, if I follow this route, I would have to "ack" the messages prior to execution, thus losing RabbitMQ's acking/re-queuing functionality.

I've done a few small-scale performance tests, and both scenarios seem to perform similarly. Before engaging in more serious testing, I would like to know whether scenario 1 (Parallel.Invoke) is a reasonable approach based on my description. I am concerned about creating/using too many tasks (threads) that would be blocking (or listening via EventingBasicConsumer.Received) resources. I decided to ask this question here because in this conversation you brought up the topic of the I/O thread.

My understanding is that while my "listening" threads are not processing incoming messages from the queue, they are in sleep mode, and the I/O thread (associated with the corresponding connection) is the only active resource handling the message deliveries pushed by RabbitMQ, waking these threads to start processing the incoming request. During sleep mode, the listening threads would not be blocking the thread pool, thus allowing the latter to claim the resources for other purposes.


using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Program
{
    static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        var rmqConnection = factory.CreateConnection();

        // Each action registers one consumer on the shared connection.
        Parallel.Invoke(
            () => Consume(rmqConnection, "thread1"),
            () => Consume(rmqConnection, "thread2"),
            // ...
            () => Consume(rmqConnection, "thread20"));
    }

    static void Consume(IConnection connection, string threadName)
    {
        // One channel per consumer, created from the connection passed in.
        IModel rmqChannel = connection.CreateModel();
        rmqChannel.QueueDeclare(queue: "hello",
                                durable: false,
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);

        // Deliver at most one unacknowledged message at a time per consumer.
        rmqChannel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        var consumer = new EventingBasicConsumer(rmqChannel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);

            // MyService is the application's own type (not shown here).
            var service = new MyService();
            service.LongTask(threadName);
        };
        rmqChannel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
    }
}


Thank you
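
One possible middle ground between the two scenarios described above is to offload work to the thread pool but acknowledge manually once the work finishes, letting the prefetch count cap how many messages are in flight. The following is only a sketch under assumptions, not a recommendation from this thread: it uses the synchronous RabbitMQ.Client API (up to 6.x), LongTask is a stand-in for the poster's service call, and the prefetch value of 10 is arbitrary. Because a channel should not be used by several threads at once, the acknowledgement calls are serialized with a lock.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Hypothetical example: thread pool processing with manual acknowledgements.
class ManualAckConsumer
{
    static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello", durable: false,
                exclusive: false, autoDelete: false, arguments: null);

            // At most 10 unacknowledged deliveries, so at most 10 tasks in flight.
            channel.BasicQos(0, 10, false);

            var channelLock = new object();
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, ea) =>
            {
                byte[] payload = ea.Body.ToArray(); // copy before leaving the handler
                ulong tag = ea.DeliveryTag;

                Task.Run(() =>
                {
                    try
                    {
                        LongTask(payload);
                        // Acknowledge only after the work succeeded.
                        lock (channelLock) { channel.BasicAck(tag, false); }
                    }
                    catch (Exception)
                    {
                        // Requeue on failure; a message that always fails
                        // would be redelivered repeatedly.
                        lock (channelLock) { channel.BasicNack(tag, false, true); }
                    }
                });
            };

            // Second argument false: acknowledgements are sent manually above.
            channel.BasicConsume("hello", false, consumer);
            Console.ReadLine();
        }
    }

    // Placeholder for the long-running work (an assumption for illustration).
    static void LongTask(byte[] payload)
    {
        Thread.Sleep(2000);
    }
}
```

In this arrangement the broker, rather than the thread pool queue, holds the backlog, and unacknowledged messages are requeued if the consumer's connection is lost.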