.net client error (seems like a concurrency issue)


Travis Wilkins

Sep 13, 2018, 5:50:10 PM
to rabbitmq-users
I need some help debugging an issue. I am testing a simple RPC pattern with RabbitMQ's .NET client that uses the direct reply-to queue for sending a response. Everything works fine with a single caller, but now that I am testing multiple callers I get an exception. In this setup I call the same remote procedure from two different threads. The two calls use separate channels but share the same connection. The application seems to throw an exception while waiting for a response. I get the following exception and cannot view an inner exception:

System.IO.FileLoadException: 'Could not load file or assembly 'Microsoft.Diagnostics.Tracing.EventSource, Version=1.1.28.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a' or one of its dependencies. The located assembly's manifest definition does not match the assembly reference. (Exception from HRESULT: 0x80131040)'

Here is the stack trace:

> RabbitMQ.Client.dll!RabbitMQ.Client.Framing.Impl.Connection.LogCloseError(string error, System.Exception ex)
  RabbitMQ.Client.dll!RabbitMQ.Client.Framing.Impl.Connection.ClosingLoop()
  RabbitMQ.Client.dll!RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
  mscorlib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx)
  mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx)
  mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state)
  mscorlib.dll!System.Threading.ThreadHelper.ThreadStart()


Now, if I use breakpoints to see where it is going wrong, the calls work fine, which leads me to think there is a concurrency problem. Also, if I reduce the payload size (the dummy byte arrays are 999,999 bytes long), the problem seems to go away as well. I am guessing the semi-large payload just increases the likelihood of hitting a concurrency issue.

Here is my code:

 static void Main(string[] args)
        {
            string exchange = "testExchange";
            try
            {
                using (RabbitProxy rp = new RabbitProxy())
                {
                    rp.Initialize();
                    Task<byte[]> t1 = Task.Run(() => rp.Rpc(exchange, new byte[999999]));
                    Task<byte[]> t2 = Task.Run(() => rp.Rpc(exchange, new byte[999999]));
                    Task.WaitAll(t1, t2);
                }
            }
            catch (Exception e)
            {

            }
        }

public class RabbitProxy : IDisposable
    {
        private class RpcReplyData
        {
            public SemaphoreSlim ReceivedSignal { get; set; }
            public byte[] Data { get; set; }
        }

        private class PublishMessage
        {
            public string Exchange { get; set; }
            public string RoutingKey { get; set; } = String.Empty;
            public byte[] Data { get; set; }
            public IBasicProperties Props { get; set; }
            public RpcReplyData Reply { get; set; }
            public Guid Id { get; set; }
        }

        private readonly TimeSpan _timeOut = new TimeSpan(0, 0, 30);
        private const string ReplyToQueue = "amq.rabbitmq.reply-to";
        private const string Server = "localhost";
        private readonly ConcurrentDictionary<Guid, RpcReplyData> _rpcReplies = new ConcurrentDictionary<Guid, RpcReplyData>();
        private readonly ConcurrentDictionary<int, IModel> _threadIdToChannel = new ConcurrentDictionary<int, IModel>();
        private readonly ConnectionFactory _factory = new ConnectionFactory {HostName = Server};
        private IConnection _connection;

        public async Task<byte[]> Rpc(string exchange, byte[] argument)
        {
            SemaphoreSlim receivedSignal = new SemaphoreSlim(0, 1);
            Guid id = Guid.NewGuid();
            RpcReplyData reply = new RpcReplyData {ReceivedSignal = receivedSignal};

            var channel = GetChannel(Thread.CurrentThread.ManagedThreadId);
            var props = channel.CreateBasicProperties();
            props.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
            props.CorrelationId =id.ToString();
            props.ReplyTo = ReplyToQueue;
            var message = new PublishMessage { Exchange = exchange, Data = argument, Props = props, Id = id, Reply = reply };

            channel.BasicPublish(message.Exchange, message.RoutingKey, message.Props, message.Data);
            _rpcReplies.TryAdd(message.Id, message.Reply);

            await receivedSignal.WaitAsync(_timeOut);
            return reply.Data;
        }

        private IModel GetChannel(int threadId)
        {
            IModel channel;
            _threadIdToChannel.TryGetValue(threadId, out channel);
            if (channel != null)
            {
                return channel;
            }

            channel = _connection.CreateModel();
            _threadIdToChannel.TryAdd(threadId, channel);
            var replyToConsumer = new EventingBasicConsumer(channel);
            replyToConsumer.Received += (model, ea) =>
            {
                Guid id;
                bool isGuid = Guid.TryParse(ea.BasicProperties.CorrelationId, out id);
                RpcReplyData reply;

                if (isGuid && _rpcReplies.TryRemove(id, out reply))
                {
                    reply.Data = ea.Body;
                    reply.ReceivedSignal.Release();
                }
            };

            channel.BasicConsume(ReplyToQueue, true, replyToConsumer);
            return channel;
        }

        public void Initialize()
        {
            Dispose();
            _connection = _factory.CreateConnection();
            var channel = GetChannel(Thread.CurrentThread.ManagedThreadId);
            channel.ExchangeDeclare("testExchange", "fanout");
        }

        public void Dispose()
        {
            _connection?.Abort();

            _connection?.Dispose();
            foreach (var channel in _threadIdToChannel.Values)
            {
                channel?.Dispose();
            }

            _threadIdToChannel.Clear();
        }
    }

Luke Bakken

Sep 13, 2018, 6:11:23 PM
to rabbitmq-users
Hi Travis,

We'd be curious to know if anything interesting is logged by RabbitMQ. Also, which versions of RabbitMQ and Erlang are you using?

Have you verified that the assembly Microsoft.Diagnostics.Tracing.EventSource is present? I suspect the .NET client is trying to log something but can't due to that missing assembly. Once there, you may be able to determine the real cause of the issue.

FWIW, your code seems fine at first glance.

Thanks,
Luke

Travis Wilkins

Sep 13, 2018, 6:33:40 PM
to rabbitmq-users
Here are the relevant log entries.

2018-09-13 15:18:37.148 [info] <0.7898.0> accepting AMQP connection <0.7898.0> ([::1]:51402 -> [::1]:5672)
2018-09-13 15:18:37.194 [info] <0.7898.0> connection <0.7898.0> ([::1]:51402 -> [::1]:5672): user 'guest' authenticated and granted access to vhost '/'
2018-09-13 15:18:37.241 [error] <0.7898.0> Error on AMQP connection <0.7898.0> ([::1]:51402 -> [::1]:5672, vhost: '/', user: 'guest', state: running), channel 2:
 operation none caused a connection exception frame_error: "type 3, first 16 octets = <<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>: {invalid_frame_end_marker,\n                                                                0}"
2018-09-13 15:18:40.254 [warning] <0.7898.0> closing AMQP connection <0.7898.0> ([::1]:51402 -> [::1]:5672):

I am using RabbitMQ 3.7.7 with Erlang 21.0.1.

I tried installing the NuGet package for Microsoft.Diagnostics.Tracing.EventSource with the matching version number, but the same exception persists.

-Travis

Luke Bakken

Sep 13, 2018, 7:11:37 PM
to rabbitmq-users
invalid_frame_end_marker is an indication of concurrency issues in client applications.


It may be helpful to verify connections and channels in RabbitMQ at known points in time. For instance, each of these lines of code should open a new channel, and there should only be one connection to RabbitMQ -

                    rp.Initialize();
                    Task<byte[]> t1 = Task.Run(() => rp.Rpc(exchange, new byte[999999]));
                    Task<byte[]> t2 = Task.Run(() => rp.Rpc(exchange, new byte[999999]));

If you see something different, that would be informative to know.

Finally, I had a thought ... what if Task.Run isn't guaranteed to create a new thread? Doing some quick searches ... it's not. You should print out what gets passed to GetChannel in threadId - I suspect you won't see three unique values.
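One way to check this is the minimal sketch below. It is not part of the original code, and `ThreadIdDemo` and `CollectThreadIds` are hypothetical names. It starts a batch of `Task.Run` delegates and counts how many distinct managed thread IDs they observe. Because the delegates run on thread-pool threads, the number of distinct IDs is usually smaller than the number of tasks, so a thread ID is not a stable per-call key.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThreadIdDemo
{
    // Starts taskCount delegates via Task.Run and returns the managed
    // thread id that each delegate saw while it was running.
    public static int[] CollectThreadIds(int taskCount)
    {
        Task<int>[] tasks = Enumerable.Range(0, taskCount)
            .Select(i => Task.Run(() => Thread.CurrentThread.ManagedThreadId))
            .ToArray();
        Task.WaitAll(tasks);
        return tasks.Select(t => t.Result).ToArray();
    }

    public static void Main()
    {
        int[] ids = CollectThreadIds(100);
        // The distinct count varies from run to run and machine to machine.
        Console.WriteLine("tasks: {0}, distinct thread ids: {1}",
            ids.Length, ids.Distinct().Count());
    }
}
```

In the RPC code above, the same check can be done by logging the value passed to GetChannel on every call.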

Thanks,
Luke

Travis Wilkins

Sep 13, 2018, 7:43:02 PM
to rabbitmq-users
Alright, I've got some more information.

There are indeed two threads running, and two channels are being created. I've printed out the thread IDs and the channel IDs and get distinct numbers for each. If it didn't create a new thread it would be running these synchronously and I probably wouldn't see an error.

I had thought the error was on the receiving end, because execution reaches the await line and waits for a response before it dies. But I now realize that BasicPublish is not a blocking call, just as BasicConsume is not. If I put a print statement before I await the response, and a sleep statement before that, I never reach the print statement. So something is failing inside the BasicPublish() call. That would explain why adjusting the sent payload size changes whether or not I get this error.

-Travis

Luke Bakken

Sep 13, 2018, 7:56:42 PM
to rabbitmq-users
I'm not sure what to say at this point. If you can provide code to reproduce this issue that would be the most expedient way of getting to the bottom of it. Failing that, a Wireshark trace on port 5672 might be helpful.

Do you have a way of logging messages via your RPC server to see what it receives and replies back to your client app?

You should also use the rabbitmqctl list_channels and rabbitmqctl list_connections commands to verify channels and connections server-side.

If BasicPublish were to fail, an exception would be raised. Try adding a handler for the IConnection.CallbackException event.
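A sketch of such a handler follows. It assumes the 5.x client API, where `IConnection` exposes the `CallbackException` and `ConnectionShutdown` events. The `ConnectionDiagnostics` helper name is made up for illustration, and subscribing to `ConnectionShutdown` as well is an addition beyond the suggestion above.

```csharp
using System;
using RabbitMQ.Client;

// Hypothetical helper: wires up diagnostic logging on a connection.
public static class ConnectionDiagnostics
{
    public static void Attach(IConnection connection)
    {
        // Raised when an exception is thrown inside a callback invoked by the client.
        connection.CallbackException += (sender, ea) =>
            Console.Error.WriteLine("Callback exception: {0}", ea.Exception);

        // Raised when the connection shuts down; the arguments say who
        // initiated the shutdown and carry the AMQP reply code and text.
        connection.ConnectionShutdown += (sender, ea) =>
            Console.Error.WriteLine(
                "Connection shutdown: initiator={0}, code={1}, text={2}",
                ea.Initiator, ea.ReplyCode, ea.ReplyText);
    }
}
```

In the RabbitProxy class above this could be called as `ConnectionDiagnostics.Attach(_connection)` right after `_factory.CreateConnection()`.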

Thanks,
Luke

Travis Wilkins

Sep 14, 2018, 12:36:42 PM
to rabbitmq-users
The RPC server never gets hit in this situation (it responds normally with a single channel), so I think this must be a bug in the .NET client code. The client does shut down the connection, listing the frame error as the cause.

Thanks for your help with this issue.

-Travis

Luke Bakken

Sep 14, 2018, 1:34:03 PM
to rabbitmq-users
Hi Travis,

There's still no concrete evidence of a bug in the .NET client. As I said before, if you can provide a working example to reproduce this, I can take a look at it (i.e. a repository that I can clone, build, and run either on Windows or .NET Core on Linux).

Thanks,
Luke

Michael Klishin

Sep 14, 2018, 1:40:34 PM
to rabbitm...@googlegroups.com
The error effectively means two frames were written to the socket concurrently. It's a fatal connection exception in the protocol, and
after that the connection is closed by RabbitMQ and cannot be used again.
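To illustrate why interleaved writes produce this error, here is a small deterministic simulation. It is not the client's code, and `FrameInterleavingDemo` and its methods are hypothetical names. It relies only on the AMQP 0-9-1 frame layout: one type octet, a two-octet channel number, a four-octet payload size, the payload, and a closing frame-end octet 0xCE. If one writer's frame lands between another writer's header and payload, the reader finds a payload byte where the end marker should be.

```csharp
using System;
using System.Linq;

public static class FrameInterleavingDemo
{
    private const byte FrameEnd = 0xCE; // AMQP 0-9-1 frame-end octet
    private const byte BodyFrame = 3;   // AMQP 0-9-1 content body frame type

    // Encodes one body frame: type, channel, size (big-endian), payload, end marker.
    public static byte[] Encode(ushort channel, byte[] payload)
    {
        var frame = new byte[7 + payload.Length + 1];
        frame[0] = BodyFrame;
        frame[1] = (byte)(channel >> 8);
        frame[2] = (byte)channel;
        frame[3] = (byte)(payload.Length >> 24);
        frame[4] = (byte)(payload.Length >> 16);
        frame[5] = (byte)(payload.Length >> 8);
        frame[6] = (byte)payload.Length;
        Buffer.BlockCopy(payload, 0, frame, 7, payload.Length);
        frame[frame.Length - 1] = FrameEnd;
        return frame;
    }

    // Walks the bytes frame by frame, as a reader would. Returns false at the
    // first frame that is truncated or does not end with the end marker.
    public static bool IsWellFormed(byte[] wire)
    {
        int pos = 0;
        while (pos < wire.Length)
        {
            int remaining = wire.Length - pos;
            if (remaining < 8) return false; // header plus end marker
            int size = (wire[pos + 3] << 24) | (wire[pos + 4] << 16)
                     | (wire[pos + 5] << 8) | wire[pos + 6];
            if (size < 0 || size > remaining - 8) return false;
            int end = pos + 7 + size;
            if (wire[end] != FrameEnd) return false;
            pos = end + 1;
        }
        return true;
    }

    public static void Main()
    {
        byte[] a = Encode(1, new byte[8]);
        byte[] b = Encode(2, new byte[8]);

        // Writers take turns: frame A completely, then frame B.
        byte[] sequential = a.Concat(b).ToArray();

        // Unsynchronized writers: A's header, then all of B, then the rest of A.
        byte[] interleaved = a.Take(7).Concat(b).Concat(a.Skip(7)).ToArray();

        Console.WriteLine("sequential:  {0}", IsWellFormed(sequential));  // prints True
        Console.WriteLine("interleaved: {0}", IsWellFormed(interleaved)); // prints False
    }
}
```

In the interleaved case the byte found at the end-marker position is 0, which matches the `{invalid_frame_end_marker, 0}` detail in the server log earlier in the thread.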

As of [1] we are not aware of any obvious issues around synchronisation around frame/frame set writes and I am 100% sure this is not a frame serialisation issue.

I'd recommend reducing the example to the absolute minimum (e.g. publishing only and not dynamically opening channels), then reintroducing more and more
features found in your code.

Or just use a pool of connections similar to how you pool channels. With a limited concurrency rate that's a pretty workable scenario, and by far the safest option.


--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
MK

Staff Software Engineer, Pivotal/RabbitMQ

Travis Wilkins

Sep 14, 2018, 2:14:54 PM
to rabbitmq-users
I simplified my example to this and still get the error.

        static void Main(string[] arguments)
        {
            string exchange = "testExchange";
            var factory = new ConnectionFactory {HostName = "localhost"};
            using (var connection = factory.CreateConnection())
            using (var channel1 = connection.CreateModel())
            using (var channel2 = connection.CreateModel())
            {
                Task t1 = Task.Run(() => channel1.BasicPublish(exchange, "", null, new byte[9999999]));
                Task t2 = Task.Run(() => channel2.BasicPublish(exchange, "", null, new byte[9999999]));
                Task.WaitAll(t1, t2);
            }
        }

Luke Bakken

Sep 14, 2018, 2:39:42 PM
to rabbitmq-users
Hi Travis,

Thanks for that code. The code as written works fine, and it also works when I change it to publish to the default exchange (empty string) with test-queue as the routing key. I created the queue test-queue and see two messages in it.

I am using a build of the .NET client from the master branch, .NET Core 2.0 running on Windows 10 (within a VirtualBox VM), and RabbitMQ 3.7.7 / Erlang 21.0.9 running on Arch Linux.

What is different about your environment? How are you running your test code? What version of .NET and the RabbitMQ .NET client?

Thanks -
Luke

Michael Klishin

Sep 14, 2018, 2:41:17 PM
to rabbitm...@googlegroups.com
Thanks, that's small enough for a test case.

Our team will get to investigate this next week. In case someone is interested in digging further before this,
Wireshark [6] is the best tool to see what's going on on the wire.

[1][2][3] is where commands (such as channel.open or channel.close or basic.publish) are transmitted. As of [4] that code has been reworked
to not require locking to reduce lock contention. As of [5] we discovered that for SslStreams that doesn't quite work as the docs
advertise so we had to re-introduce locking for TLS connections.

9.99 MB messages will be framed into multiple payload frames (each 128 KiB in size by default). I tried modifying a "Hello, world" tutorial
to use an even larger payload, and in the absence of concurrency there are no errors, so the framing implementation is unlikely to be the root cause here.
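For a sense of scale, the arithmetic can be sketched as below. This assumes a negotiated frame_max of 131072 bytes and that each body frame spends 8 of those bytes on the 7-byte header and the 1-byte end marker. `FrameCount` and `BodyFrames` are hypothetical names, not client API.

```csharp
using System;

public static class FrameCount
{
    // Number of body frames needed for a message body, assuming each frame
    // carries at most frameMax - 8 payload bytes (header plus end marker).
    public static long BodyFrames(long bodyBytes, long frameMax)
    {
        long perFrame = frameMax - 8;
        return (bodyBytes + perFrame - 1) / perFrame; // ceiling division
    }

    public static void Main()
    {
        Console.WriteLine(BodyFrames(999999, 131072));  // prints 8
        Console.WriteLine(BodyFrames(9999999, 131072)); // prints 77
    }
}
```

Under these assumptions, each publish in the original example is written as 8 body frames and each publish in the simplified one as 77. That is consistent with the observation that larger payloads make the error easier to hit.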




Michael Klishin

Sep 14, 2018, 2:47:36 PM
to rabbitm...@googlegroups.com
Note that we had a couple of non-trivial PRs (and one more pending) in .NET client master recently.
They may affect the write code path.

I used 5.0.x and 5.1.x versions and cannot reproduce on .NET Core 2.1.3 on macOS.

Server logs for the example connection report no *connection* errors (I didn't have the exchange declared,
so there is a channel exception which is expected):


Luke Bakken

Sep 14, 2018, 2:48:05 PM
to rabbitmq-users
In addition, using version 5.1.0 from NuGet works fine as well.

At this point, we need to know your complete environment. Thanks!

Michael Klishin

Sep 14, 2018, 2:59:17 PM
to rabbitm...@googlegroups.com
The following modification successfully completes for me:

So concurrency rate or number of messages published don't seem to make a difference.


Travis Wilkins

Sep 14, 2018, 4:15:04 PM
to rabbitmq-users
Here is my setup:

It is all running on a single machine with Windows Server 2012 R2.
Using RabbitMQ version 3.7.7, Erlang 21.0.1

The client app uses the RabbitMQ client library version 5.1 installed from NuGet and targets .NET Framework 4.6.2.

I'll try and get some data from Wireshark.

Michael - Apologies for the nonsense comment. I was frustrated that my issue was being summarily dismissed and felt my last avenue of support was being cut off. Obviously that isn't the case, so I am sorry.

Michael Klishin

Sep 14, 2018, 5:32:16 PM
to rabbitm...@googlegroups.com
Apology accepted.

Is there a chance you can try .NET Core 2 on Windows to see if it makes a difference?

That's what our team uses for development of the client, usually there's no difference in behavior compared with
the classic .NET but as our recent experience shows, even various implementations of the same interface can have inconsistencies.


Travis Wilkins

Sep 14, 2018, 5:47:06 PM
to rabbitmq-users
I just tried using .NET Core 2.1 and 2.0 and still got the error.

Travis Wilkins

Sep 14, 2018, 6:01:37 PM
to rabbitmq-users
I just tried moving the client application to a different machine from the RabbitMQ instance, and in that case I cannot replicate the error. My best guess is that the network constraint causes the code to run a bit more serially than on the same machine, which avoids the concurrency problem.

Travis Wilkins

Sep 14, 2018, 6:28:36 PM
to rabbitmq-users
Also, if I use two connections, each with its own channel, I do not get the error, even on the same box. Something about the two channels trying to use a resource in the same connection triggers the error.

Michael Klishin

Sep 14, 2018, 7:59:58 PM
to rabbitm...@googlegroups.com
My tests were over localhost. Socket writes happen on the publishing host, so if frame data is interleaved
in incorrect ways it shouldn't matter if the other end of the connection is local.

The only difference would be TCP windowing and how quickly the buffers fill up. But I somewhat doubt it
spills through .NET stream abstractions.


Luke Bakken

Sep 17, 2018, 11:40:11 AM
to rabbitmq-users
Hi Travis,

Just to clarify, you see this issue when you run your client application on the same server as RabbitMQ using localhost as the host name, but when you move your client application to a different Windows 2012 R2 server and connect to RabbitMQ over the network you don't see the issue? Both servers are using the exact same .NET framework version, correct?

If you run your client application on the same server as RabbitMQ but use that server's "real" IP address instead of localhost, do you see the issue?

Thanks,
Luke

Travis Wilkins

Sep 17, 2018, 5:07:30 PM
to rabbitmq-users
Running the client app on the same server, using either localhost or the actual server name, produces the issue. The server has .NET Framework 4.6.2 installed. When I run the app locally on my laptop with Windows 10 and .NET Framework 4.7.1 and connect to the server, I do not see the issue. On the same server I also tried .NET Core 2.1 and still had the issue. I have now also tried .NET Framework 4.7.1 on the server; the issue seems harder to replicate with 4.7.1, but I can still get it to occur.

Luke Bakken

Sep 17, 2018, 8:12:28 PM
to rabbitmq-users
Hi Travis,

Any chance you could clone the repo, build the project using the master branch, and use that DLL? Michael pointed out there have been some fixes that may resolve this issue.


Thanks!
Luke

Travis Wilkins

Sep 18, 2018, 12:11:40 PM
to rabbitmq-users
If I use the latest version of master I cannot replicate the error anymore :) Anyone know when the next version is getting released?

Luke Bakken

Sep 18, 2018, 1:44:25 PM
to rabbitmq-users
Hi Travis,

I'll mention this to the team. Maybe we can get a 5.2.0 version out soon.

Thanks again for being patient and helping us out! 90% of the battle is being able to reproduce something like you could.

Luke

Travis Wilkins

Sep 18, 2018, 3:27:09 PM
to rabbitmq-users
Thanks for the help and being patient with me.

Karl Nilsson

Sep 19, 2018, 11:17:44 AM
to rabbitm...@googlegroups.com
Hi,

Travis, could you try 5.1.1-pre.2 that we just released to NuGet? It has some of the changes included in master. If it works we can proceed and get a full patch release out pretty quickly.


Cheers
Karl


Travis Wilkins

Sep 19, 2018, 12:13:53 PM
to rabbitmq-users
Hi Karl,

I can still reproduce the issue with the 5.1.1-pre2 release.

-Travis

Karl Nilsson

Sep 19, 2018, 12:17:29 PM
to rabbitmq-users
Ok thanks for letting us know. Could you try the 6.0.0 pre 2 release as well?



--
Karl Nilsson
twitter: @kjnilsson

Travis Wilkins

Sep 19, 2018, 12:24:26 PM
to rabbitmq-users
I cannot reproduce the error with the 6.0.0-pre2 release.

Michael Klishin

Sep 19, 2018, 12:28:53 PM
to rabbitm...@googlegroups.com
So it's one of the changes in master that addresses it.

For better or worse the .NET client has been following a strict interpretation of semantic versioning 2.0
(too strict in my opinion but hey, what do I know about .NET), so we cannot cherry-pick at least 3 potentially
relevant PRs to 5.x.

We will proceed with releasing 5.1.1 since it contains fixes unrelated to this thread. Please stick to 6.0.0-pre2 for now.
There will be breaking API changes in 6.0 but nothing major, mostly things that should have never been public will be hidden away.



