I have been scratching my head with this for a while and have managed to create a test case for it.
When using BasicGet across multiple tasks, result.Body (which has the type ReadOnlyMemory<byte>) seems to bleed between threads, which results in some of the following values within the test case:
These seem to collide with previous values, and the length seems to be incorrect, based on the length of previous messages. I was previously using a consumer successfully with no issues, but because the incoming data needs to be handled one message at a time, I hit this issue when moving over to BasicGet. I have tried the following example with threads as opposed to tasks and fiddled relentlessly, with the same result. Open to any suggestions!
Sorry for the wall of code but I thought showing the test case might help point out if I am doing something completely stupid.
using Newtonsoft.Json;
using RabbitMQ.Client;
using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace RabbitMQ_Test
{
/// <summary>
/// JSON envelope that the publishers serialize and the receivers deserialize.
/// </summary>
public class Container
{
    /// <summary>Text payload carried by the envelope.</summary>
    public string Message { get; set; }
}
/// <summary>
/// Test harness: one publisher and one BasicGet-polling receiver per queue,
/// running until the user presses enter.
/// </summary>
class Program
{
    // Pause between two publishes on the same queue.
    private const int PublishDelayMilliseconds = 50;

    // Back-off used when BasicGet finds the queue empty, so receivers do not spin at 100% CPU.
    private const int EmptyQueuePollDelayMilliseconds = 50;

    // System.Random is not thread-safe; every access goes through randomLock
    // because several publishers draw from it concurrently.
    private static readonly Random random = new Random();
    private static readonly object randomLock = new object();

    /// <summary>
    /// Builds a random string of upper-case letters and digits.
    /// </summary>
    /// <param name="length">Number of characters to generate; must not be negative.</param>
    /// <returns>A string of exactly <paramref name="length"/> characters.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="length"/> is negative.</exception>
    public static string RandomString(int length)
    {
        if (length < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(length));
        }

        const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
        var buffer = new char[length];
        lock (randomLock)
        {
            for (int i = 0; i < buffer.Length; i++)
            {
                buffer[i] = chars[random.Next(chars.Length)];
            }
        }
        return new string(buffer);
    }

    /// <summary>
    /// Starts the publishers and receivers, waits for enter, then shuts everything down cleanly.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        string[] queues = { "queue1", "queue2", "queue3", "queue4" };
        var factory = new ConnectionFactory() { HostName = "localhost" };

        using (var cancellation = new CancellationTokenSource())
        using (var publisherConnection = factory.CreateConnection())
        {
            CancellationToken token = cancellation.Token;

            // Set up the publishers. Task.Run (unlike `new Task(async () => ...)`) returns a task
            // that tracks the whole async loop instead of completing at the first await.
            Task[] publishers = queues
                .Select(queue => Task.Run(() => PublishAsync(publisherConnection, queue, token)))
                .ToArray();

            // Set up the receivers.
            Task[] receivers = queues
                .Select(queue => Task.Run(() => ReceiveAsync(factory, queue, token)))
                .ToArray();

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();

            // Stop the workers before the shared connection is disposed underneath them.
            cancellation.Cancel();
            Task.WaitAll(publishers.Concat(receivers).ToArray());
        }
    }

    // Thread-safe wrapper around random.Next(minValue, maxValue).
    private static int NextInt(int minValue, int maxValue)
    {
        lock (randomLock)
        {
            return random.Next(minValue, maxValue);
        }
    }

    // Publishes a random JSON-encoded Container to `queue` every PublishDelayMilliseconds until cancelled.
    private static async Task PublishAsync(IConnection connection, string queue, CancellationToken token)
    {
        try
        {
            // One channel per publisher; no BasicQos here because prefetch limits
            // apply to consumers, not to publishing.
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: queue, exclusive: false, durable: true, autoDelete: false);
                while (!token.IsCancellationRequested)
                {
                    var message = RandomString(NextInt(10, 30));
                    var payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new Container() { Message = message }));
                    channel.BasicPublish("", queue, body: payload);
                    await Task.Delay(PublishDelayMilliseconds, token);
                }
            }
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            // Normal shutdown.
        }
        catch (Exception ex)
        {
            // Top-level worker boundary: report instead of failing silently.
            Console.Error.WriteLine($"Publisher for '{queue}' failed: {ex}");
        }
    }

    // Polls `queue` with BasicGet on its own connection, printing and acknowledging each message until cancelled.
    private static async Task ReceiveAsync(ConnectionFactory factory, string queue, CancellationToken token)
    {
        try
        {
            using (var receiverConnection = factory.CreateConnection())
            using (var channel = receiverConnection.CreateModel())
            {
                channel.QueueDeclare(queue: queue, exclusive: false, durable: true, autoDelete: false);
                while (!token.IsCancellationRequested)
                {
                    BasicGetResult result = channel.BasicGet(queue, false);
                    if (result == null)
                    {
                        // Queue is empty: back off briefly rather than busy-looping.
                        await Task.Delay(EmptyQueuePollDelayMilliseconds, token);
                        continue;
                    }

                    // Copy the body out immediately.
                    // NOTE(review): the buffer behind ReadOnlyMemory<byte> may be owned by the client
                    // library — TODO confirm its lifetime against the RabbitMQ.Client version in use.
                    var body = result.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(message);

                    // Deserializing validates the payload; a corrupted body surfaces as an exception here.
                    JsonConvert.DeserializeObject<Container>(message);

                    // autoAck is false above, so the message must be acknowledged explicitly;
                    // otherwise it stays unacked and is redelivered when the connection closes.
                    channel.BasicAck(result.DeliveryTag, false);
                }
            }
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            // Normal shutdown.
        }
        catch (Exception ex)
        {
            // Top-level worker boundary: report instead of failing silently.
            Console.Error.WriteLine($"Receiver for '{queue}' failed: {ex}");
        }
    }
}
}