Similar to other questions here, we get unexpected connection closures during the course of a day.
Here is the information I have:
- Client Version: <PackageReference Include="RabbitMQ.Client" Version="6.2.4" />
- Server Version: RabbitMQ: v3.10.5, Erlang: 24.3.4.1, Linux: Ubuntu 20.04
- Architecture / Topology:
- We have a single self-hosted AWS RabbitMQ instance, an AWS load balancer, and a minimum of 8 servers connecting. Each server has two RabbitMQ connections (producer and consumer) with many producers and consumers sharing those two connections. Most of the channels are long-running, but some only live for a brief period each day.
Common log entries on the server; they come in pairs for each server running at that moment, and it appears to be all of the running servers:
client unexpectedly closed TCP connection
The clients then seem to get End of Stream errors:
Failed to publish 2 - Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=0, text='End of stream', classId=0, methodId=0, cause=System.IO.EndOfStreamException: Reached the end of the stream. Possible authentication failure.

We have now changed the following:
- Removed the load balancer and now connect to the RabbitMQ server directly
- Changed the connection settings to: NetworkRecoveryInterval = TimeSpan.FromSeconds(1);// Reduced from 5 to 1
- Added code to the publish logic that will try to recreate a model/channel (see below)
Questions:
- Do we also need to cater manually for disconnect/reconnect?
- If so, is the code snippet below correct/good practice?
- If not, would it be better to queue messages in memory and publish them when a reconnect event occurs? (I don't see ANY log entries for _model.BasicRecoverOk.)
Thanks in advance
// Connection
new ConnectionFactory
{
    HostName = hostName,
    DispatchConsumersAsync = true,
    AutomaticRecoveryEnabled = true,
    TopologyRecoveryEnabled = true,
    RequestedHeartbeat = TimeSpan.FromSeconds(30), //TODO From Settings
    NetworkRecoveryInterval = TimeSpan.FromSeconds(1) // Reduced from the 5s default //TODO From Settings
};
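One thing worth noting for the last question above: _model.BasicRecoverOk fires in response to an explicit basic.recover call, not to the client library's automatic connection recovery, so silence there is expected. A minimal sketch (assuming RabbitMQ.Client 6.x, where the auto-recovering connection implements IAutorecoveringConnection) of hooking the connection-level recovery events instead, so recovery becomes visible in the logs:

```csharp
using System;
using RabbitMQ.Client;

// Sketch: make automatic recovery observable via connection-level events.
var factory = new ConnectionFactory { HostName = hostName, AutomaticRecoveryEnabled = true };
var connection = factory.CreateConnection();

if (connection is IAutorecoveringConnection recovering)
{
    recovering.RecoverySucceeded += (sender, args) =>
        Console.WriteLine("Connection recovery succeeded");
    recovering.ConnectionRecoveryError += (sender, args) =>
        Console.WriteLine($"Connection recovery failed: {args.Exception.Message}");
}
connection.ConnectionShutdown += (sender, args) =>
    Console.WriteLine($"Connection shut down: {args.ReplyText}");
```
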
// Publish logic snippet
// Ensure that the channel is not used simultaneously by different threads
lock (_locker)
{
    try
    {
        ..........
        _verifyAndRetryOrThrowChannelConnection(_model);
        _model.BasicPublish(
----------------------------------
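For context, since the publish call above is elided: with publisher confirms enabled, broken connections surface as exceptions at publish time rather than silently lost messages. A sketch, assuming a hypothetical exchange and routing key (not the original code):

```csharp
using System;
using RabbitMQ.Client;

// Sketch: publishing with publisher confirms (assumed setup, not the original code).
void PublishWithConfirms(IModel model, string exchange, string routingKey, byte[] body)
{
    model.ConfirmSelect(); // idempotent; typically called once after channel creation
    var props = model.CreateBasicProperties();
    props.Persistent = true; // survive a broker restart when the queue is durable
    model.BasicPublish(exchange, routingKey, props, body);
    // Throws (and closes the channel) if the broker nacks or the wait times out.
    model.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
}
```
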
// New wait/create
private void _verifyAndRetryOrThrowChannelConnection(IModel model)
{
    if (model.IsOpen)
        return;
    if (_disposed)
        throw new InvalidOperationException($"Attempting to publish on a disposed instance: {_getLogLine()}");
    _log.Warn($"Manually reconnecting the publisher model. {_getLogLine()}");
    var maxAttempts = 11; //TODO: Move to settings
    var attemptCount = 0;
    // NetworkRecoveryInterval is 1s.
    // Wait for just over two seconds before creating a new channel.
    while (attemptCount <= maxAttempts)
    {
        if (model.IsOpen)
            break; // Auto recovery worked
        if (_connectionsProvider.IsProducerConnected && attemptCount > 6)
            break; // Connection recovered but the channel did NOT
        attemptCount++;
        Thread.Sleep(200);
    }
    if (model.IsOpen)
        return; // Auto recovery worked
    _log.Warn($"Manually creating the publisher model. {_getLogLine()}");
    try
    {
        // Unhook event handlers from the dead model before replacing it
        model.CallbackException -= _model_CallbackException;
        model.BasicRecoverOk -= _model_BasicRecoverOk;
        model.ModelShutdown -= _model_ModelShutdown;
        if (_confirmPublish)
        {
            model.BasicAcks -= _model_BasicAcks;
            model.BasicNacks -= _model_BasicNacks;
        }
    }
    catch (Exception e)
    {
        _log.Error($"Attempted to unregister publisher model events. {e.Message}", e);
    }
    _createModel(_connectionsProvider.GetProducerConnectionProvider, _confirmPublish);
}
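To illustrate the in-memory queuing idea from the last question: the sketch below assumes a hypothetical publish delegate that throws while the connection is down, and a recovery callback (e.g. IAutorecoveringConnection.RecoverySucceeded) that calls FlushPending(). Unbounded buffering risks memory growth, so a real implementation would cap the queue and/or persist overflow.

```csharp
using System;
using System.Collections.Concurrent;

// Sketch: buffer messages that fail to publish and replay them after recovery.
public class BufferedPublisher
{
    private readonly ConcurrentQueue<byte[]> _pending = new ConcurrentQueue<byte[]>();
    private readonly Action<byte[]> _publish; // the real publish call (assumption)

    public BufferedPublisher(Action<byte[]> publish) => _publish = publish;

    public void Publish(byte[] body)
    {
        try
        {
            _publish(body);
        }
        catch (Exception)
        {
            _pending.Enqueue(body); // park the message until recovery
        }
    }

    // Call from the connection's recovery event handler.
    public void FlushPending()
    {
        while (_pending.TryDequeue(out var body))
            Publish(body);
    }
}
```
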