using System.Collections.Generic;
using System.Text;
using System.Threading;
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace MPA.RabbitMQ.Client
{
  class ReceiveLogsTopic
  {
    Â
    public static void Main(string[] args)
    {
      EventingBasicConsumer consumer=null;
      IModel channel=null;
      IConnection connection=null;
      var factory = new ConnectionFactory() {Â
        VirtualHost="/",
        UserName="USERNAME",
        Password="PASSWORD"Â
        };
      factory.AutomaticRecoveryEnabled = true;
      factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
      EndPointResolver EPResolver=new EndPointResolver();
      EventingBasicConsumer createConsumer() {
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>{
          string consumerTag=ea.ConsumerTag;
          ulong deliveryTag=ea.DeliveryTag;
          var resend=ea.Redelivered;
          var message = Encoding.UTF8.GetString(ea.Body.ToArray());
          var routingKey = ea.RoutingKey;
          Â
          Console.WriteLine(" [x] Received '{0}':'{1}':tag {2},resent:{3}", routingKey, message,deliveryTag,resend);
          if(deliveryTag==4) {
            Thread.Sleep(2000);
            channel.BasicAck(999, false);Â
          }
          else {
            Thread.Sleep(1000);
          channel.BasicAck(deliveryTag, false);Â
          }
          Â
        };
        consumer.Shutdown += (model, ea) =>{
          var initiator = ea.Initiator;
          var reasonCode = ea.ReplyCode;
          var reasonText = ea.ReplyText;
          Console.WriteLine("Consumer shutdown event by '{0}' with reasonCode='{1}' and reasonText={2}", initiator, reasonCode,reasonText);
          //recreate consumer
          //RecoverFromConsumerShutdown();
          //consumer = new EventingBasicConsumer(channel);
          Â
        };
        consumer.ConsumerCancelled+= (model, ea) =>{
          var initiator = "Broker";
          var ConsumerTag = ea.ConsumerTags;
          Â
          Console.WriteLine("ConsumerCancelled event by '{0}' with consumerTag='{1}' ", initiator, ConsumerTag);
          //recreate consumer
          //RecoverFromConsumerShutdown();
          //consumer = new EventingBasicConsumer(channel);
          Â
        };
        return consumer;
      };
      IModel createChannel() {
        IModel channel_=null;
        try{
          channel_ = connection.CreateModel();
          channel_.BasicQos(0,1, false); // Per consumer limit
          Console.WriteLine($"Channel created {channel_.ChannelNumber}");
          var sequenceNumber = channel_.NextPublishSeqNo;
          // declare event handlers
          channel_.ModelShutdown += (sender,ea) => {
            Console.WriteLine($"Channel closed (ReplyCode) {ea.ReplyCode} ");
            Console.WriteLine($"Channel closed (ReplyText) {ea.ReplyText} ");
            Console.WriteLine($"Channel closed (Cause) {ea.Cause} ");
            Console.WriteLine($"Channel closed (Initiator) {ea.Initiator} ");Â
            // Handle Reconnect
            //channel_.Dispose();
            Thread.Sleep(5000);
            Initialize();
                 Â
          };
        }
        catch(Exception e){
          Console.WriteLine(" Error When creating Channel");
          Console.WriteLine($" Error {e.Source}");
          Console.WriteLine($" Error {e.Message}");
          Environment.Exit(123);
        }
        return channel_;
      }
      Â
      void Initialize()Â
      {
        tryÂ
        {
          if(connection==null || !connection.IsOpen)
          {
            connection = factory.CreateConnection(EPResolver,".NetTemplateClientConsumer");
            // declare event handlers
            connection.ConnectionBlocked += (sender,ea) => {
              Console.WriteLine("Connection is temporary blocked");
            };
            connection.ConnectionUnblocked += (sender,ea) => {
              Console.WriteLine("Connection is unblocked again");
            };
            connection.ConnectionShutdown+= (sender,ea) => {
              Console.WriteLine("Connection is disconnected");
            };
          }  Â
        }
        catch (Exception e){
          Console.WriteLine(" Error When creating Connection");
          Console.WriteLine($" Error {e.Source}");
          Console.WriteLine($" Error {e.Message}");
        }
        if(channel==null ){
          try
          {
            channel=createChannel();
          }
          catch(Exception e){
            Console.WriteLine(" Error When creating channel");
          Console.WriteLine($" Error {e.Source}");
          Console.WriteLine($" Error {e.Message}");
          }
        }
        if(channel.IsClosed){
          channel.Close();
          channel.Dispose();
          Â
          try
          {
            channel = connection.CreateModel();
            //channel=createChannel();
          }
          catch(Exception e){
            Console.WriteLine(" Error When creating channel");
          Console.WriteLine($" Error {e.Source}");
          Console.WriteLine($" Error {e.Message}");
          Console.WriteLine($" stacktrace {e.StackTrace}");
          }
        }
         Â
        var queueName = "STUP";
        Â
        Â
        if(consumer==null || !consumer.IsRunning)Â
          consumer=createConsumer();
        try
        {
          channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
          Console.WriteLine($" Connected server is {connection.Endpoint}");
          Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
          Console.WriteLine(" Press [enter] to exit.");
          Console.ReadLine();
        }
        catch (Exception e)
        {
          Console.WriteLine(" Error When setting up consumer");
          Console.WriteLine($" Error {e.Source}");
          Console.WriteLine($" Error {e.Message}");
          // Handle the failed message in the most convenient way for your system
          // Handle reconnection since channel is always destroyed on error
          //channel = createChannel();
          //Console.WriteLine(" Channel recreated");
        }Â
      } //end Initialize
      Initialize();
    }
  }
}