Below I present and explain my code; at the end I list the technologies and the package/library versions I am using.
The class below is responsible for connecting to the RabbitMQ server and handing out channels. Comments inside the code describe the methods and the important sections.
/// <summary>
/// Base class that creates one RabbitMQ connection and provides channels opened on that same connection.
/// </summary>
public abstract class RabbitMqClient : IDisposable
{
    protected readonly ConsumerOptions _consumerOptions;
    private readonly ILogger<RabbitMqClient> _logger;
    private IConnection _connection;
    private readonly ConnectionFactory _connectionFactory;

    // Serializes connection creation and channel/topology setup.
    // It is static, so it is shared by every instance of this class.
    private static readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);

    /// <summary>
    /// Builds the connection factory from <paramref name="Options"/> and attempts to open the connection.
    /// A failure to initialize or connect is logged and NOT rethrown; <see cref="GetChannel"/> retries
    /// opening the connection if it is still missing.
    /// </summary>
    /// <param name="Options">Broker address, credentials and exchange/queue settings.</param>
    /// <param name="logger">Logger used for initialization and disposal errors.</param>
    /// <exception cref="ArgumentNullException"><paramref name="Options"/> is null.</exception>
    protected RabbitMqClient(ConsumerOptions Options, ILogger<RabbitMqClient> logger)
    {
        _logger = logger;
        // Fail fast with a clear exception instead of a NullReferenceException raised from the catch block below.
        _consumerOptions = Options ?? throw new ArgumentNullException(nameof(Options));

        try
        {
            _connectionFactory = new ConnectionFactory()
            {
                HostName = _consumerOptions.Host,
                UserName = _consumerOptions.User,
                Password = _consumerOptions.Password,
                Port = _consumerOptions.Port,
                VirtualHost = _consumerOptions.VirtualHost,
                AutomaticRecoveryEnabled = _consumerOptions.AutorecoveryConn,
                ClientProvidedName = "Messages-Consumer",
                RequestedHeartbeat = TimeSpan.FromSeconds(60)
            };
            ConnectConsumerToRabbitMq();
        }
        catch (Exception ex)
        {
            // The password is deliberately not logged: credentials must never end up in log output.
            _logger.LogCritical(
                ex,
                "RabbitMQ Consumer init error,Rabbit Host => {Host},Rabbit VirtualHost => {VirtualHost}",
                _consumerOptions.Host,
                _consumerOptions.VirtualHost);
        }
    }

    /// <summary>
    /// Opens the connection when there is none yet or the current one reports it is not open.
    /// </summary>
    private void ConnectConsumerToRabbitMq()
    {
        if (_connection == null || !_connection.IsOpen)
        {
            _connection = _connectionFactory.CreateConnection();
        }
    }

    /// <summary>
    /// Creates a channel on the shared connection and, when an exchange name is configured,
    /// declares the exchange and the queue and binds them before returning the channel.
    /// </summary>
    /// <returns>A new channel; the caller owns it and is responsible for disposing it.</returns>
    public IModel GetChannel()
    {
        semaphore.Wait();
        try
        {
            // Only connect here when the constructor's attempt failed. An existing connection that is
            // temporarily not open is left alone so this method does not replace (and orphan) a connection
            // that the client library may still recover when automatic recovery is enabled.
            if (_connection == null)
            {
                ConnectConsumerToRabbitMq();
            }

            IModel channel = _connection.CreateModel();
            try
            {
                if (!string.IsNullOrEmpty(_consumerOptions.ExchangeName))
                {
                    channel.ExchangeDeclare(exchange: _consumerOptions.ExchangeName, type: _consumerOptions.ExchangeType, durable: _consumerOptions.ExchangeDurable, autoDelete: false);
                    channel.QueueDeclare(queue: _consumerOptions.QueueName, durable: _consumerOptions.QueueDurable, exclusive: false, autoDelete: false);
                    channel.QueueBind(queue: _consumerOptions.QueueName, exchange: _consumerOptions.ExchangeName, routingKey: _consumerOptions.RoutingKey);
                }

                return channel;
            }
            catch
            {
                // Do not leak the half-configured channel when a declare/bind call fails.
                channel.Dispose();
                throw;
            }
        }
        finally
        {
            // Always release, otherwise one failure would block every later GetChannel call forever.
            semaphore.Release();
        }
    }

    /// <summary>
    /// Closes and disposes the connection. Errors are logged and never thrown. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        IConnection connection = _connection;
        _connection = null;
        if (connection == null)
        {
            return;
        }

        try
        {
            try
            {
                connection.Close();
            }
            finally
            {
                // Runs even if Close throws, so the connection's resources are still released.
                connection.Dispose();
            }
        }
        catch (Exception ex)
        {
            _logger.LogCritical(ex, "Cannot dispose RabbitMQ channel or connection");
        }
    }
}
The class below creates the consumers (each one is started as a `Task`), and these consumers process the messages. Processing a message involves database operations (reads, writes and updates) and takes 300 milliseconds or less on average. It is worth mentioning that the queue receives about 20 messages per second.
namespace LMConsumer.Core.Services
{
    /// <summary>
    /// Starts a configurable number of RabbitMQ consumers, each on its own channel, and processes
    /// the messages they receive.
    /// </summary>
    // NOTE(review): the base type is named RabbitMqConsumer here, while the class shown earlier is
    // RabbitMqClient; the intermediate type (if any) is not visible in this snippet - confirm.
    public class LMConsumer : RabbitMqConsumer
    {
        // Maximum number of unacknowledged deliveries the broker may push to ONE consumer.
        // NOTE(review): with a value this high a single consumer can buffer a whole backlog before the
        // others receive anything, which can look like "only one consumer is working". Consider a much
        // lower value when each message takes hundreds of milliseconds to process - confirm by measuring.
        private const ushort PrefetchCount = 500;

