Multiple RabbitMQ queues give error: SharedQueue closed, System.IO.IOException


myroman

May 15, 2015, 9:10:54 AM
to rabbitm...@googlegroups.com
We receive the following error:
`SharedQueue closed, System.IO.IOException: Unable to read data from the transport connection: An existing connection was forcibly closed by the remote host. ---> System.Net.Sockets.SocketException: An existing connection was forcibly closed by the remote host
   at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader)
   at RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame()
   at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
   at RabbitMQ.Client.Framing.Impl.Connection.MainLoop(),541,Unexpected Exception`

Some background information.

We have a .NET web application and .NET console applications that exchange data through RabbitMQ. The web application constantly receives a lot of financial data, called transactions, from a source (very often, roughly every 30 ms), analyzes the code of each transaction, and sends it to one of the 2 running instances of the console application to be calculated. A transaction falls into 1 of 2 categories, determined by its code, and each category has its own queue between the web app and the corresponding console application, so transactions go into the first or the second queue. There are also some queues between these apps for providing the time and for sending commands, such as initialization and resetting, to the console application. Each instance of the console application calculates incoming transactions and returns the results to the web app through another queue, “result”. Since 2 console apps are running, there are 2 result queues.
We use separate queues because transactions arrive at a high rate, and the web app uses multiple threads to handle them and send them to the console application. We use Reactive Extensions to do that.

When we create a queue, we use the class Messenger, which contains the logic for sending and receiving data (see code below). Each instance of Messenger uses a separate channel and connection. These queues are supposed to be long-running (the whole day), on both the publisher and the receiver side. But after some time, about an hour or two, one of the queues breaks on the receiver side in the call `consumer.Queue.Dequeue();`, I guess because the RabbitMQ server thinks that the sender is dead once the timeout has passed. As you can see, in the `GetReceiveSource` method we run the receive loop on a thread-pool thread so that we can listen to different queues simultaneously. We tried different timeouts using the RequestedHeartbeatTimeout setting, e.g. 10 minutes, but that only reduced the running time from a couple of hours to those 10 minutes. The AutomaticRecoveryEnabled setting also didn’t help.
Here (http://mikehadlow.blogspot.com/2013/09/rabbitmq-amqp-channel-best-practices.html) I read that we should use a single connection per application with separate channels, and that channels should not be in the same thread. In my application, as you can see in Messenger, there is a separate connection for each instance of the Messenger class, and all channels are created in a single thread.
I’m a beginner in RabbitMQ, so I must have missed something. Should I use only a single long-running publishing queue and a single receiving queue to eliminate this issue?

Could you give advice on how to deal with RabbitMQ in a multithreaded application to avoid the SharedQueue closed error? Should I push all transactions, timestamps, and commands into a single queue, so that the calculator reads them off this queue and treats them accordingly?
I have a basic setup of Messenger copied from the RabbitMQ tutorials (no acks).

    public class Messenger : IMessenger
    {
        private readonly ILog log = LogManager.GetLogger<Messenger>();

        private readonly Subject<string> subject = new Subject<string>();

        private bool isDisposed;

        private ConnectionFactory factory;

        private IConnection connection;

        private IModel channel;

        public string RoutingKey { get; private set; }

        public void Initialize(string routingKey)
        {
            try
            {
                Initialize(new MessengerSettings(routingKey));
            }
            catch (Exception)
            {
                log.ErrorFormat("Error during initializing for queue {0}", routingKey);
                throw;
            }
        }

        public void Initialize(MessengerSettings settings)
        {
            RoutingKey = settings.RoutingKey;
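            factory = new ConnectionFactory
            {
                HostName = "localhost",
                // Must be set before CreateConnection(); setting it afterwards
                // does not affect a connection that already exists.
                AutomaticRecoveryEnabled = true
            };

            connection = factory.CreateConnection();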
            channel = connection.CreateModel();
            channel.QueueDeclare(RoutingKey, settings.RequireAck, false, false, null);
            log.DebugFormat("Init Messenger Key={0}, HashCode={1}", RoutingKey, GetHashCode());
        }

        public void Send(string msg)
        {
            try
            {
                if (isDisposed)
                {
                    throw new InvalidOperationException(string.Format("Messenger '{0}' was disposed: cannot send message {1}", RoutingKey, msg));
                }

                channel.BasicPublish(string.Empty, RoutingKey, null, Encoding.UTF8.GetBytes(msg));
            }
            catch (Exception exc)
            {
                var errMsg = string.Format("Failed to send to queue {0} string {1}", RoutingKey, msg);
                log.Error(errMsg, exc);
                log.Fatal(errMsg);
            }
        }

        public void Send(object dto, MessageType messageType)
        {
            var json = Jsonner.Serialize(dto, messageType);
            if (messageType == MessageType.AlgoSetup)
            {
                log.TraceFormat("ALGOSETUP: {0}", json);
            }
            Send(json);
        }

        public IObservable<string> GetReceiveSource()
        {
            if (string.IsNullOrEmpty(RoutingKey))
            {
                throw new InvalidOperationException("Cannot use it without RoutingKey");
            }

            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(RoutingKey, true, consumer);

            ThreadPool.QueueUserWorkItem(_ =>
            {
                while (!isDisposed)
                {
                    try
                    {
                        var ea = consumer.Queue.Dequeue();

                        var message = Encoding.UTF8.GetString(ea.Body);

                        subject.OnNext(message);
                    }
                    catch (EndOfStreamException exc)
                    {
                        // The consumer was cancelled, the model closed, or the connection went away.
                        var details = string.Empty;
                        if (consumer.ShutdownReason != null)
                        {
                            // we called it, so silently stop receiving
                            if (consumer.ShutdownReason.ReplyCode == RabbitMqCodes.Disposed)
                            {
                                subject.OnNext(null);
                                break;
                            }
                            details = string.Format("{0},{1},{2}", consumer.ShutdownReason.Cause, consumer.ShutdownReason.ReplyCode, consumer.ShutdownReason.ReplyText);
                        }
                        Console.WriteLine("Receiver {0}: end of stream ({1}), {2}", RoutingKey, exc.Message, details);
                        log.ErrorFormat("Receiver {0}: end of stream ({1}), {2}", RoutingKey, exc.Message, details);
                        subject.OnNext(null);
                        break;
                    }
                }
            });

            return subject;
        }

        public void Dispose()
        {
            if (!isDisposed)
            {
                isDisposed = true;
                if (channel != null)
                {
                    channel.Abort(RabbitMqCodes.Disposed, RabbitMqMessages.Disposed);
                    channel.Dispose();
                    channel = null;
                }
                if (connection != null)
                {
                    connection.Abort(RabbitMqCodes.Disposed, RabbitMqMessages.Disposed);
                    connection.Dispose();
                    connection = null;
                }
                log.DebugFormat("Messenger 'Key={0},HashCode={1}' disposed", RoutingKey, GetHashCode());
            }
        }
    }
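
The single-connection layout from the linked article could be sketched roughly as follows. This is only an illustration: `SharedConnection`, `CreateChannel`, and `IsOpen` are hypothetical names that are not part of the Messenger class above, and the sketch assumes that one connection per process with one channel per Messenger (or per thread) fits the application.

```csharp
using System;
using RabbitMQ.Client;

// Hypothetical helper: owns the single connection for the whole process
// and hands out a separate channel (IModel) to each caller.
public sealed class SharedConnection : IDisposable
{
    private readonly Lazy<IConnection> connection;

    public SharedConnection(ConnectionFactory factory)
    {
        // The connection is opened lazily on first use, not in the constructor.
        connection = new Lazy<IConnection>(() => factory.CreateConnection());
    }

    // True only once the connection has been created and is still open.
    public bool IsOpen
    {
        get { return connection.IsValueCreated && connection.Value.IsOpen; }
    }

    // Create one channel per Messenger (or per thread) instead of
    // sharing a single IModel instance between threads.
    public IModel CreateChannel()
    {
        return connection.Value.CreateModel();
    }

    public void Dispose()
    {
        if (connection.IsValueCreated)
        {
            connection.Value.Dispose();
        }
    }
}

// Possible usage (requires a running broker):
//   var shared = new SharedConnection(new ConnectionFactory { HostName = "localhost" });
//   var transactionsChannel = shared.CreateChannel();
//   var resultsChannel = shared.CreateChannel();
```

With this shape, each Messenger would receive a channel from the shared helper instead of creating its own connection, and only the helper would close the connection on shutdown.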

Michael Klishin

May 15, 2015, 9:15:21 AM
to rabbitm...@googlegroups.com, myroman
 On 15 May 2015 at 16:10:56, myroman (roman.pav...@gmail.com) wrote:
> We receive the following error:
> `SharedQueue closed, System.IO.IOException: Unable to read
> data from the transport connection: An existing connection
> was forcibly closed by the remote host. ---> System.Net.Sockets.SocketException:
> An existing connection was forcibly closed by the remote host

See the RabbitMQ log for clues about why RabbitMQ closed the connection.

On an unrelated note, I'd also recommend avoiding the queueing consumer:
it cannot be recovered by automatic connection recovery and is fairly
tricky to recover manually.
--
MK

Staff Software Engineer, Pivotal/RabbitMQ


Skrzypek, Jonathan

May 15, 2015, 9:19:29 AM
to Michael Klishin, rabbitm...@googlegroups.com, myroman
What is the recommended way ?

Michael Klishin

May 15, 2015, 9:26:58 AM
to rabbitm...@googlegroups.com, myroman, Skrzypek, Jonathan
On 15 May 2015 at 16:19:28, Skrzypek, Jonathan (jonathan...@gs.com) wrote:
> What is the recommended way ?

Simply subclass DefaultBasicConsumer.

The queueing consumer had a reason to exist before we had consumer
work pools in the .NET client. Without it, synchronous operations such as basic.cancel
or queue.declare would deadlock. This is no longer the case in 3.5.x, and there are no
real reasons to use the queueing consumer, at least none worth the effort
of manual recovery.
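
As a rough illustration of that suggestion, a subclass that pushes deliveries into an Rx subject might look like the sketch below. It assumes the 3.5.x client, where `HandleBasicDeliver` receives the body as `byte[]`. `ObservableConsumer` and `Messages` are hypothetical names, and error and shutdown handling are left out.

```csharp
using System;
using System.Reactive.Subjects;
using System.Text;
using RabbitMQ.Client;

// Hypothetical push-based replacement for the QueueingBasicConsumer loop.
// Assumes the 3.5.x client API (message body delivered as byte[]).
public class ObservableConsumer : DefaultBasicConsumer
{
    private readonly Subject<string> subject = new Subject<string>();

    public ObservableConsumer(IModel model) : base(model)
    {
    }

    // Stream of decoded message bodies for subscribers.
    public IObservable<string> Messages
    {
        get { return subject; }
    }

    // Called by the client library for every delivery,
    // so no blocking Dequeue() loop is needed.
    public override void HandleBasicDeliver(
        string consumerTag,
        ulong deliveryTag,
        bool redelivered,
        string exchange,
        string routingKey,
        IBasicProperties properties,
        byte[] body)
    {
        subject.OnNext(Encoding.UTF8.GetString(body));
    }
}

// Possible usage inside a method such as GetReceiveSource:
//   var consumer = new ObservableConsumer(channel);
//   channel.BasicConsume(RoutingKey, true, consumer);   // noAck = true, as in the original code
//   return consumer.Messages;
```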

Roman Pavlushchenko

May 15, 2015, 9:30:58 AM
to Michael Klishin, rabbitm...@googlegroups.com, Skrzypek, Jonathan
In 'simple' log there is:
=ERROR REPORT==== 14-May-2015::14:52:07 ===
closing AMQP connection <0.8152.1> ([::1]:60057 -> [::1]:5672):
{heartbeat_timeout,running}

In sasl log:

=PROGRESS REPORT==== 14-May-2015::16:09:33 ===
          supervisor: {<0.11440.1>,rabbit_connection_sup}
             started: [{pid,<0.11441.1>},
                       {name,helper_sup},
                       {mfargs,{rabbit_connection_helper_sup,start_link,[]}},
                       {restart_type,intrinsic},
                       {shutdown,infinity},
                       {child_type,supervisor}]

What do these entries tell us?
Thanks

Michael Klishin

May 15, 2015, 9:34:15 AM
to Roman Pavlushchenko, rabbitm...@googlegroups.com
On 15 May 2015 at 16:30:56, Roman Pavlushchenko (roman.pav...@gmail.com) wrote:
> In 'simple' log there is:
> =ERROR REPORT==== 14-May-2015::14:52:07 ===
> closing AMQP connection <0.8152.1> ([::1]:60057 -> [::1]:5672):
> {heartbeat_timeout,running}

It means the client was considered dead by the server because it didn't send any
heartbeats in the expected time frame.

Are you using a 3.5.x .NET client before 3.5.2? If so, either upgrade to 3.5.2 or
downgrade to 3.4.4 (no real reason to downgrade if you ask me). This is a known
bug in the heartbeat implementation in the client:
https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/68
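
For reference, a minimal sketch of where the heartbeat and recovery settings go, assuming the 3.x .NET client, where `RequestedHeartbeat` is a `ushort` number of seconds. `RabbitConfig`, `CreateFactory`, and the 30-second value are illustrative only and do not come from this thread.

```csharp
using RabbitMQ.Client;

// Hypothetical helper that builds a factory with both settings applied
// before any connection is created.
public static class RabbitConfig
{
    public static ConnectionFactory CreateFactory()
    {
        return new ConnectionFactory
        {
            HostName = "localhost",
            // Requested heartbeat interval in seconds (illustrative value).
            RequestedHeartbeat = 30,
            // Only takes effect for connections created after it is set.
            AutomaticRecoveryEnabled = true
        };
    }
}

// Possible usage (requires a running broker):
//   using (var connection = RabbitConfig.CreateFactory().CreateConnection())
//   {
//       // connection.Heartbeat holds the value negotiated with the server.
//   }
```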

Roman Pavlushchenko

May 15, 2015, 9:41:00 AM
to Michael Klishin, rabbitm...@googlegroups.com
Thank you.
We use 4.5.1. I read about the differences between DefaultBasicConsumer and QueueingBasicConsumer, so my question is why the client didn't send any heartbeats? We use the default 580-second timeout. Could it be because the client got blocked? Or because we didn't send an ACK? (From QueueingBasicConsumer: "Note that messages taken from the SharedQueue may need acknowledging with IModel.BasicAck".)

Roman

Michael Klishin

May 15, 2015, 9:44:35 AM
to Roman Pavlushchenko, rabbitm...@googlegroups.com
On 15 May 2015 at 16:40:59, Roman Pavlushchenko (roman.pav...@gmail.com) wrote:
> my question is why the client didn't send any heartbeats?

See my earlier response. It is a known bug in 3.5.0 and 3.5.1 in the .NET client.
There's a link to the GitHub issue as well. 

Michael Klishin

May 15, 2015, 9:45:18 AM
to Roman Pavlushchenko, rabbitm...@googlegroups.com
On 15 May 2015 at 16:40:59, Roman Pavlushchenko (roman.pav...@gmail.com) wrote:
> Could it be because the client got blocked?

No, when a resource-driven alarm is in effect, heartbeat monitoring is disabled
on the server end.