I need some help debugging an issue. I am testing out a simple RPC pattern with RabbitMQ's .NET client that uses the direct reply-to queue for sending a response. Everything works fine when I use a single caller, but now I am testing with multiple callers and getting an exception. In this setup I am making a call to the same remote procedure from two different threads. The two calls use separate channels but share the same connection. The application seems to throw an exception while waiting for a response. I get the following exception and cannot view an inner exception:
System.IO.FileLoadException: 'Could not load file or assembly 'Microsoft.Diagnostics.Tracing.EventSource, Version=1.1.28.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a' or one of its dependencies. The located assembly's manifest definition does not match the assembly reference. (Exception from HRESULT: 0x80131040)'
> RabbitMQ.Client.dll!RabbitMQ.Client.Framing.Impl.Connection.LogCloseError(string error, System.Exception ex)
RabbitMQ.Client.dll!RabbitMQ.Client.Framing.Impl.Connection.ClosingLoop()
RabbitMQ.Client.dll!RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
mscorlib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx)
mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx)
mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state)
mscorlib.dll!System.Threading.ThreadHelper.ThreadStart()
Now, if I use breakpoints to see where it is going wrong, the calls work fine, which leads me to think there is a concurrency problem. Also, if I reduce the payload size (the dummy byte arrays are 999999 bytes in length), the problem seems to go away as well. I am guessing the semi-large payload just increases the likelihood of hitting a concurrency issue.
/// <summary>
/// Minimal RPC client over RabbitMQ that uses the direct reply-to pseudo-queue
/// (<c>amq.rabbitmq.reply-to</c>) to receive responses. One connection is shared;
/// each calling thread gets its own channel, looked up by managed thread id.
/// </summary>
public class RabbitProxy : IDisposable
{
    /// <summary>State shared between a waiting caller and the reply consumer callback.</summary>
    private class RpcReplyData
    {
        /// <summary>Released once when the reply arrives (or when the proxy is disposed).</summary>
        public SemaphoreSlim ReceivedSignal { get; set; }

        /// <summary>Reply payload; stays <c>null</c> until a reply has been received.</summary>
        public byte[] Data { get; set; }
    }

    /// <summary>Everything needed to publish one request and to correlate its reply.</summary>
    private class PublishMessage
    {
        public string Exchange { get; set; }
        public string RoutingKey { get; set; } = String.Empty;
        public byte[] Data { get; set; }
        public IBasicProperties Props { get; set; }
        public RpcReplyData Reply { get; set; }
        public Guid Id { get; set; }
    }

    // Maximum time Rpc waits for a reply before giving up (30 seconds).
    private readonly TimeSpan _timeOut = new TimeSpan(0, 0, 30);
    private const string ReplyToQueue = "amq.rabbitmq.reply-to";
    private const string Server = "localhost";

    // Pending calls keyed by correlation id; entries are removed by whoever finishes the call.
    private readonly ConcurrentDictionary<Guid, RpcReplyData> _rpcReplies = new ConcurrentDictionary<Guid, RpcReplyData>();

    // One channel per managed thread id, so a channel is never shared between concurrent threads.
    private readonly ConcurrentDictionary<int, IModel> _threadIdToChannel = new ConcurrentDictionary<int, IModel>();
    private readonly ConnectionFactory _factory = new ConnectionFactory {HostName = Server};
    private IConnection _connection;

