Hello!
I'm trying to implement a Single-Active-Consumer queue such that only one consumer can see messages at a time.
I'm running two instances of my application. The first instance consumes messages, and the second sits idle. When I examine the Management interface, it shows that the first instance is "single active" and the second instance is "waiting".
However, when I shut down the first instance, the second instance does not begin consuming messages. The Consumers list in the Management interface then shows "No Consumers."
Is there some kind of reconnect that I need to do in order to get the second instance to take up the slack when the first instance goes away?
Many thanks!
-Dave
Versions:
I'm running RabbitMQ 3.8.3/Erlang 21.3 on localhost using Docker.
I'm using RabbitMQ.Client 6.0.0 from NuGet.
Queue Setup:
I'm configuring the Queue using this:
using (var con = inputFactory.CreateConnection("GroupIndexer"))
{
    using (var model = con.CreateModel())
    {
        logger.Info($"Asserting Input Exchange: {ExchangeName}");
        model.ExchangeDeclarePassive(ExchangeName);
        logger.Info($"Creating/Asserting Single-Active-Consumer Input Queue: {QueueName}");
        var args = new Dictionary<string, object>();
        args.Add("x-single-active-consumer", true);
        var result = model.QueueDeclare(QueueName, true, false, false, args);
        logger.Info($"Creating/Asserting InputQueue Binding for: {InputRoutingKey}");
        model.QueueBind(QueueName, ExchangeName, InputRoutingKey, null);
        success = true;
    }
}
I'm connecting to the Queue using this:
using (var inputCon = inputFactory.CreateConnection("MyInputConnection"))
using (var outputCon = outputFactory.CreateConnection("MyOutputConnection"))
{
    using (var inputChannel = inputCon.CreateModel())
    using (var outputChannel = outputCon.CreateModel())
    {
        //Wire-up the consumer
        string inputConsumerTag = "";
        inputChannel.BasicQos(0, 100, false);
        inputConsumer = new EventingBasicConsumer(inputChannel);
        inputConsumer.Received += (ch, eventargs) =>
        {
            try
            {
                if (stopRunning)
                    inputChannel.BasicCancel(inputConsumerTag);
                //DO SOME WORK
                inputChannel.BasicAck(eventargs.DeliveryTag, false);
            }
            catch (Exception ex)
            {
                logger.Error($"Error Processing Input Message: {ex.ToString()}");
            }
        };
        //Start the Consumer
        logger.Info("Starting Consumer.");
        inputConsumerTag = inputChannel.BasicConsume(inputQueueConfiguation.QueueName, false, inputConsumer);
        //Loop until instructed to stop running
        while (!stopRunning)
        {
            System.Threading.Thread.Sleep(1000);
        }
        //Hold down before closing the channel... Just in case.
        System.Threading.Thread.Sleep(5000);
        inputChannel.Close();
    }
}