How to keep EventingBasicConsumer alive

Noel McGrath

Aug 19, 2017, 2:40:38 AM
to rabbitmq-users
I have searched but can't seem to find any example of keeping an EventingBasicConsumer alive in an application.
All the examples are console apps that use Console.ReadLine to stay alive.

We had been using QueueingBasicConsumer, with a while loop to keep it alive, which is what the examples in the tutorials did.
We are now upgrading the RabbitMQ C# client library, and with that switching to EventingBasicConsumer.

We have created a Task in which we create the EventingBasicConsumer and wire up the handler for receiving messages.
This way we are thread-safe and can start more Tasks if we need more consumers.
To keep the task alive we have added a while loop with a sleep.
My question is: is this pattern the right thing to do, or is there a better way?

See code example:


using System;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class Consumer
{
    private readonly IModel c_channel;
    private readonly CancellationToken c_cancellationToken;

    private EventingBasicConsumer c_consumer;

    public Consumer(
        IConnection connection,
        CancellationToken cancellationToken)
    {
        this.c_cancellationToken = cancellationToken;
        this.c_channel = connection.CreateModel();
        // Prefetch one message at a time on this channel.
        this.c_channel.BasicQos(0, 1, false);
        this.c_channel.QueueDeclare(
            queue: "hello",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);
    }

    public Task Start()
    {
        var _result = new Task(
            () =>
            {
                this.c_consumer = new EventingBasicConsumer(this.c_channel);
                this.RunConsumeLoop();
            },
            this.c_cancellationToken,
            TaskCreationOptions.LongRunning);
        _result.Start();
        return _result;
    }

    private void RunConsumeLoop()
    {
        // Attach the handler before consuming so no delivery arrives unhandled.
        this.c_consumer.Received += (model, basicDeliverEventArgs) =>
        {
            try
            {
                // Process the message, deserialize to type, do work ...
                // If we get here, send the ack.
                this.c_channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false);
            }
            catch (Exception)
            {
                // Reject the message without requeueing it.
                this.c_channel.BasicNack(basicDeliverEventArgs.DeliveryTag, false, false);
            }
        };

        // autoAck must be false because the handler acks or nacks explicitly.
        this.c_channel.BasicConsume(
            queue: "hello",
            autoAck: false,
            consumer: this.c_consumer);

        // Keep the task alive until cancellation is requested.
        while (!this.c_cancellationToken.IsCancellationRequested)
        {
            Thread.Sleep(100);
        }
    }
}

Luke Bakken

Aug 19, 2017, 11:16:09 AM
to rabbitmq-users
Hi Noel,

The typical use-case for EventingBasicConsumer is within a long-running application such as an ASP.NET web application or Windows service, where you start the EventingBasicConsumer in the initialization phase of your application and then handle AMQP events as they come in. There is no need to run instances of this class in dedicated threads.

If you're not using a long-running container like IIS, an option would be to encapsulate each instance of EventingBasicConsumer in a class that also contains an instance of EventWaitHandle. Then, when all of your class instances are created, you can use WaitAll on all of the created handles (each class instance would have a getter for this object). It would then be the responsibility of each class instance to signal their EventWaitHandle when a "stop" or "cancel" event is processed. Once all EventWaitHandles are signaled WaitAll will return and your application can proceed to exit, if that is what you want.
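A minimal sketch of the wait-handle approach described above, assuming RabbitMQ.Client 5.x, a broker on localhost, and the "hello" queue from the first post. The `ConsumerHost` class and its `Start`, `Stop` and `Stopped` members are hypothetical names used only for illustration; `ManualResetEvent` is used here as one concrete `EventWaitHandle`.

```csharp
using System;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Hypothetical wrapper: one consumer plus the handle that reports "I have stopped".
public sealed class ConsumerHost
{
    private readonly IModel _channel;
    private readonly string _queue;
    private readonly ManualResetEvent _stopped = new ManualResetEvent(false);
    private string _consumerTag;

    public ConsumerHost(IModel channel, string queue)
    {
        _channel = channel;
        _queue = queue;
    }

    // Getter so the caller can pass every handle to WaitHandle.WaitAll.
    public WaitHandle Stopped => _stopped;

    public void Start()
    {
        _channel.QueueDeclare(_queue, false, false, false, null);
        var consumer = new EventingBasicConsumer(_channel);
        // Attach the handler before BasicConsume so no delivery is missed.
        consumer.Received += (sender, ea) =>
        {
            Console.WriteLine("Received delivery {0}", ea.DeliveryTag);
            _channel.BasicAck(ea.DeliveryTag, false);
        };
        _consumerTag = _channel.BasicConsume(queue: _queue, autoAck: false, consumer: consumer);
    }

    // Cancels the consumer if it was started, then signals the handle.
    public void Stop()
    {
        if (_consumerTag != null && _channel.IsOpen)
        {
            _channel.BasicCancel(_consumerTag);
        }
        _stopped.Set();
    }
}

public static class Program
{
    public static void Main()
    {
        var factory = new ConnectionFactory { HostName = "localhost" }; // assumed host
        using (var connection = factory.CreateConnection())
        {
            var hosts = new[]
            {
                new ConsumerHost(connection.CreateModel(), "hello"),
                new ConsumerHost(connection.CreateModel(), "hello"),
            };
            foreach (var host in hosts) host.Start();

            // Ctrl+C is the "stop" event in this sketch.
            Console.CancelKeyPress += (s, e) =>
            {
                e.Cancel = true; // let Main finish normally instead of killing the process
                foreach (var host in hosts) host.Stop();
            };

            // Blocks until every host has signalled; the connection stays open meanwhile.
            WaitHandle.WaitAll(Array.ConvertAll(hosts, h => h.Stopped));
        } // disposing the connection also closes its channels
    }
}
```

Note that `WaitHandle.WaitAll` accepts at most 64 handles and does not support multiple handles on an STA thread, so this fits a plain console `Main` better than a UI thread.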

Let me know if you have any questions -
Luke

Noel McGrath

Aug 21, 2017, 9:47:54 AM
to rabbitmq-users
Thanks Luke,
I have it working with the consumer initialized at application start.

Mohamed Fayeeg

Dec 9, 2017, 7:46:57 AM
to rabbitmq-users
Hi Luke,

I understand from Noel's response that your solution is indeed working. I would appreciate it if you could share code blocks demonstrating your suggestion.

Thanks.

regards,
Fayeeg

Luke Bakken

Dec 9, 2017, 12:48:59 PM
to rabbitmq-users
Hello -

Actually I gave two suggestions - one is to start your RabbitMQ consumer in the initialization stage of an application container like IIS or in a Windows Service (which will ensure that it is long-running), the other is to use reset events or some other synchronization primitive if your application is a console executable.
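For the console case, one possible sketch of "some other synchronization primitive" is to block on a cancellation token's wait handle instead of polling with a sleep loop. It uses only System.Threading; the `KeepAlive` class and `WaitForCancellation` method are hypothetical names, and the RabbitMQ setup is left as comments because it depends on the application.

```csharp
using System;
using System.Threading;

public static class KeepAlive
{
    // Blocks the calling thread until the token is cancelled, without polling.
    public static void WaitForCancellation(CancellationToken token)
    {
        token.WaitHandle.WaitOne();
    }

    public static void Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            Console.CancelKeyPress += (sender, e) =>
            {
                e.Cancel = true; // keep the process alive so cleanup can run
                cts.Cancel();
            };

            // Create the connection, channel and EventingBasicConsumer here,
            // and call BasicConsume once the Received handler is attached.

            Console.WriteLine("Consuming. Press Ctrl+C to stop.");
            WaitForCancellation(cts.Token);

            // Dispose the channel and connection here, after the wait returns.
        }
    }
}
```

The consumer callbacks run on the client library's own threads, so the main thread only has to stay blocked until shutdown is requested.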

What type of application are you developing?

Luke

Howard Hee

Oct 22, 2019, 12:18:25 AM
to rabbitmq-users
Hi Luke, I am facing the same problem in a Windows service. I have to add a while loop to keep the consumer alive. Below is my code:

using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        string name = channel.QueueDeclare().QueueName;
        channel.QueueBind(queue: name, exchange: "delay_mq_exchange", routingKey: "delay_mq_routingKey");

        var consumer = new EventingBasicConsumer(channel);

        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body);

            // continue code
        };

        channel.BasicConsume(queue: name,
                             autoAck: true,
                             consumer: consumer);

        //while (true)  //have to add while loop to make sure keep running
        //{
        //}
    }
}

Yong Hua

Oct 22, 2019, 6:04:18 AM
to rabbitm...@googlegroups.com
Hello

on 2019/10/22 12:18, Howard Hee wrote:
> HI Luke, I am facing same problem in window service. I have to add a
> while loop to keep the consumer alive.

Why use a loop to keep the consumer alive? You could catch the exception
and restart the consumer if possible.

regards.

Howard Hee

Oct 22, 2019, 6:30:55 AM
to rabbitmq-users
That's because the consumer is unable to receive the messages I send. Only when I add the loop to keep it alive does it receive them.

Luke Bakken

Oct 22, 2019, 11:58:45 AM
to rabbitmq-users
Hi Howard,

When BasicConsume() returns, you're going to exit two using statements. Take a second to read the documentation for the using statement:

https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/keywords/using-statement

In your case, both the channel and connection will be disposed.

Thanks,
Luke

Howard Hee

Oct 22, 2019, 12:05:11 PM
to rabbitm...@googlegroups.com
How do I keep the consumer alive after exiting the using statements? The only way I can think of is an infinite loop, as shown in my code above.

Luke Bakken

Oct 22, 2019, 1:13:57 PM
to rabbitmq-users
Hi Howard,

Don't use the using statement. Instead, save the connection and channel objects in variables and ensure they are cleaned up when your Windows service exits.
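A sketch of what that could look like, assuming RabbitMQ.Client 5.x and reusing the exchange and routing key from the code above. The `DelayQueueListener` class is a hypothetical name; `Start` would be called from the service's OnStart and `Dispose` from its OnStop.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Hypothetical holder that keeps the connection and channel alive in fields.
public sealed class DelayQueueListener : IDisposable
{
    private readonly ConnectionFactory _factory;
    private IConnection _connection;
    private IModel _channel;

    public DelayQueueListener(ConnectionFactory factory)
    {
        _factory = factory;
    }

    // Call from the service's OnStart: returns immediately, the consumer keeps running.
    public void Start()
    {
        _connection = _factory.CreateConnection();
        _channel = _connection.CreateModel();

        string name = _channel.QueueDeclare().QueueName;
        _channel.QueueBind(queue: name, exchange: "delay_mq_exchange", routingKey: "delay_mq_routingKey");

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            // ea.Body is a byte[] in the 5.x client assumed here.
            var message = Encoding.UTF8.GetString(ea.Body);
            Console.WriteLine("Received: {0}", message);
        };

        _channel.BasicConsume(queue: name, autoAck: true, consumer: consumer);
    }

    // Call from the service's OnStop: this is the cleanup the using blocks used to do.
    public void Dispose()
    {
        _channel?.Dispose();
        _connection?.Dispose();
        _channel = null;
        _connection = null;
    }
}
```

Because the fields hold references for the lifetime of the service, nothing is disposed when `Start` returns, so no loop is needed to keep the consumer receiving.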

Howard Hee

Oct 22, 2019, 10:56:33 PM
to rabbitmq-users
Hi Luke,

It worked after I changed to variables instead. Many thanks for the guidance.