    /// <summary>
    /// Publishes <paramref name="argument"/> to <paramref name="exchange"/> and waits for the
    /// correlated reply on the direct reply-to queue.
    /// </summary>
    /// <param name="exchange">Exchange the request is published to (empty routing key).</param>
    /// <param name="argument">Request payload.</param>
    /// <returns>
    /// The reply payload, or <c>null</c> when no reply arrived within the time-out
    /// or the proxy was disposed while waiting.
    /// </returns>
    public async Task<byte[]> Rpc(string exchange, byte[] argument)
    {
        Guid id = Guid.NewGuid();
        RpcReplyData reply = new RpcReplyData {ReceivedSignal = new SemaphoreSlim(0, 1)};

        // The channel is only touched before the first await, i.e. on the calling thread.
        var channel = GetChannel(Thread.CurrentThread.ManagedThreadId);
        var props = channel.CreateBasicProperties();
        props.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
        props.CorrelationId = id.ToString();
        props.ReplyTo = ReplyToQueue;
        var message = new PublishMessage { Exchange = exchange, Data = argument, Props = props, Id = id, Reply = reply };

        // Register the pending call BEFORE publishing: the reply is delivered on another
        // thread and may arrive before BasicPublish even returns. Registering afterwards
        // lets a fast reply find nothing to complete, so it would be silently dropped.
        _rpcReplies.TryAdd(message.Id, message.Reply);
        try
        {
            channel.BasicPublish(message.Exchange, message.RoutingKey, message.Props, message.Data);
        }
        catch
        {
            // Nothing was sent, so no reply will ever come; do not leave the entry behind.
            RpcReplyData unsent;
            _rpcReplies.TryRemove(message.Id, out unsent);
            throw;
        }

        bool signalled = await reply.ReceivedSignal.WaitAsync(_timeOut).ConfigureAwait(false);
        if (!signalled)
        {
            // Timed out: drop the pending entry so it does not accumulate forever.
            // If the removal fails, the consumer claimed the entry at the same moment
            // and reply.Data is being (or has been) filled in.
            RpcReplyData abandoned;
            _rpcReplies.TryRemove(message.Id, out abandoned);
        }

        return reply.Data;
    }

    /// <summary>
    /// Returns the channel cached for <paramref name="threadId"/>, creating it (together
    /// with its reply-to consumer) on first use.
    /// </summary>
    /// <param name="threadId">Managed thread id used as the cache key.</param>
    /// <exception cref="InvalidOperationException">No connection exists yet.</exception>
    private IModel GetChannel(int threadId)
    {
        IModel channel;
        if (_threadIdToChannel.TryGetValue(threadId, out channel) && channel != null)
        {
            return channel;
        }

        if (_connection == null)
        {
            throw new InvalidOperationException("Initialize() must be called before a channel can be created.");
        }

        channel = _connection.CreateModel();
        try
        {
            var replyToConsumer = new EventingBasicConsumer(channel);
            replyToConsumer.Received += (model, ea) =>
            {
                Guid id;
                RpcReplyData reply;

                // TryRemove guarantees each pending call is completed at most once.
                if (Guid.TryParse(ea.BasicProperties.CorrelationId, out id) && _rpcReplies.TryRemove(id, out reply))
                {
                    reply.Data = ea.Body;
                    reply.ReceivedSignal.Release();
                }
            };

            // Auto-ack consumer on the reply-to pseudo-queue, set up before any publish on this channel.
            channel.BasicConsume(ReplyToQueue, true, replyToConsumer);
        }
        catch
        {
            // Do not cache (or leak) a channel that has no working reply consumer.
            channel.Dispose();
            throw;
        }

        _threadIdToChannel.TryAdd(threadId, channel);
        return channel;
    }

    /// <summary>
    /// Tears down any previous connection, opens a new one and declares the
    /// <c>testExchange</c> fanout exchange.
    /// </summary>
    public void Initialize()
    {
        Dispose();
        _connection = _factory.CreateConnection();
        var channel = GetChannel(Thread.CurrentThread.ManagedThreadId);
        channel.ExchangeDeclare("testExchange", "fanout");
    }

    /// <summary>
    /// Aborts the connection, disposes every cached channel and unblocks callers that are
    /// still waiting for a reply. The instance can be reused by calling <see cref="Initialize"/>.
    /// </summary>
    public void Dispose()
    {
        _connection?.Abort();
        _connection?.Dispose();
        // Forget the dead connection so later use fails with a clear error.
        _connection = null;

        foreach (var channel in _threadIdToChannel.Values)
        {
            channel?.Dispose();
        }
        _threadIdToChannel.Clear();

        // No reply can arrive any more: wake pending callers now (they observe null data)
        // instead of letting each of them wait for the full time-out.
        foreach (var pendingId in _rpcReplies.Keys)
        {
            RpcReplyData pending;
            if (_rpcReplies.TryRemove(pendingId, out pending))
            {
                pending.ReceivedSignal.Release();
            }
        }
    }
}