RabbitMQ asynchronous client stops dequeuing - any input on this client?

354 views
Skip to first unread message

Hass

unread,
Sep 25, 2015, 4:35:20 PM9/25/15
to rabbitmq-users
Below is my RabbitMQ client code. It runs great most of the time, but sometimes it just stops dequeuing without reporting any errors in the logs. I think the dequeuing thread dies silently.

Any help is appreciated.


using System;
using System.IO;
using System.Runtime.Serialization;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Util;
using System.Configuration;

namespace RabbitMQClient
{
  public class MessageQueueConsumer : IHealthVerifiable
  {
    public class TimeoutException : Exception { }

     private class BufferQueue : SharedQueue<BasicDeliverEventArgs>
    {
      public int Count()
      {
        return this.m_queue.Count;
      }
    }

    private const int DEFAULT_ACK_COUNT = 1000;

    private String connString;
    private EventingBasicConsumer consumer;
    private IConnection conn;
    private IModel channel;
    private String queueName;
    private BufferQueue buffer;
    private Object locker = new Object();
    private ushort prefetchCount;
    private ushort ackCount;
    private bool stopRequested;
    private BasicDeliverEventArgs lastMsgDequeued;
   
    public MessageQueueConsumer(String queueName, String connString, ushort? ackCount = null)
    {
      this.queueName = queueName;
      this.connString = connString;
      if (ackCount != null)
        this.ackCount = ackCount.Value;
      else
      {
        if (!string.IsNullOrEmpty(ConfigurationManager.AppSettings["DefultAckCount"]))
          this.ackCount = ushort.Parse(ConfigurationManager.AppSettings["DefultAckCount"].ToString());
        else
          ackCount = DEFAULT_ACK_COUNT;
      }
      this.prefetchCount = (ushort)(this.ackCount * 2);
      stopRequested = false;
      InitConsumer();
    }

    ~MessageQueueConsumer()
    {
      Close();
    }

    public void StopRequested()
    {
      stopRequested = true;
    }

    public void Close()
    {
     }

    private void InitConsumer()
    {
      ConnectionFactory factory = new ConnectionFactory();
      factory.Uri = connString;
      conn = factory.CreateConnection();
      channel = conn.CreateModel();
      channel.BasicQos(0, prefetchCount, false);
      buffer = new BufferQueue();

      consumer = new EventingBasicConsumer(channel);
      channel.BasicConsume(queueName, false, consumer);

      // when message is recieved do following
      consumer.Received += (model, message) =>
      {
        //if reached max buffer size sleep for 3 seconds to give dequeing thread some time to catch up
        if (buffer.Count() > DEFAULT_ACK_COUNT)
          Thread.Sleep(3000);

        buffer.Enqueue(message);

        //reached max buffer ack messages in queue
        if (buffer.Count() > DEFAULT_ACK_COUNT)
          channel.BasicAck(message.DeliveryTag, true);
      };
    }

    /// <summary>
    /// Get the next event from the queue
    /// </summary>
    /// <returns>Event</returns>
    public byte[] Dequeue(int? timeout = null)
    {
      lock (locker)
      {
        try
        {
          return AttemptDequeue(timeout);
        }
        catch (EndOfStreamException)
        {
          // Network interruption while reading the input stream
          InitConsumer();
          return AttemptDequeue(timeout);
        }
        catch (OperationInterruptedException)
        {
          // The consumer was removed, either through channel or connection closure, or through the
          // action of IModel.BasicCancel().
          // Attempt to reopen and try again
          InitConsumer();
          return AttemptDequeue(timeout);
        }
        catch (ConnectFailureException)
        {
          //Problems connecting to the queue, wait 10sec, then try again. 
          Thread.Sleep(10000);
          InitConsumer();
          return AttemptDequeue(timeout);
        }
      }
    }

    private byte[] AttemptDequeue(int? tomeout)
    {
      BasicDeliverEventArgs message;

      while (true)
      {
        if (stopRequested)
        {
          //Ack last message in buffer and send that
          if (lastMsgDequeued != null)
          {
            channel.BasicAck(lastMsgDequeued.DeliveryTag, true);
            lastMsgDequeued = null;
            return lastMsgDequeued.Body;
          }
        }
        else //while buffer has no events
        {
          if (buffer.Count() == 0)
          {
            if (lastMsgDequeued != null)
            {
              channel.BasicAck(lastMsgDequeued.DeliveryTag, true);
              lastMsgDequeued = null;
            }
            Thread.Sleep(3000);
          }
          else
          {
            message = buffer.Dequeue();
            lastMsgDequeued = message;
            break;
          }
        }
      }

      try
      {
        return message.Body;
      }
      catch (Exception e)
      {
        throw new SerializationException("Error deserializing queued message:", e);
      }
    }

