We receive the following error:
`SharedQueue closed, System.IO.IOException: Unable to read data from the transport connection: An existing connection was forcibly closed by the remote host. ---> System.Net.Sockets.SocketException: An existing connection was forcibly closed by the remote host
at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)
--- End of inner exception stack trace ---
at RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader)
at RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame()
at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
at RabbitMQ.Client.Framing.Impl.Connection.MainLoop(),541,Unexpected Exception`
Some background information.
We have a .NET web application and a .NET console application that exchange data through RabbitMQ. The web application constantly receives a large volume of financial data, called transactions, from a source (very frequently, say, every 30 ms). It analyzes the code of each transaction and sends it to one of the two running instances of the console application to be calculated. Each transaction falls into one of two categories, determined by its code, and there is a separate queue between the web app and each console instance: depending on its category, a transaction goes into the first or the second queue. There are also additional queues between the apps for providing the time and for sending commands to the console application, such as initialization and resetting. Each console instance calculates the incoming transactions and returns the results to the web app through another queue, “result”. Since two console apps are running, there are two result queues.
We use separate queues because transactions arrive at a high rate, and the web app uses multiple threads to handle them and send them to the console application. We use Reactive Extensions for this.
When we create a queue, we use the `Messenger` class, which contains the logic for sending and receiving data (see the code below). Each `Messenger` instance uses its own connection and channel. These queues are supposed to be long-running (the whole day) on both the publisher and the receiver side. But after some time, about an hour or two, one of the queues breaks on the receiver side in the call `consumer.Queue.Dequeue();`. My guess is that the RabbitMQ server decides the sender is dead once a timeout has passed. As you can see, in the `GetReceiveSource` method we run the receive loop on a thread-pool thread so that we can listen to different queues simultaneously. We tried different timeouts using the RequestedHeartbeatTimeout setting, e.g. 10 minutes, but that only reduced the running time from a couple of hours to those 10 minutes. Setting AutomaticRecoveryEnabled didn’t help either.
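For reference, here is a minimal sketch of how the heartbeat and recovery settings can be applied. It assumes a pre-6.0 `RabbitMQ.Client`, where `RequestedHeartbeat` is a `ushort` number of seconds; the class name `ConnectionSetupSketch` and the values 30 and 10 are purely illustrative and are not taken from our application. As far as we understand, the factory's settings are read when `CreateConnection()` is called, so the sketch sets them beforehand.

```csharp
using System;
using RabbitMQ.Client;

// Hypothetical helper for illustration only; it is not part of our Messenger class.
public static class ConnectionSetupSketch
{
    public static IConnection CreateConnection()
    {
        var factory = new ConnectionFactory
        {
            HostName = "localhost",
            // Assumption: pre-6.0 client, heartbeat given as seconds (ushort).
            // The value 30 is illustrative.
            RequestedHeartbeat = 30,
            // Recovery settings are configured before the connection is created.
            AutomaticRecoveryEnabled = true,
            // Illustrative delay between recovery attempts.
            NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
        };

        // The connection is created only after all settings are in place.
        return factory.CreateConnection();
    }
}
```

Note that our `Messenger` assigns `AutomaticRecoveryEnabled` after calling `CreateConnection()`, which may be one reason the setting appeared to have no effect.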
I’m a beginner with RabbitMQ, so I must have missed something. Should I use just a single long-running publishing queue and a single receiving queue to eliminate this issue?
Could you advise how to use RabbitMQ in a multithreaded application so as to avoid the `SharedQueue closed` error? Should I push all transactions, timestamps, and commands into a single queue, so that the calculator reads them off this queue and handles each one accordingly?
I have a basic `Messenger` setup copied from the RabbitMQ tutorials (no acks):

    public class Messenger : IMessenger
    {
        private readonly ILog log = LogManager.GetLogger<Messenger>();
        private readonly Subject<string> subject = new Subject<string>();
        private bool isDisposed;
        private ConnectionFactory factory;
        private IConnection connection;
        private IModel channel;

        public string RoutingKey { get; private set; }

        public void Initialize(string routingKey)
        {
            try
            {
                Initialize(new MessengerSettings(routingKey));
            }
            catch (Exception)
            {
                log.ErrorFormat("Error during initializing for queue {0}", routingKey);
                throw;
            }
        }

        public void Initialize(MessengerSettings settings)
        {
            RoutingKey = settings.RoutingKey;
            factory = new ConnectionFactory
            {
                HostName = "localhost"
            };
            connection = factory.CreateConnection();
            factory.AutomaticRecoveryEnabled = true;
            channel = connection.CreateModel();
            channel.QueueDeclare(RoutingKey, settings.RequireAck, false, false, null);
            log.DebugFormat("Init Messenger Key={0}, HashCode={1}", RoutingKey, GetHashCode());
        }

        public void Send(string msg)
        {
            try
            {
                if (isDisposed)
                {
                    throw new InvalidOperationException(string.Format("Messenger '{0}' was disposed: cannot send message {1}", RoutingKey, msg));
                }
                channel.BasicPublish(string.Empty, RoutingKey, null, Encoding.UTF8.GetBytes(msg));
            }
            catch (Exception exc)
            {
                var errMsg = string.Format("Failed to send to queue {0} string {1}", RoutingKey, msg);
                log.Error(errMsg, exc);
                log.Fatal(errMsg);
            }
        }

        public void Send(object dto, MessageType messageType)
        {
            var json = Jsonner.Serialize(dto, messageType);
            if (messageType == MessageType.AlgoSetup)
            {
                log.TraceFormat("ALGOSETUP: {0}", json);
            }
            Send(json);
        }

        public IObservable<string> GetReceiveSource()
        {
            if (string.IsNullOrEmpty(RoutingKey))
            {
                throw new InvalidOperationException("Cannot use it without RoutingKey");
            }
            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(RoutingKey, true, consumer);
            ThreadPool.QueueUserWorkItem(_ =>
            {
                while (!isDisposed)
                {
                    try
                    {
                        var ea = consumer.Queue.Dequeue();
                        var message = Encoding.UTF8.GetString(ea.Body);
                        subject.OnNext(message);
                    }
                    catch (EndOfStreamException exc)
                    {
                        // The consumer was cancelled, the model closed, or the connection went away.
                        var details = string.Empty;
                        if (consumer.ShutdownReason != null)
                        {
                            // we called it, so silently stop receiving
                            if (consumer.ShutdownReason.ReplyCode == RabbitMqCodes.Disposed)
                            {
                                subject.OnNext(null);
                                break;
                            }
                            details = string.Format("{0},{1},{2}", consumer.ShutdownReason.Cause, consumer.ShutdownReason.ReplyCode, consumer.ShutdownReason.ReplyText);
                        }
                        Console.WriteLine("Receiver {0}: end of stream ({1}), {2}", RoutingKey, exc.Message, details);
                        log.ErrorFormat("Receiver {0}: end of stream ({1}), {2}", RoutingKey, exc.Message, details);
                        subject.OnNext(null);
                        break;
                    }
                }
            });
            return subject;
        }

        public void Dispose()
        {
            if (!isDisposed)
            {
                isDisposed = true;
                if (channel != null)
                {
                    channel.Abort(RabbitMqCodes.Disposed, RabbitMqMessages.Disposed);
                    channel.Dispose();
                    channel = null;
                }
                if (connection != null)
                {
                    connection.Abort(RabbitMqCodes.Disposed, RabbitMqMessages.Disposed);
                    connection.Dispose();
                    connection = null;
                }
                log.DebugFormat("Messenger 'Key={0},HashCode={1}' disposed", RoutingKey, GetHashCode());
            }
        }
    }