I have two examples: one using EventingBasicConsumer and the other using AsyncEventingBasicConsumer.
When I run them, the AsyncEventingBasicConsumer version takes longer to complete.
I would expect it to be faster, since threads should be free to do other work while I/O is in progress.
Am I doing something wrong, or am I misunderstanding how this is supposed to work?
Using EventingBasicConsumer:
/// <summary>
/// Sample that consumes from and publishes to the same queue over a single
/// channel, using the synchronous <c>EventingBasicConsumer</c>.
/// </summary>
class Program
{
    // Name of the queue that is declared, consumed from and published to.
    private const string QueueName = "testqueue";

    // How many messages the publish loop sends.
    private const int MessageCount = 5000;

    /// <summary>
    /// Connects to the broker, attaches the consumer, publishes
    /// <see cref="MessageCount"/> messages and then blocks on
    /// <c>Console.ReadLine</c> until the user presses Enter.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var connectionFactory = BuildConnectionFactory();
        var hostNames = new List<string> { "trdevmq01b.ccs.local", "trdevmq01b.ccs.local" };

        using (var connection = connectionFactory.CreateConnection(hostNames, "test"))
        {
            using (var channel = connection.CreateModel())
            {
                // Create the queue if it is not already there.
                channel.QueueDeclare(QueueName, true, false, false, null);

                // Attach the consumer before publishing so deliveries are handled
                // while the publish loop is still running.
                // NOTE(review): the positional `true` is presumably autoAck - confirm
                // against the client version in use.
                var eventingConsumer = new EventingBasicConsumer(channel);
                eventingConsumer.Received += Consumer_Received;
                channel.BasicConsume(QueueName, true, eventingConsumer);

                // Publish to exchange "" with the queue name as routing key,
                // reusing one properties instance for every message.
                var properties = channel.CreateBasicProperties();
                for (int index = 0; index < MessageCount; index++)
                {
                    Thread.Sleep(1);
                    channel.BasicPublish("", QueueName, properties, EncodeMessage(index));
                    Console.WriteLine($"Publish {index}");
                }

                // Keep the connection and channel open until Enter is pressed.
                Console.ReadLine();
            }
        }
    }

    /// <summary>
    /// Builds the connection factory used by <see cref="Main"/>.
    /// <c>DispatchConsumersAsync</c> is deliberately not set in this sample.
    /// </summary>
    /// <returns>A configured <c>ConnectionFactory</c>.</returns>
    private static ConnectionFactory BuildConnectionFactory()
    {
        return new ConnectionFactory
        {
            AutomaticRecoveryEnabled = true,
            Password = "n",
            Port = 5672,
            RequestedConnectionTimeout = 3000,
            RequestedHeartbeat = 10,
            TopologyRecoveryEnabled = true,
            UseBackgroundThreadsForIO = false,
            UserName = "n",
            VirtualHost = "/"
        };
    }

    /// <summary>
    /// Produces the UTF-8 payload "Message {index}" for one published message.
    /// </summary>
    /// <param name="index">Zero-based position of the message in the publish loop.</param>
    /// <returns>The UTF-8 encoded message text.</returns>
    private static byte[] EncodeMessage(int index)
    {
        return Encoding.UTF8.GetBytes($"Message {index}");
    }

    /// <summary>
    /// Handles one delivery: decodes the body as UTF-8, sleeps for 1 ms
    /// (blocking the calling thread) and writes a completion line to the console.
    /// </summary>
    /// <param name="sender">The consumer that raised the event.</param>
    /// <param name="e">Delivery data; only <c>Body</c> is read.</param>
    private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
    {
        string text = Encoding.UTF8.GetString(e.Body);

        // Blocking 1 ms pause, presumably standing in for real work.
        Thread.Sleep(1);

        Console.WriteLine($"Finished handling {text}");
    }
}
Using AsyncEventingBasicConsumer:
/// <summary>
/// Sample that consumes from and publishes to the same queue over a single
/// channel, using <c>AsyncEventingBasicConsumer</c> with
/// <c>DispatchConsumersAsync</c> enabled on the connection factory.
/// </summary>
class Program
{
    // Name of the queue that is declared, consumed from and published to.
    private const string QueueName = "testqueue";

    // How many messages the publish loop sends.
    private const int MessageCount = 5000;

    /// <summary>
    /// Connects to the broker, attaches the async consumer, publishes
    /// <see cref="MessageCount"/> messages and then blocks on
    /// <c>Console.ReadLine</c> until the user presses Enter.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var connectionFactory = BuildConnectionFactory();
        var hostNames = new List<string> { "trdevmq01b.ccs.local", "trdevmq01b.ccs.local" };

        using (var connection = connectionFactory.CreateConnection(hostNames, "test"))
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(QueueName, true, false, false, null);

                // Attach the consumer before publishing so deliveries are handled
                // while the publish loop is still running.
                // NOTE(review): the positional `true` is presumably autoAck - confirm
                // against the client version in use.
                var asyncConsumer = new AsyncEventingBasicConsumer(channel);
                asyncConsumer.Received += Consumer_Received;
                channel.BasicConsume(QueueName, true, asyncConsumer);

                // Publish to exchange "" with the queue name as routing key,
                // reusing one properties instance for every message.
                var properties = channel.CreateBasicProperties();
                for (int index = 0; index < MessageCount; index++)
                {
                    Thread.Sleep(1);
                    channel.BasicPublish("", QueueName, properties, EncodeMessage(index));
                    Console.WriteLine($"Publish {index}");
                }

                // Keep the connection and channel open until Enter is pressed.
                Console.ReadLine();
            }
        }
    }

    /// <summary>
    /// Builds the connection factory used by <see cref="Main"/>, with
    /// <c>DispatchConsumersAsync</c> switched on for the async consumer.
    /// </summary>
    /// <returns>A configured <c>ConnectionFactory</c>.</returns>
    private static ConnectionFactory BuildConnectionFactory()
    {
        return new ConnectionFactory
        {
            DispatchConsumersAsync = true,
            AutomaticRecoveryEnabled = true,
            Password = "n",
            Port = 5672,
            RequestedConnectionTimeout = 3000,
            RequestedHeartbeat = 10,
            TopologyRecoveryEnabled = true,
            UseBackgroundThreadsForIO = false,
            UserName = "n",
            VirtualHost = "/"
        };
    }

    /// <summary>
    /// Produces the UTF-8 payload "Message {index}" for one published message.
    /// </summary>
    /// <param name="index">Zero-based position of the message in the publish loop.</param>
    /// <returns>The UTF-8 encoded message text.</returns>
    private static byte[] EncodeMessage(int index)
    {
        return Encoding.UTF8.GetBytes($"Message {index}");
    }

    /// <summary>
    /// Handles one delivery: decodes the body as UTF-8, awaits a 1 ms
    /// <c>Task.Delay</c> and then writes a completion line to the console.
    /// </summary>
    /// <param name="sender">The consumer that raised the event.</param>
    /// <param name="e">Delivery data; only <c>Body</c> is read.</param>
    /// <returns>A task that completes once the completion line has been written.</returns>
    private static async Task Consumer_Received(object sender, BasicDeliverEventArgs e)
    {
        string text = Encoding.UTF8.GetString(e.Body);

        // Non-blocking 1 ms pause, presumably standing in for real asynchronous work.
        await Task.Delay(1);

        Console.WriteLine($"Finished handling {text}");
    }
}