    /// <summary>
    /// Attempt to connect to queue to see if it is available
    /// </summary>
    /// <returns>true if queue is available</returns>
    public bool IsHealthy()
    {
      try
      {
        if (channel.IsOpen)
          return true;
        else
        {
          InitConsumer();
          return true;
        }
      }
      catch
      {
        return false;
      }
    }
  }
}



Michael Klishin

unread,
Sep 25, 2015, 4:53:35 PM9/25/15
to rabbitm...@googlegroups.com, Hass
On 25 Sep 2015 at 23:35:25, Hass (jamin...@gmail.com) wrote:
> Below is my rabbit mq client code. It runs great most of the time.
> But sometimes it just stops dequeuing without reporting any
> errors in the logs. I am thinking the dequeuing thread dies silently.

Please check RabbitMQ logs first. Most likely it’s an unhandled exception in the consumer
or a channel-level error (for which there will be entries in the server log).

Adding channel shutdown event handlers is also a good idea. 
--
MK

Staff Software Engineer, Pivotal/RabbitMQ


Hass

unread,
Sep 28, 2015, 11:54:38 AM9/28/15
to rabbitmq-users, jamin...@gmail.com
Thanks for your reply. Do you mean the channel.ModelShutdown or the EventingBasicConsumer.Shutdown event?

Michael Klishin

unread,
Sep 28, 2015, 2:52:57 PM9/28/15
to rabbitm...@googlegroups.com, Hass, jamin...@gmail.com
On 28 Sep 2015 at 23:54:41, Hass (jamin...@gmail.com) wrote:
> Thanks for you reply, Do you mean channel.ModelShutdown or
> EventingBasicConsumer.Shutdown event?

Both. They are parts of the same sequence of events dispatched during connection/channel (model)
closure, normal or abnormal. So both can be useful but also will be invoked under
the same circumstances in most cases. 

Hass

unread,
Sep 28, 2015, 3:01:18 PM9/28/15
to rabbitmq-users, jamin...@gmail.com
Cool,
So I added following changes

// When the consumer reports a shutdown, re-run InitConsumer() to set everything up again.
consumer.Shutdown += (o, e) =>
      {
        InitConsumer();
      };

And also set factory.AutomaticRecoveryEnabled = true;

Michael Klishin

unread,
Sep 28, 2015, 3:31:22 PM9/28/15
to rabbitm...@googlegroups.com, Hass
On 29 Sep 2015 at 03:01:21, Hass (jamin...@gmail.com) wrote:
> So I added following changes
>
> consumer.Shutdown += (o, e) =>
> {
> InitConsumer();
> };
>
> And also set factory.AutomaticRecoveryEnabled = true;

and that helped your case?

By the way, initialising consumers on their shutdown *may* be a bit
premature, in the sense that it creates a natural race condition between
your code and library-driven recovery.

There should be recovery event handlers at least on channels,
I will check whether .NET client has them and report back.

Hass

unread,
Sep 30, 2015, 10:07:27 AM9/30/15
to rabbitmq-users, jamin...@gmail.com
So I deployed code which has this. Since we have a lot of load on the RMQ server, I see client connections piling up to 9-10k, causing us to exceed the OS limits on connections and file handles. This causes the RMQ service to crash and not recover.


// Creates a connection, channel, buffer and consumer, starts consuming from queueName,
// and registers a Shutdown handler that may call InitConsumer() again.
// NOTE(review): nothing visible here closes the previous conn/channel before the fields
// are overwritten, so each call appears to open an additional connection - confirm.
private void InitConsumer()
    {
      ConnectionFactory factory = new ConnectionFactory();
      factory.Uri = connString;
      // Enable the client library's automatic connection recovery
      // (author's note: the default retry interval is 5 seconds).
      factory.AutomaticRecoveryEnabled = true;
      conn = factory.CreateConnection();
      channel = conn.CreateModel();
      channel.BasicQos(0, prefetchCount, false);
      buffer = new BufferQueue();

      consumer = new EventingBasicConsumer(channel);
      channel.BasicConsume(queueName, false, consumer);

      // If the consumer shuts down, wait, and then reconnect to RabbitMQ and begin reading
      // from the queue again - but only if the consumer is still not running after the wait
      // (i.e. the library's background recovery did not bring it back).
      // NOTE(review): the original comment says "5 seconds", but Sleep(50000) waits 50 seconds - confirm intent.
      consumer.Shutdown += (o, e) =>
      {
        Thread.Sleep(50000);
        if (!consumer.IsRunning)
          InitConsumer();
      };

So I am removing my consumer.Shutdown code completely to see whether that will help, and hoping that factory.AutomaticRecoveryEnabled = true; will recover the dequeue client after a network problem.

Hass

unread,
Sep 30, 2015, 10:29:29 AM9/30/15
to rabbitmq-users, jamin...@gmail.com
In the above, I am assuming that the increase in the number of connections was due to factory.AutomaticRecoveryEnabled = true; opening new connections when old ones drop off, and to my custom handling of consumer.Shutdown also opening unnecessary connections.
Reply all
Reply to author
Forward
0 new messages