        private int _MaxNumbersOfConsumers;
        private List<Task> Consumers = new List<Task>();
        private CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();
        private CancellationToken CancellationToken;

        /// <summary>
        /// Creates the consumer host.
        /// </summary>
        /// <param name="rabbitOptions">Broker and queue settings forwarded to the base class.</param>
        /// <param name="loggerRabbitBase">Logger forwarded to the base class.</param>
        /// <param name="MaxNumbersOfConsumers">How many consumers (and channels) <see cref="Start"/> creates.</param>
        // NOTE(review): consumerLogger and receptorLoggger are not declared in the visible code
        // (the snippet appears to have been trimmed) - confirm the real parameter list.
        public LMConsumer(ConsumerOptions rabbitOptions,
            ILogger<RabbitMqClient> loggerRabbitBase,
            int MaxNumbersOfConsumers
        ) : base(rabbitOptions, consumerLogger, loggerRabbitBase)
        {
            _logger = receptorLoggger;
            _MaxNumbersOfConsumers = MaxNumbersOfConsumers;
            CancellationToken = CancellationTokenSource.Token;
        }

        /// <summary>
        /// Creates one channel per consumer and registers each consumer on a background task.
        /// Errors while creating channels or tasks are logged and not rethrown.
        /// </summary>
        /// <returns>An already-completed task; message handling continues on the client library's callbacks.</returns>
        public Task Start()
        {
            try
            {
                // int (not short) so a large configured consumer count cannot overflow the counter.
                for (int i = 0; i < _MaxNumbersOfConsumers; i++)
                {
                    int consumernumber = i + 1;
                    IModel Channel = GetChannel();

                    // Each task registers one consumer on its own channel. ReceiveMessage returns as soon
                    // as the consumer is registered, so these tasks complete quickly.
                    Consumers.Add(Task.Factory.StartNew(() =>
                        ReceiveMessage($"Consumer #{consumernumber}", Channel),
                        CancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
                }
                _logger.LogInformation($"{DateTime.UtcNow}: Consumo de mensajes iniciado con éxito.");
            }
            catch (Exception Ex)
            {
                _logger.LogError($"{DateTime.UtcNow}: {Ex.ToString()}");
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// Registers an event-based consumer on <paramref name="Channel"/> that processes each delivery
        /// and acknowledges it afterwards.
        /// </summary>
        /// <param name="ConsumerName">Display name used in log messages.</param>
        /// <param name="Channel">Channel dedicated to this consumer.</param>
        private void ReceiveMessage(string ConsumerName, IModel Channel)
        {
            _logger.LogInformation($"{DateTime.UtcNow}: {ConsumerName} iniciado.");
            Channel.BasicQos(prefetchSize: 0, prefetchCount: PrefetchCount, global: false);

            var Consumer = new EventingBasicConsumer(Channel);
            Consumer.Received += (sender, ea) =>
            {
                // When shutting down, skip the work and leave the delivery unacknowledged. Throwing here
                // would only propagate into the client library's dispatch code, not to any caller in this class.
                if (CancellationTokenSource.IsCancellationRequested)
                {
                    return;
                }

                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                try
                {
                    /*
                    Here, a series of operations are performed on the message, including reads, updates, and inserts into a database.
                    */
                }
                catch (Exception Ex)
                {
                    // A message that fails processing is treated as invalid: it is logged and still acknowledged below.
                    _logger.LogError($"{DateTime.UtcNow}: Error: {Ex.ToString()}");
                }

                // Acknowledge once the message has been handled (successfully or not) so it is not delivered
                // and processed a second time.
                Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };

            Channel.BasicConsume(queue: _consumerOptions.QueueName,
                autoAck: false, consumer: Consumer);
        }
    }
}
My goal is the best possible consumption and processing throughput, but I cannot reach it because only one of the consumers actually processes messages.
These are the versions of the technologies and packages I am using:
- .NET 6 Worker application (similar to a console application, but intended to run as a background service)
- RabbitMQ.Client NuGet package (the official RabbitMQ .NET client), version 6.4.0
- RabbitMQ server 3.10.7 running on Linux (Ubuntu 22.04)
Thank you for reading this message and I hope you can support me or guide me towards the solution.