TimeoutException from .NET client on connection.CreateModel when recreating channel after ModelShutdown event


Stefan Östman

Jun 11, 2020, 5:26:42 AM
to rabbitmq-users
Hi, I am trying to build a consumer that can recover after a ModelShutdown event.

I am using .NET client 6.0 on a Windows 10 computer.

The server runs on Windows Server 2016:
RabbitMQ version 3.8.3
Erlang 22.3

I have built a test client in which I try to recover from most situations I can think of.

The connection, channel and consumer are created correctly on initialization.
The consumer reads and correctly acknowledges 4 messages, but on the fifth I make it send an invalid delivery tag to force a ModelShutdown event.

But...

When I call connection.CreateModel() in the event handler for ModelShutdown, I get a timeout exception with this stack trace:
   at RabbitMQ.Util.BlockingCell`1.WaitForValue(TimeSpan timeout)
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
   at RabbitMQ.Client.Framing.Impl.Model._Private_ChannelOpen(String outOfBand)
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateNonRecoveringModel()
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateModel()
   at MPA.RabbitMQ.Client.ReceiveLogsTopic.<>c__DisplayClass0_0.<Main>g__Initialize|4() in C:\C#Apps\RabbitMQ_Consumer\Consumer.cs:line 157

I have tried to understand what the .NET client code does, but it is beyond me.
From what I understand, it is waiting somewhere for some value, but that is as far as I get.

I cannot see any way to get around this in my code.
It would have been so much easier if it were possible to just reopen the existing channel.

With high hopes that you can help me.

Regards
Stefan 

This is my code.



EndPointResolver.cs
using RabbitMQ.Client;
using System.Collections.Generic;

namespace MPA.RabbitMQ.Client
{
    public class EndPointResolver : IEndpointResolver
    {
        public IEnumerable<AmqpTcpEndpoint> All()
        {
            return new List<AmqpTcpEndpoint>()
            {
                new AmqpTcpEndpoint()
                {
                    HostName = "UTVRABBIT02",
                    Port=5672
                },
                new AmqpTcpEndpoint()
                {
                    HostName = "UTVRABBIT01",
                    Port=5672
                }
            };
        }
    }
}


consumer.cs


using System.Collections.Generic;
using System.Text;
using System.Threading;
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;


namespace MPA.RabbitMQ.Client
{
    class ReceiveLogsTopic
    {
        
        public static void Main(string[] args)
        {
            EventingBasicConsumer consumer=null;
            IModel channel=null;
            IConnection connection=null;
            var factory = new ConnectionFactory() { 
                VirtualHost="/",
                UserName="USERNAME",
                Password="PASSWORD" 
                };
            factory.AutomaticRecoveryEnabled = true;
            factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
            EndPointResolver EPResolver=new EndPointResolver();


            EventingBasicConsumer createConsumer() {
                var consumer = new EventingBasicConsumer(channel);

                consumer.Received += (model, ea) =>{
                    string consumerTag=ea.ConsumerTag;
                    ulong deliveryTag=ea.DeliveryTag;
                    var resend=ea.Redelivered;
                    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                    var routingKey = ea.RoutingKey;
                    
                    Console.WriteLine(" [x] Received '{0}':'{1}':tag {2},resent:{3}", routingKey, message,deliveryTag,resend);
                    if(deliveryTag==4) {
                        Thread.Sleep(2000);
                        channel.BasicAck(999, false); 
                    }
                    else {
                        Thread.Sleep(1000);
                        channel.BasicAck(deliveryTag, false); 
                    }
                    
                };
                consumer.Shutdown += (model, ea) =>{
                    var initiator = ea.Initiator;
                    var reasonCode = ea.ReplyCode;
                    var reasonText = ea.ReplyText;
                    Console.WriteLine("Consumer shutdown event by '{0}' with reasonCode='{1}' and reasonText={2}", initiator, reasonCode,reasonText);
                    //recreate consumer
                    //RecoverFromConsumerShutdown();
                    //consumer = new EventingBasicConsumer(channel);
                    
                };
                consumer.ConsumerCancelled+= (model, ea) =>{
                    var initiator = "Broker";
                    var ConsumerTag = ea.ConsumerTags;
                    
                    Console.WriteLine("ConsumerCancelled event by '{0}' with consumerTag='{1}' ", initiator, ConsumerTag);
                    //recreate consumer
                    //RecoverFromConsumerShutdown();
                    //consumer = new EventingBasicConsumer(channel);
                    
                };

                return consumer;
            };

            IModel createChannel()  {
                IModel channel_=null;
                try{
                    channel_ = connection.CreateModel();
                    channel_.BasicQos(0,1, false); // Per consumer limit
                    Console.WriteLine($"Channel created {channel_.ChannelNumber}");
                    var sequenceNumber = channel_.NextPublishSeqNo;
                    // declare event handlers
                    channel_.ModelShutdown += (sender,ea) => {
                        Console.WriteLine($"Channel closed (ReplyCode) {ea.ReplyCode} ");
                        Console.WriteLine($"Channel closed (ReplyText) {ea.ReplyText} ");
                        Console.WriteLine($"Channel closed (Cause) {ea.Cause} ");
                        Console.WriteLine($"Channel closed (Initiator) {ea.Initiator} "); 
                        // Handle Reconnect
                        //channel_.Dispose();
                        Thread.Sleep(5000);
                        Initialize();
                                   
                    };
                }
                catch(Exception e){
                    Console.WriteLine(" Error When creating Channel");
                    Console.WriteLine($" Error {e.Source}");
                    Console.WriteLine($" Error {e.Message}");
                    Environment.Exit(123);
                }
                return channel_;
            }

            

            void Initialize() 
            {
                try 
                {
                    if(connection==null || !connection.IsOpen)
                    {
                        connection = factory.CreateConnection(EPResolver,".NetTemplateClientConsumer");
                        // declare event handlers
                        connection.ConnectionBlocked += (sender,ea) => {
                            Console.WriteLine("Connection is temporary blocked");
                        };
                        connection.ConnectionUnblocked += (sender,ea) => {
                            Console.WriteLine("Connection is unblocked again");
                        };
                        connection.ConnectionShutdown+= (sender,ea) => {
                            Console.WriteLine("Connection is disconnected");
                        };
                    }    
                }
                catch (Exception e){
                    Console.WriteLine(" Error When creating Connection");
                    Console.WriteLine($" Error {e.Source}");
                    Console.WriteLine($" Error {e.Message}");
                }
                if(channel==null ){
                    try
                    {
                        channel=createChannel();
                    }
                    catch(Exception e){
                        Console.WriteLine(" Error When creating channel");
                        Console.WriteLine($" Error {e.Source}");
                        Console.WriteLine($" Error {e.Message}");
                    }
                }
                if(channel.IsClosed){
                    channel.Close();
                    channel.Dispose();
                    
                    try
                    {
                        channel = connection.CreateModel();
                        //channel=createChannel();
                    }
                    catch(Exception e){
                        Console.WriteLine(" Error When creating channel");
                        Console.WriteLine($" Error {e.Source}");
                        Console.WriteLine($" Error {e.Message}");
                        Console.WriteLine($" stacktrace {e.StackTrace}");
                    }
                }
                   
                var queueName = "STUP";
                
                
                if(consumer==null || !consumer.IsRunning) 
                    consumer=createConsumer();
                try
                {
                    channel.BasicConsume(queue: queueName, autoAck: false,  consumer: consumer);
                    Console.WriteLine($" Connected server is {connection.Endpoint}");
                    Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
                catch (Exception e)
                {
                    Console.WriteLine(" Error When setting up consumer");
                    Console.WriteLine($" Error {e.Source}");
                    Console.WriteLine($" Error {e.Message}");
                    // Handle the failed message in the most convenient way for your system

                    // Handle reconnection since channel is always destroyed on error
                    //channel = createChannel();
                    //Console.WriteLine(" Channel recreated");
                } 
            } //end Initialize

            Initialize();
        }
    }
}



Michael Klishin

Jun 11, 2020, 8:07:03 AM
to rabbitmq-users
See server logs and a traffic capture for clues. We strongly prefer to minimize guessing on this list.

It's not a given that a channel (IModel) shutdown callback is a safe place to open another channel in;
it's certainly not a common scenario to see. But please start with inspecting server logs.
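
One way to avoid opening a channel inside the shutdown callback is to return from the callback immediately and do the blocking work on another thread. The sketch below assumes (this thread does not confirm it) that the callback runs on a thread the client also needs in order to read the broker's reply, so a synchronous CreateModel() call made there could wait until it times out. ChannelReopener, its members and the one-second delay are hypothetical names and values chosen for illustration; only the RabbitMQ.Client calls already used in the original code are relied on.

```csharp
using System;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Hypothetical helper for illustration; not part of RabbitMQ.Client.
sealed class ChannelReopener
{
    private readonly IConnection _connection;
    private readonly string _queueName;
    private readonly object _sync = new object();

    public ChannelReopener(IConnection connection, string queueName)
    {
        _connection = connection;
        _queueName = queueName;
    }

    // Opens a channel, registers the shutdown handler and starts consuming.
    public void Start()
    {
        lock (_sync)
        {
            var channel = _connection.CreateModel();
            channel.BasicQos(0, 1, false);
            channel.ModelShutdown += OnModelShutdown;

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, ea) =>
            {
                // Ack on the same channel that delivered the message.
                channel.BasicAck(ea.DeliveryTag, false);
            };
            channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);
        }
    }

    private void OnModelShutdown(object sender, ShutdownEventArgs ea)
    {
        Console.WriteLine($"Channel closed: {ea.ReplyCode} {ea.ReplyText}");

        // Do not block here: hand the reopen off to a thread-pool task and return.
        Task.Run(async () =>
        {
            await Task.Delay(TimeSpan.FromSeconds(1)); // assumed back-off, tune as needed
            if (!_connection.IsOpen)
            {
                return; // connection-level failure: leave it to connection recovery
            }
            try
            {
                Start();
            }
            catch (Exception e)
            {
                Console.WriteLine($"Channel recreation failed: {e.Message}");
            }
        });
    }
}
```

Usage would be `new ChannelReopener(connection, "STUP").Start();` after the connection has been created. Whether this resolves the timeout in 6.0 is untested here; it only removes the blocking call from the callback.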


Stefan Östman

Jun 11, 2020, 9:15:14 AM
to rabbitmq-users
Hi

On the server side everything looks as expected. The new channel is created. The timeout exception occurs locally in the .NET client after the channel has been created on the server, somewhere around
   at RabbitMQ.Util.BlockingCell`1.WaitForValue(TimeSpan timeout)
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
judging by the stack trace. But since I don't know the actual reason, it is hard to say.


I have logged the events using the rabbitmq_event_exchange plugin and got the events below between the first connect and the timeout error:
Message 1
The server reported 8 messages remaining.

Exchange amq.rabbitmq.event
Routing Key user.authentication.success
Redelivered ○
Properties
timestamp: 1591878677
delivery_mode: 2
headers:
auth_mechanism: PLAIN
connection_type: network
host: {192,168,5,28}
name: Consumer
peer_host: {10,14,32,195}
peer_port: 49423
protocol: {0,9,1}
ssl: false
timestamp_in_ms: 1591878677513
Payload
0 bytes
Encoding: string
Message 2
The server reported 7 messages remaining.

Exchange amq.rabbitmq.event
Routing Key connection.created
Redelivered ○
Properties
timestamp: 1591878677
delivery_mode: 2
headers:
auth_mechanism: PLAIN
channel_max: 2047
client_properties: {<<"product">>,longstr,<<"RabbitMQ">>}
{<<"version">>,longstr,<<"6.0.0+a50334f2acb09fd16dc9cbd20ad1c6dd093d1d64">>}
{<<"platform">>,longstr,<<".NET">>}
{<<"copyright">>,longstr,<<"Copyright (c) 2007-2020 VMware, Inc.">>}
{<<"information">>,longstr,<<"Licensed under the MPL. See https://www.rabbitmq.com/">>}
{<<"capabilities">>,table,[{<<"publisher_confirms">>,bool,true},{<<"exchange_exchange_bindings">>,bool,true},{<<"basic.nack">>,bool,true},{<<"consumer_cancel_notify">>,bool,true},{<<"connection.blocked">>,bool,true},{<<"authentication_failure_close">>,bool,true}]}
{<<"connection_name">>,longstr,<<".NetTemplateClientConsumer">>}
connected_at: 1591878677498
frame_max: 131072
host: {192,168,5,28}
node: rabbit@UTVRABBIT02
peer_cert_issuer:
peer_cert_subject:
peer_cert_validity:
peer_host: {10,14,32,195}
peer_port: 49423
pid: <rab...@UTVRABBIT02.2.4366.15>
port: 5672
protocol: {0,9,1}
ssl: false
ssl_cipher:
ssl_hash:
ssl_key_exchange:
ssl_protocol:
timeout: 60
timestamp_in_ms: 1591878677529
type: network
user: Consumer
user_provided_name: .NetTemplateClientConsumer
user_who_performed_action: Consumer
vhost: MPA
Payload
0 bytes
Encoding: string
Message 3
The server reported 6 messages remaining.

Exchange amq.rabbitmq.event
Routing Key channel.created
Redelivered ○
Properties
timestamp: 1591878691
delivery_mode: 2
headers:
connection: <rab...@UTVRABBIT02.2.4366.15>
number: 1
pid: <rab...@UTVRABBIT02.2.4435.15>
timestamp_in_ms: 1591878691837
user: Consumer
user_who_performed_action: Consumer
vhost: MPA
Payload
0 bytes
Encoding: string
Message 4
The server reported 5 messages remaining.

Between these two events, four messages were received and properly acked.
Then I sent a faulty ack, which caused the ModelShutdown event.
The channel and consumer were closed.


Exchange amq.rabbitmq.event
Routing Key channel.closed
Redelivered ○
Properties
timestamp: 1591878701
delivery_mode: 2
headers:
pid: <rab...@UTVRABBIT02.2.4435.15>
timestamp_in_ms: 1591878701327
user_who_performed_action: Consumer
Payload
0 bytes
Encoding: string
Message 5
The server reported 4 messages remaining.

Exchange amq.rabbitmq.event
Routing Key consumer.deleted
Redelivered ○
Properties
timestamp: 1591878701
delivery_mode: 2
headers:
channel: <rab...@UTVRABBIT02.2.4435.15>
consumer_tag: amq.ctag-JFude_nJCdRKWuKjxnmC4w
queue: STUP
timestamp_in_ms: 1591878701327
user_who_performed_action: rmq-internal
vhost: MPA
Payload
0 bytes
Encoding: string
Message 6
The server reported 3 messages remaining.

Then the reconnect runs and the new channel is created,
which is also visible in the admin GUI.

Exchange amq.rabbitmq.event
Routing Key channel.created
Redelivered ○
Properties
timestamp: 1591878706
delivery_mode: 2
headers:
connection: <rab...@UTVRABBIT02.2.4366.15>
number: 1
pid: <rab...@UTVRABBIT02.2.4497.15>
timestamp_in_ms: 1591878706347
user: Consumer
user_who_performed_action: Consumer
vhost: MPA
Payload
0 bytes
Encoding: string

The RabbitMQ log does not show much. I see that the connection is created and that it is not disconnected when I send the faulty ack:
2020-06-11 14:31:17.498 [info] <0.4366.15> accepting AMQP connection <0.4366.15> (10.14.32.195:49423 -> 192.168.5.28:5672)
2020-06-11 14:31:17.513 [info] <0.4366.15> Connection <0.4366.15> (10.14.32.195:49423 -> 192.168.5.28:5672) has a client-provided name: .NetTemplateClientConsumer
2020-06-11 14:31:17.529 [info] <0.4366.15> connection <0.4366.15> (10.14.32.195:49423 -> 192.168.5.28:5672 - .NetTemplateClientConsumer): user 'Consumer' authenticated and granted access to vhost 'MPA'
2020-06-11 14:31:41.312 [error] <0.4435.15> Channel error on connection <0.4366.15> (10.14.32.195:49423 -> 192.168.5.28:5672, vhost: 'MPA', user: 'Consumer'), channel 1:
operation basic.ack caused a channel exception precondition_failed: unknown delivery tag 999


I cannot see anything strange happening on the server side. The error occurs in the .NET client.

Where would you say is the best place to recreate the channel and reactivate the consumer? I understand that it differs from solution to solution and depends on what caused the ModelShutdown event. To keep a consumer active all the time, the recreation needs to start as soon as possible after the shutdown. I wait 5 seconds for NetworkRecovery; it would have to be longer in reality, but this is just to get an idea of where to do it.



Michael Klishin

Jun 11, 2020, 11:26:36 AM
to rabbitmq-users
So you only have a channel exception, as expected. The connection won't be closed. Channel errors are not considered to be fatal (unlike, say, protocol parser errors) by design.
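
To act on that distinction in a ModelShutdown handler, it can help to check whether the connection is still open and why the channel was closed. The sketch below is a minimal illustration: ShutdownTriage and its method names are hypothetical, and 406 is the AMQP 0-9-1 reply code for precondition-failed, which matches the "unknown delivery tag" error in the server log.

```csharp
using RabbitMQ.Client;

// Hypothetical helper for illustration; not part of RabbitMQ.Client.
static class ShutdownTriage
{
    // AMQP 0-9-1 reply code for precondition-failed (e.g. unknown delivery tag).
    private const ushort PreconditionFailed = 406;

    // True when the broker closed the channel while the connection is still open,
    // so a new channel can be opened on the same connection.
    public static bool IsChannelLevel(IConnection connection, ShutdownEventArgs ea)
    {
        return connection.IsOpen && ea.Initiator == ShutdownInitiator.Peer;
    }

    // True when the channel was closed because of a failed precondition.
    public static bool IsPreconditionFailure(ShutdownEventArgs ea)
    {
        return ea.ReplyCode == PreconditionFailed;
    }
}
```

A handler could log both results and only attempt to open a new channel when IsChannelLevel returns true, leaving connection-level failures to automatic recovery.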

I'm pretty sure I have never tried to open a channel from a channel shutdown event handler.
You are welcome to try it with 6.1.0 😉

Since channel error notification is entirely asynchronous, I'm not sure what could cause a conflict.
Channel ID map in connection state is the only piece of state both channel termination and channel opening code would share.

Tracing/extra logging and code inspection would tell.


Sri Reddy

Jun 18, 2020, 10:32:47 AM
to rabbitmq-users
Hey Stefan,

I am having exactly the same issue. The channel shutdown event fires, the event handler calls the method that creates the channel, and when I try to recreate the channel on the existing connection it times out. Did you find a solution to this problem? I would appreciate it if you could share your findings.

I also don't see anything unusual in the log files, and I have the same versions of Erlang, RabbitMQ server and client:
Server version 3.8.3
Erlang 22.3
Client: 6.0.0

Please help.