Only one consumer processes messages in a multi-consumer setup (.NET 6, C#)

Jonathan Rivera Diaz

Jan 13, 2023, 1:45:53 AM
to rabbitmq-users
Hello everyone. I am trying to solve a problem for which I have not found a solution, and I hope you can help me.
The problem is that I have a queue that receives many messages per second and is consumed by multiple workers, but only one consumer processes messages while all the other consumers sit idle. The image below shows the problem.

RabbitMQIdleProblema.jpg
Below I present and explain my code. At the end I list the technologies and the package/library versions I am using.

The following class is responsible for connecting to the RabbitMQ server and providing channels. Comments in the code describe the methods and the important sections.

// Base class that creates the connection and provides channels that share it.

public abstract class RabbitMqClient : IDisposable
    {
       
        protected readonly ConsumerOptions _consumerOptions;
        private readonly ILogger<RabbitMqClient> _logger;
        private IConnection _connection;
        private readonly ConnectionFactory _connectionFactory;
        private static SemaphoreSlim semaphore = new SemaphoreSlim(1);


        //Constructor of the class that creates the connection factory
        protected RabbitMqClient(ConsumerOptions Options, ILogger<RabbitMqClient> logger)
        {


            try
            {
                _logger = logger;
                _consumerOptions = Options;
                _connectionFactory = new ConnectionFactory()
                {
                    HostName = _consumerOptions.Host,
                    UserName = _consumerOptions.User,
                    Password = _consumerOptions.Password,
                    Port = _consumerOptions.Port,
                    VirtualHost = _consumerOptions.VirtualHost,
                    AutomaticRecoveryEnabled = _consumerOptions.AutorecoveryConn,
                    ClientProvidedName = "Messages-Consumer",
                    RequestedHeartbeat = TimeSpan.FromSeconds(60)
                };
                ConnectConsumerToRabbitMq();
            }
            catch (Exception ex)
            {
                // Log the failure with the exception attached; the password is deliberately left out of the log.
                logger.LogCritical(ex, "RabbitMQ consumer init error. Host: {Host}, VirtualHost: {VirtualHost}", _consumerOptions.Host, _consumerOptions.VirtualHost);


            }

        }


        //Method that creates the connection itself and that is called by the constructor of the class
        private void ConnectConsumerToRabbitMq()
        {
            if (_connection == null || _connection.IsOpen == false)
            {
                _connection = _connectionFactory.CreateConnection();
            }
        }

        // Creates a channel on the shared connection, declares the topology when an exchange is configured, and returns the channel.
        public IModel GetChannel()
        {
            semaphore.Wait();
            try
            {
                IModel channel = _connection.CreateModel();

                if (_consumerOptions.ExchangeName != "")
                {
                    channel.ExchangeDeclare(exchange: _consumerOptions.ExchangeName, type: _consumerOptions.ExchangeType, durable: _consumerOptions.ExchangeDurable, autoDelete: false);
                    channel.QueueDeclare(queue: _consumerOptions.QueueName, durable: _consumerOptions.QueueDurable, exclusive: false, autoDelete: false);
                    channel.QueueBind(queue: _consumerOptions.QueueName, exchange: _consumerOptions.ExchangeName, routingKey: _consumerOptions.RoutingKey);
                }

                return channel;
            }
            finally
            {
                // Release the semaphore even if channel creation or a declaration throws.
                semaphore.Release();
            }
        }

        public void Dispose()
        {
            try
            {

                _connection?.Close();
                _connection?.Dispose();
                _connection = null;
            }
            catch (Exception ex)
            {
                _logger.LogCritical(ex, "Cannot dispose RabbitMQ channel or connection");
            }
        }
    }

The class below creates the consumers (each one runs as a Task), which in turn process the messages. Processing involves database reads, writes, and updates, and takes 300 milliseconds or less per message on average. The queue receives 20 messages per second.

namespace LMConsumer.Core.Services
{

   
    //Class that implements the base class shown above and that is in charge of receiving and processing the messages.        
    public class LMConsumer : RabbitMqConsumer
    {

       
        private int _MaxNumbersOfConsumers;
        private List<Task> Consumers = new List<Task>();
        private CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();
        private CancellationToken CancellationToken;          
       


        public LMConsumer(ConsumerOptions rabbitOptions,                                                    
                                ILogger<RabbitMqClient> loggerRabbitBase,                                                                
                                int MaxNumbersOfConsumers                    
                                ) : base(rabbitOptions, consumerLogger, loggerRabbitBase)
        {
            _logger = receptorLoggger;
            _MaxNumbersOfConsumers = MaxNumbersOfConsumers;
            CancellationToken = CancellationTokenSource.Token;                            


        }

        public async Task Start()
        {
            try
            {
               


                for (short i = 0; i < _MaxNumbersOfConsumers; i++)
                {
                    int consumernumber = i + 1;

                    IModel Channel = GetChannel();

                    /*
                    This loop creates the number of tasks set in the application configuration. Each task is a consumer, and each consumer gets its own channel to RabbitMQ.
                    */

                    Consumers.Add(Task.Factory.StartNew(() =>
                                                            ReceiveMessage(string.Format("Consumer #{0}", consumernumber), Channel),
                                                            CancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));

               
                }              



                _logger.LogInformation($"{DateTime.UtcNow}: Message consumption started successfully.");
            }
            catch (Exception Ex)
            {
                _logger.LogError($"{DateTime.UtcNow}: {Ex.ToString()}");
            }

            await Task.CompletedTask;
        }


        //This method is the one that is in charge of receiving the messages and processing them.
        private void ReceiveMessage(string ConsumerName,IModel Channel)
        {
            _logger.LogInformation($"{DateTime.UtcNow}: {ConsumerName} started.");


            var Consumer = new EventingBasicConsumer(Channel);

           
            Channel.BasicQos(prefetchSize: 0, prefetchCount: 500, global: false);


            Consumer.Received += (sender, ea) =>
            {


                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);

                bool MsgProcesed = false;

                bool MsgInvalid = false;

                try
                {
                   
                    /*
                   
                    Here, a series of operations are performed on the message, including reads, updates, and inserts into a database.
                    */

                        MsgProcesed = true;
       


                }
                catch (Exception Ex)
                {
                    MsgInvalid = true;
                    _logger.LogError($"{DateTime.UtcNow}: Error: {Ex.ToString()}");
                }
                if (CancellationTokenSource.IsCancellationRequested) throw new TaskCanceledException("Cancelled");
                if (MsgProcesed || MsgInvalid) Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
           
            };

            Channel.BasicConsume(queue: _consumerOptions.QueueName,
                                    autoAck: false, consumer: Consumer);
        }




    }
}
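
As a rough sanity check on the load figures quoted before the consumer class (20 messages per second, about 300 ms per message), the sketch below estimates how many messages are in flight on average and how fast a single consumer can go. It is plain arithmetic (Little's law), not a diagnosis of the idle consumers. The helper name `AverageInFlight` is hypothetical, and the per-consumer rate assumes each consumer handles one message at a time.

```csharp
using System;

// Figures quoted in the post.
const double messagesPerSecond = 20;
const int prefetchCount = 500; // value passed to BasicQos in the post
TimeSpan averageProcessingTime = TimeSpan.FromMilliseconds(300);

// Little's law: average items in flight = arrival rate * time spent per item.
// Hypothetical helper, not part of RabbitMQ.Client.
static double AverageInFlight(double ratePerSecond, TimeSpan timePerMessage) =>
    ratePerSecond * timePerMessage.TotalSeconds;

double inFlight = AverageInFlight(messagesPerSecond, averageProcessingTime);

// Assumption: one message at a time per consumer, so its ceiling is 1 / processing time.
double perConsumerRate = 1.0 / averageProcessingTime.TotalSeconds;

Console.WriteLine($"Average messages in flight: {inFlight:F1}");
Console.WriteLine($"Max rate of one sequential consumer: {perConsumerRate:F1} msg/s");
Console.WriteLine($"In-flight load fits in one prefetch window of {prefetchCount}: {inFlight <= prefetchCount}");
```

Under these assumptions the steady-state load is about 6 messages in flight, far below a single prefetch window of 500, while one sequential consumer tops out near 3.3 messages per second, which is below the incoming rate.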

My goal is the best possible consumption and processing speed, which I cannot reach while only one consumer processes messages.

These are the versions of the technologies and packages I am using:

- .NET 6 Worker application (similar to a console application, but intended to run as a background service)
- RabbitMQ.Client NuGet package (the official RabbitMQ .NET client), version 6.4.0
- RabbitMQ 3.10.7 server running on Linux (Ubuntu 22.04)

Thank you for reading this message. I hope you can help me or guide me toward a solution.

Luke Bakken

Jan 13, 2023, 10:26:29 AM
to rabbitmq-users
Hello,

Thank you for the information. Right now you're asking me to copy your code, get it working, and see if I can reproduce your issue. I don't have the time to do that. What I do have the time for, however, is something like this:

cd dotnet-consumer-issue
dotnet build
dotnet run

... in other words, if you provide a git repository with a complete example that I can clone, build and run to observe this issue, I can assist you. Otherwise, you will have to wait much longer until I have the free time available to get your code working.

Thanks,
Luke
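
For illustration only, here is a minimal sketch of what the `Program.cs` of such a clone-build-run repository might look like. It is not code from either poster. It assumes RabbitMQ.Client 6.4.0 is referenced by the project, a broker is reachable on localhost with the default guest account, and the queue name, consumer count, and message count are hypothetical values. The 300 ms sleep stands in for the database work described in the question.

```csharp
// Program.cs: hypothetical minimal reproduction. Assumes RabbitMQ.Client 6.4.0
// and a broker on localhost with the default guest account.
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

const string queueName = "repro-queue"; // hypothetical queue name
const int consumerCount = 4;            // hypothetical; the original reads this from configuration
const int messageCount = 100;           // hypothetical test volume

var factory = new ConnectionFactory { HostName = "localhost" };
using IConnection connection = factory.CreateConnection();

// Count deliveries per consumer so the distribution is visible at the end.
var deliveries = new ConcurrentDictionary<int, int>();
var allHandled = new CountdownEvent(messageCount);

for (int i = 1; i <= consumerCount; i++)
{
    int consumerNumber = i; // copy for the closure
    IModel channel = connection.CreateModel(); // one channel per consumer, as in the question
    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: true, arguments: null);
    channel.BasicQos(prefetchSize: 0, prefetchCount: 500, global: false); // same prefetch as the question

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (sender, ea) =>
    {
        Thread.Sleep(300); // stand-in for the database work
        deliveries.AddOrUpdate(consumerNumber, 1, (_, count) => count + 1);
        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        allHandled.Signal();
    };
    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}

// Publish through the default exchange at roughly 20 messages per second.
using (IModel publisher = connection.CreateModel())
{
    for (int i = 0; i < messageCount; i++)
    {
        byte[] body = Encoding.UTF8.GetBytes($"message {i}");
        publisher.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
        Thread.Sleep(50);
    }
}

// Wait for every message to be acknowledged, then report who handled what.
allHandled.Wait(TimeSpan.FromSeconds(60));
foreach (var entry in deliveries)
{
    Console.WriteLine($"Consumer #{entry.Key}: {entry.Value} messages");
}
```

Printing the per-consumer counts makes the reported symptom easy to confirm or rule out: if the issue reproduces, one consumer's count should dominate the others.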