How to use EventingBasicConsumer and cancel listening?

6,041 views
Skip to first unread message

johndoe...@gmail.com

unread,
Nov 16, 2015, 2:36:58 PM11/16/15
to rabbitmq-users
Before I started using EventingBasicConsumer, I was using the QueueingBasicConsumer a CancellationToken in my while loop to allow support of cancellations. e.g.

    while (!token.IsCancellationRequested)


However, with EventingBasicConsumer, I'm not sure how to do that. Any ideas?


using(var channel = connection.CreateModel())
{
     ...
     var consumer = new EventingBasicConsumer(channel);
     consumer.Received += (model, ea) => 
     ...
     channel.BasicConsume(
     ...
}

Isn't the call to channel.BasicConsume() blocking?
My understanding is we are to call
    channel.BasicCancel

But where/how do I call it?

Do I have to expose the channel and have a different thread busy-wait for the cancellation? And handle my own channel closing?

Thank you!

Michael Klishin

unread,
Nov 16, 2015, 3:04:34 PM11/16/15
to rabbitm...@googlegroups.com, johndoe...@gmail.com
On 16 November 2015 at 22:37:02, johndoe...@gmail.com (johndoe...@gmail.com) wrote:
> However, with EventingBasicConsumer, I'm not sure how to do
> that. Any ideas?
>
> e.g. from Tutorial #2 -- https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html
>
> using(var channel = connection.CreateModel()) {
> ...
> var consumer = new EventingBasicConsumer(channel); consumer.Received
> += (model, ea) =>
> ...
> channel.BasicConsume(
> ...
> }
>
>
> Isn't the call to channel.BasicConsume() blocking?
> My understanding is we are to call
> channel.BasicCancel
>
> But where/how do I call it?
>
> Do I have to expose the channel and have a different thread busy-wait
> for the cancellation? And handle my own channel closing?

You seem to be confusing two things about cancellation. Your consumer can be cancelled
by your app using IModel.BasicCancel. You can call it any time you need to cancel a consumer. Consumers
are identified by consumer tags, which IModel.BasicConsume returns.

Your consumer can also be cancelled by the server when its underlying queue is deleted/expired.
To handled those events, use the DefaultBasicConsumer.ConsumerCancelled event. Since eventing
consumer subclasses default, it has the same event available.

I’m not sure I understand how BasicCancel’s blocking nature (it’s a part of the protocol, it expects a response)
matters. All consumer methods are dispatched to a thread pool as of .NET client 3.5.0, so you can
invoke blocking operations from consumer callbacks. This is exactly what QueueingBasicConsumer allowed
you to do in earlier version and why it is no longer has any reason to exist (other than “there’s code out there that
uses it”). 
--
MK

Staff Software Engineer, Pivotal/RabbitMQ


johndoe...@gmail.com

unread,
Nov 16, 2015, 3:50:27 PM11/16/15
to rabbitmq-users
I agree that I am confused :)

Here's my use-case:
- Our message consumer is implemented as a Windows Service written in .Net.
- When the application receives a windows service stop/recycle event, it sets the 

    _tokenSource = new CancellationTokenSource();

    _tokenSource.Cancel();

- With the old QueuingBasicConsumer code,
  this allowed us to finish processing our current message and then exit the while-loop.
  i.e. a graceful exit

Maybe we don't need this cancellation token anymore? Is there a way I can help ensure that the current "consumer.Receive" method completes execution in order to send the acknowledgement?

But back to my original question - based on the RabbitMQ tutorial, the structure of our program looks like this:

public void StartConsumer(string queue, CancellationToken token)
{
   
using (channel = new channel())
   
{
     
      consumer
= new Consumer()
      consumer
.Received += receiveMethod;
      channel
.BasicConsume() // blocking
   
}
}


With the above code, there channel/consumer is hidden away - there's no way to call .BasicCancel()

Instead, would I have to do something like this? (pseudo code)
public void StartConsumer(string queue, CancellationToken token)
{
   channel
= new channel()
   consumer
= new Consumer()
   consumer
.Received += receiveMethod;

   
// launch new thread
   task
.StartNew() ==> channel.BasicConsume()  // blocks in the new thread (?)

   
// main thread busy-wait
   
while (!token.IsCancelletionRequested)
   
{
     
Thread.Delay(1000); // or Thread.Sleep()
      channel
.BasicCancel(consumer.Tag)
      channel
.Dispose(); // close, etc.
   
}
}


aside: Michael, your responses to this forum have been excellent and our team greatly appreciates it! Thank you!

Michael Klishin

unread,
Nov 16, 2015, 4:08:51 PM11/16/15
to rabbitm...@googlegroups.com
The client does not have a grace period for running consumer operations, so you need
to wait one way or another.

Java client has more control over the thread pool used, so it can wait for up to N seconds when a channel is closed.

Mani Gandham

unread,
Mar 26, 2016, 6:33:34 PM3/26/16
to rabbitmq-users
BasicConsume and BasicCancel just assign and remove your consumer (in this case it's the EventingBasicConsumer) from the IModel channel.

Once you assign a consumer, the channel will start pushing messages as fast as possible to the consumer. You can change the prefetch and velocity settings with the QOS config but unless you slow it down on purpose, it'll be very fast in delivering messages. With the EventingBasicConsumer, these messages are dispatched in order from the queue but the events are handled by whatever threads are free in the CLR threadpool. That means that multiple messages will be concurrently handled by different threads.

When you do a BasicCancel, you're just removing the consumer from getting any more messages *at that point* - however all the messages that are still on the wire and prefetched will still be dispatched to your EventingBasicConsumer.

So if you have a queue with 1000 items and do a BasicConsume followed by a BasicCancel, the EventingBasicConsumer might have gone 400 messages but still have 200 messages left to process and will continue to go through those.

A simple way to wait until all the messages are done is to use Interlocked.Increment/Decrement in the consumer code to atomically keep a counter of running events and wait till it hits 0.

Bhavesh Valand

unread,
Feb 6, 2017, 11:48:16 AM2/6/17
to rabbitmq-users, mani.g...@gmail.com
Any pseudo code with full example would appreciated.

Thanks
Bhavesh
Reply all
Reply to author
Forward
0 new messages