BasicGet not functioning correctly within threads

Henry Garle

Apr 21, 2021, 9:20:22 PM
to rabbitmq-users
I have been scratching my head over this for a while and have managed to create a test case for it.
When BasicGet is called from multiple tasks, result.Body (a ReadOnlyMemory<byte>) seems to bleed between threads, which produces values like the following in the test case:

{"Message":"AK8OUAUUJG"}�BES"}
{"Message":"45FH5KMCEXL5
{"Message":"SHGASRIZHJ6R"}�M6"}

These seem to collide with previous values, and the length seems to be incorrect and to depend on the length of previous messages. I was previously using a consumer with no issues, but because the incoming data has to be handled one message at a time, I moved over to BasicGet and hit this problem. I have tried the following example with threads instead of tasks and fiddled with it relentlessly, with the same result. Open to any suggestions!

Sorry for the wall of code but I thought showing the test case might help point out if I am doing something completely stupid.

using Newtonsoft.Json;
using RabbitMQ.Client;
using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQ_Test
{
    public class Container
    {
        public string Message { get; set; }
    }

    class Program
    {
        private static Random random = new Random();
        public static string RandomString(int length)
        {
            const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
            return new string(Enumerable.Repeat(chars, length)
              .Select(s => s[random.Next(s.Length)]).ToArray());
        }

        static void Main(string[] args)
        {

            string[] queues = new string[] {"queue1", "queue2", "queue3", "queue4" };

            var factory = new ConnectionFactory() { HostName = "localhost" };
            var connection = factory.CreateConnection();

            // Set up the publishers
            foreach (var queue in queues)
            {
                var task = new Task(async () =>
                {
                    var channel = connection.CreateModel();

                    channel.BasicQos(0, 1, false);
                    channel.QueueDeclare(queue: queue, exclusive: false, durable: true, autoDelete: false);

                    while(true)
                    {
                        var message = RandomString(random.Next(10, 30));
                        channel.BasicPublish("", queue, body: Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new Container() { Message = message })));
                        await Task.Delay(50);
                    }
                });
                task.Start();
            }

            // Set up the receivers
            foreach (var queue in queues)
            {
                var task = new Task(() =>
                {
                    var connection = factory.CreateConnection();
                    var channel = connection.CreateModel();

                    channel.QueueDeclare(queue: queue, exclusive: false, durable: true, autoDelete: false);

                    while (true)
                    {
                        BasicGetResult result = channel.BasicGet(queue, false);
                        if (result != null)
                        {
                            var body = result.Body.ToArray();
                            var message = Encoding.UTF8.GetString(body);
                            Console.WriteLine(message);
                            var container = JsonConvert.DeserializeObject<Container>(message);
                        }
                    }
                });
                task.Start();
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}




X Guest

Apr 21, 2021, 10:12:10 PM
to rabbitmq users
This looks like a thread-safety issue. Try searching for "c# rabbitmq client thread safe"; you will find a lot of discussion on the topic.

B. Regards
--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.


Henry Garle

Apr 22, 2021, 4:32:01 AM
to rabbitm...@googlegroups.com
Thanks for the reply. I did try to find an answer by looking around, and the only real information I could find is that channels should not be shared across threads. In my example they are not: every thread has its own channel, so I am at a bit of a loss as to what the issue is.


Henry Garle

Apr 22, 2021, 10:34:59 AM
to rabbitm...@googlegroups.com
If anyone finds this: it is a known issue that has already been resolved, and the fix will be released with 6.2.2.
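
For anyone landing here later, picking up that release in an SDK-style project would presumably look something like the fragment below. This is only a sketch: it assumes the fix is published on NuGet as RabbitMQ.Client 6.2.2, and the surrounding project file is hypothetical.

```xml
<!-- Hypothetical .csproj fragment; assumes the fix ships as RabbitMQ.Client 6.2.2 on NuGet -->
<ItemGroup>
  <PackageReference Include="RabbitMQ.Client" Version="6.2.2" />
</ItemGroup>
```

After changing the version, re-running the test case above should show whether the garbled bodies still appear.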
