Single Active Consumer Failing-Over As Expected With EventingBasicConsumer (.NET Client)


Dave Seltzer

May 19, 2020, 3:55:23 PM
to rabbitmq-users
Hello!

I'm trying to implement a Single-Active-Consumer queue such that only one consumer can see messages at a time.

I'm running two instances of my application. The first instance consumes messages, and the second sits idle. When I examine the Management interface it shows that the first instance is "single active" and the second instance is "waiting".

However, when I shut down the first instance, the second instance does not begin consuming messages. The Consumers list in the Management interface then shows "No Consumers."

Is there some kind of reconnect that I need to do in order to get the second instance to take up the slack when the first instance goes away?

Many thanks!

-Dave

Versions:
I'm running RabbitMQ 3.8.3/Erlang 21.3 on localhost using Docker.
I'm using RabbitMQ.Client 6.0.0 from NuGet.

Queue Setup:
I'm configuring the Queue using this:
using (var con = inputFactory.CreateConnection("GroupIndexer"))
{
    using (var model = con.CreateModel())
    {
        logger.Info($"Asserting Input Exchange: {ExchangeName}");
        model.ExchangeDeclarePassive(ExchangeName);

        logger.Info($"Creating/Asserting Single-Active-Consumer Input Queue: {QueueName}");
        var args = new Dictionary<string, object>();
        args.Add("x-single-active-consumer", true);
        var result = model.QueueDeclare(QueueName, true, false, false, args);

        logger.Info($"Creating/Asserting InputQueue Binding for: {InputRoutingKey}");
        model.QueueBind(QueueName, ExchangeName, InputRoutingKey, null);

        success = true;
    }
}

I'm connecting to the Queue using this:
using (var inputCon = inputFactory.CreateConnection("MyInputConnection"))
using (var outputCon = outputFactory.CreateConnection("MyOutputConnection"))
{
    using (var inputChannel = inputCon.CreateModel())
    using (var outputChannel = outputCon.CreateModel())
    {
        //Wire-up the consumer
        string inputConsumerTag = "";
        inputChannel.BasicQos(0, 100, false);
        inputConsumer = new EventingBasicConsumer(inputChannel);
        inputConsumer.Received += (ch, eventargs) =>
        {
            try
            {
                if (stopRunning)
                    inputChannel.BasicCancel(inputConsumerTag);

                //DO SOME WORK

                inputChannel.BasicAck(eventargs.DeliveryTag, false);
            }
            catch (Exception ex)
            {
                logger.Error($"Error Processing Input Message: {ex.ToString()}");
            }
        };

        //Start the Consumer
        logger.Info("Starting Consumer.");
        inputConsumerTag = inputChannel.BasicConsume(inputQueueConfiguation.QueueName, false, inputConsumer);

        //Loop until instructed to stop running
        while (!stopRunning)
        {
            System.Threading.Thread.Sleep(1000);
        }

        //Hold down before closing the channel... Just in case.
        System.Threading.Thread.Sleep(5000);
        inputChannel.Close();
    }
}



Dave Seltzer

May 19, 2020, 3:56:18 PM
to rabbitmq-users
Title should read *Not Failing-Over As Expected 

Whoops.

Dave Seltzer

May 19, 2020, 4:11:30 PM
to rabbitmq-users
Having tested this further it now seems to be working. Not sure why it wasn't before!
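
For anyone who wants to re-check the failover behaviour, here is a minimal sketch of a stand-alone probe. It is not the application code above. It assumes a broker on localhost with default credentials, and the queue name "sac-probe" and class name SacProbe are made up for illustration. Run two copies, publish a few messages to the queue, then stop the copy that is printing deliveries. The other copy should start printing shortly afterwards. Note that, as far as I can tell, the client gets no explicit "you are now active" notification, so the first delivery is the only signal this sketch relies on.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class SacProbe
{
    static void Main()
    {
        // Assumption: broker on localhost with default guest/guest credentials.
        var factory = new ConnectionFactory { HostName = "localhost" };

        using (var con = factory.CreateConnection("SacProbe"))
        using (var ch = con.CreateModel())
        {
            // Hypothetical queue name; the argument must be identical in every instance.
            var args = new Dictionary<string, object> { { "x-single-active-consumer", true } };
            ch.QueueDeclare("sac-probe", true, false, false, args);

            var consumer = new EventingBasicConsumer(ch);

            // Fires once the broker confirms the subscription. A waiting
            // consumer is registered too, so this does not mean "active".
            consumer.Registered += (s, e) => Console.WriteLine("Consumer registered.");

            // Deliveries only arrive while this instance is the active consumer.
            consumer.Received += (s, e) =>
            {
                Console.WriteLine($"Got: {Encoding.UTF8.GetString(e.Body.ToArray())}");
                ch.BasicAck(e.DeliveryTag, false);
            };

            // Fires if the channel or connection goes away underneath the consumer.
            consumer.Shutdown += (s, e) => Console.WriteLine($"Consumer shut down: {e.ReplyText}");

            ch.BasicConsume("sac-probe", false, consumer);

            Console.WriteLine("Press Enter to exit.");
            Console.ReadLine();
        }
    }
}
```

If the surviving copy never prints anything and the Management interface shows "No Consumers", the Shutdown line should at least show whether its channel was closed.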