Hello Michael,
Here is a code snippet of a pattern I am evaluating. Parallel.Invoke can take any number of actions, so the decision about how many threads to create is delegated to the runtime/thread pool. I would like to issue a blocking call, service.LongTask(threadName), on each thread to take advantage of the auto-acknowledgement subsystem.
I also evaluated the pattern of making only one call to program.Consume and using a new (fire-and-forget) background thread from the thread pool to process each dequeued message, e.g. Task.Run(() => service.LongTask(threadName)). In this case, if I am not mistaken, we would overwhelm the thread pool queues, thus transferring the concern of queuing to another subsystem. In addition, if I follow this route, I would have to "ack" the messages prior to execution, thus losing RabbitMQ's acking/re-queuing functionality.
I've done a few small-scale performance tests and both scenarios seem to perform similarly. Before engaging in more serious testing, I would like to know if scenario 1 (Parallel.Invoke) is a reasonable approach based on my description. I am concerned about creating/using too many tasks (threads) that would tie up resources while blocking (or listening via EventingBasicConsumer.Received). I decided to ask this question here because in this conversation you brought up the topic of the I/O thread. My understanding is that while my "listening" threads are not processing incoming messages from the queue, they are asleep, and the I/O thread (associated with the corresponding connection) is the only active resource handling the message deliveries pushed by RabbitMQ, waking these threads so that they can start processing the incoming message. While asleep, the listening threads would not be blocking the thread pool, thus allowing the latter to claim the resources for other purposes.
Main()
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    var rmqConnection = factory.CreateConnection();

    // One shared connection; each action sets up its own channel and consumer.
    Parallel.Invoke(
        () => program.Consume(rmqConnection, "thread1"),
        () => program.Consume(rmqConnection, "thread2"),
        ...
        () => program.Consume(rmqConnection, "thread20"));
}

Consume(IConnection connection, string threadName)
{
    // Create the channel from the connection passed in as a parameter.
    var rmqChannel = connection.CreateModel();
    rmqChannel.QueueDeclare(queue: "hello",
                            durable: false,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

    // Deliver at most one message at a time to this consumer.
    rmqChannel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

    var consumer = new EventingBasicConsumer(rmqChannel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body);
        var service = new MyService();
        service.LongTask(threadName); // blocking call
    };

    rmqChannel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
}
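For comparison, here is a rough sketch of the second scenario I described (a single consumer that hands each message to the thread pool). It is only an illustration, not tested code: it assumes the same client API as the snippet above (IModel, EventingBasicConsumer), ConsumeAndOffload is a name I made up for this sketch, and the MyService class shown is a stand-in stub for my real service.

```csharp
using System;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Stand-in stub for the real service; present only so the sketch compiles.
class MyService
{
    public void LongTask(string threadName)
    {
        Console.WriteLine("LongTask running for " + threadName);
    }
}

class Program
{
    // Hypothetical helper: one consumer, work offloaded to the thread pool.
    static void ConsumeAndOffload(IConnection connection, string threadName)
    {
        var rmqChannel = connection.CreateModel();
        rmqChannel.QueueDeclare(queue: "hello",
                                durable: false,
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);

        var consumer = new EventingBasicConsumer(rmqChannel);
        consumer.Received += (model, ea) =>
        {
            // Fire and forget: the handler returns immediately and the
            // thread pool decides when LongTask actually runs.
            Task.Run(() => new MyService().LongTask(threadName));
        };

        // autoAck: true means the broker treats each message as acknowledged
        // on delivery, i.e. before LongTask has executed.
        rmqChannel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
    }

    static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var rmqConnection = factory.CreateConnection())
        {
            ConsumeAndOffload(rmqConnection, "worker");
            Console.ReadLine(); // keep the process alive while messages arrive
        }
    }
}
```

With autoAck: true the message is already acknowledged by the time LongTask starts, so a failure inside the task cannot lead to a re-queue. That is the trade-off I am worried about with this route.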
Thank you