EventingBasicConsumer Received is holding onto Application event handler reference

P Meier

Apr 4, 2017, 5:13:56 PM
to rabbitmq-users
Hi,

I am using the EventingBasicConsumer and have a Subscribe and an Unsubscribe method that attach and detach the Received event handler, as seen below. I have a .NET Windows Forms application that calls Subscribe and can also call Unsubscribe. This works as expected. The problem I can't figure out is this: if the .NET application is closed, whether normally or unexpectedly, without Unsubscribe being called, the application stays alive as a background process and causes a memory leak. The only way to get rid of this background process is to End Task it or to restart the RabbitMQ service.

public void Subscribe(string topic, EventHandler eh)
{
    // previous code omitted for brevity

    channel.ExchangeDeclare(Exchange, Type);
    var queueName = channel.QueueDeclare().QueueName;
    channel.QueueBind(queueName, Exchange, topic);

    // Set up the consumer
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += ConsumerReceived;

    string consumerTag = GenerateConsumerTag(topic, eh);

    channel.BasicConsume(queue: queueName,
                         noAck: true,
                         consumerTag: consumerTag,
                         consumer: consumer);
}

public void Unsubscribe(string topic, EventHandler eh)
{
    subscriberInfo.channel.BasicCancel(
        subscriberInfo.consumerTag);
    subscriberInfo.channel.Close();
    subscriberInfo.consumer.Received -= ConsumerReceived;
}

private void ConsumerReceived(object model, BasicDeliverEventArgs ea)
{
    // do something with message
}

I have even written a WeakEventManager for this (not reflected in the code above), and the problem still exists. Has anyone else had this problem? Any suggestions on how to handle it?

Thanks,
P. Meier

Karl Nilsson

Apr 5, 2017, 4:47:51 AM
to rabbitm...@googlegroups.com
Hi,

Which version of the .NET client are you using?

Cheers
Karl

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Karl Nilsson

Staff Software Engineer, Pivotal/RabbitMQ

Pamela Meier

Apr 5, 2017, 7:45:48 AM
to rabbitm...@googlegroups.com
Hi Karl, 
I am using 4.6.1.
Thanks,
P. Meier



Michael Klishin

Apr 5, 2017, 7:55:45 AM
to rabbitm...@googlegroups.com
That must be the .NET Framework version. Which version of the RabbitMQ .NET client are you using?




--
MK

Staff Software Engineer, Pivotal/RabbitMQ

P Meier

Apr 5, 2017, 9:44:54 PM
to rabbitmq-users
I am using RabbitMQ.Client 3.6.5.

Michael Klishin

Apr 6, 2017, 12:06:37 AM
to rabbitm...@googlegroups.com
Have you tried 4.1.3, at least for comparison?


P Meier

Apr 6, 2017, 11:12:03 AM
to rabbitmq-users
I upgraded to 4.1.3 today and retested, and I get the same behavior. I tried both the approach I posted originally and the WeakEventManager I wrote, shown below. From my testing so far, unless .BasicCancel(consumerTag) and .Close() are called, the .NET test app is never released from memory without an End Task or a restart of the RabbitMQ service.

With my WeakEventManager, the methods change to the following:
public void Subscribe(string topic, EventHandler eh)
{
    // previous code omitted for brevity

    channel.ExchangeDeclare(Exchange, Type);
    var queueName = channel.QueueDeclare().QueueName;
    channel.QueueBind(queueName, Exchange, topic);

    // Set up the consumer
    var consumer = new EventingBasicConsumer(channel);
    WeakEventListener listener = new WeakEventListener(eh);
    SubscriptionEventManager.AddListener(consumer, listener);

    string consumerTag = GenerateConsumerTag(topic, eh);

    channel.BasicConsume(queue: queueName,
                         noAck: true,
                         consumerTag: consumerTag,
                         consumer: consumer);
}

public void Unsubscribe(string topic, EventHandler eh)
{
    subscriberInfo.channel.BasicCancel(
        subscriberInfo.consumerTag);
    subscriberInfo.channel.Close();
    SubscriptionEventManager.RemoveListener(subscriberInfo.consumer, listenerInfo);
}

// Needs: using System; using System.Text; using System.Windows;
//        using RabbitMQ.Client.Events;
public class SubscriptionEventManager : WeakEventManager
  {
    public static SubscriptionEventManager CurrentManager
    {
      get
      {
        var manager_type = typeof(SubscriptionEventManager);
        var manager = WeakEventManager.GetCurrentManager(manager_type) as SubscriptionEventManager;

        if (manager == null)
        {
          manager = new SubscriptionEventManager();
          WeakEventManager.SetCurrentManager(manager_type, manager);
        }

        return manager;
      }
    }

    public static void AddListener(object source, IWeakEventListener listener)
    {
      CurrentManager.ProtectedAddListener(source, listener);
    }

    public static void RemoveListener(object source, IWeakEventListener listener)
    {
      CurrentManager.ProtectedRemoveListener(source, listener);
    }

    protected override void StartListening(object source)
    {
      ((EventingBasicConsumer)source).Received 
        += new EventHandler<BasicDeliverEventArgs>(DeliverMessage);
    }

    protected override void StopListening(object source)
    {
      ((EventingBasicConsumer)source).Received 
        -= new EventHandler<BasicDeliverEventArgs>(DeliverMessage);
    }

    void DeliverMessage(object sender, BasicDeliverEventArgs e)
    {
      DeliverEvent(sender, e);
    }
  }

public sealed class WeakEventListener : IWeakEventListener
  {
    private readonly EventHandler _handler;

    public WeakEventListener(EventHandler handler)
    {
      _handler = handler;
    }

    bool IWeakEventListener.ReceiveWeakEvent(Type managerType, object sender, EventArgs e)
    {
      if (null == e)
        return false;

      CallHandler(sender, e);

      return true;
    }

    public void CallHandler(object sender, EventArgs e)
    {
      BasicDeliverEventArgs ea = (BasicDeliverEventArgs)e;
      var body = ea.Body;
      var message = Encoding.UTF8.GetString(body);

      _handler(message, e);
    }
  }

Karl Nilsson

Apr 6, 2017, 11:25:44 AM
to rabbitm...@googlegroups.com
I'm not convinced it is the event handler keeping your app open. It is more likely to be one of the background dispatcher or TCP reader threads. I can reproduce the same behavior in a simple console app by not disposing of or closing the connection at the end of the Main method.

Is there any way you can intercept the unexpected app closure and .Close() the RabbitMQ connection at that point?

Cheers
Karl
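
To illustrate how a thread that is not marked as a background thread can keep a .NET process alive, here is a minimal sketch that uses only System.Threading. It does not touch the RabbitMQ client: the worker merely stands in for a client I/O or dispatch thread, and the names ForegroundThreadSketch, StartWorker, Demo and "hypothetical-io-loop" are made up for this example. Whether the client's own threads behave this way in the versions discussed here is the hypothesis under discussion, not something this sketch proves.

```csharp
using System;
using System.Threading;

public static class ForegroundThreadSketch
{
    // Starts a worker that blocks until 'stop' is signalled. It stands in for
    // a library's I/O loop (an assumption made for illustration only).
    public static Thread StartWorker(ManualResetEventSlim stop, bool background)
    {
        var worker = new Thread(() => stop.Wait())
        {
            // false (the default) makes this a foreground thread: the process
            // cannot exit until the thread ends, even after Main has returned.
            IsBackground = background,
            Name = "hypothetical-io-loop"
        };
        worker.Start();
        return worker;
    }

    // Call this from Main to try the sketch.
    public static void Demo()
    {
        var stop = new ManualResetEventSlim(false);
        Thread worker = StartWorker(stop, background: false);
        Console.WriteLine("Worker is a foreground thread: " + !worker.IsBackground);

        // If these two lines were removed, the process would stay alive after
        // Main returned, because the foreground worker would never finish.
        stop.Set();
        worker.Join();
        stop.Dispose();
    }
}
```

Removing the stop.Set() and worker.Join() calls from Demo gives a process that lingers after its main method has finished, which matches the symptom described in this thread.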


Michael Klishin

Apr 6, 2017, 2:43:06 PM
to rabbitm...@googlegroups.com
It could be the I/O thread running or a consumer dispatch pool thread.
A thread dump would prove this hypothesis.

Calling Connection#Close() when your application stops is generally the right thing to do.
The client cannot know when it's time to stop the threads it may be using otherwise.
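
One way to make sure the close call runs on every orderly shutdown path is sketched below. It is deliberately independent of the RabbitMQ client so that it compiles on its own: CloseOnce, ShutdownSketch and RunWithCleanup are hypothetical names, runApp stands in for something like Application.Run(new MainForm()), and closeConnection stands in for a delegate that closes the connection. A process that is killed outright (for example with End Task) gets no chance to run any of this.

```csharp
using System;
using System.Threading;

// Runs a close action at most once, however many shutdown paths call it.
public sealed class CloseOnce
{
    private readonly Action _close;
    private int _requested; // 0 = not requested yet, 1 = requested

    public CloseOnce(Action close)
    {
        if (close == null) throw new ArgumentNullException(nameof(close));
        _close = close;
    }

    public bool CloseRequested
    {
        get { return Volatile.Read(ref _requested) == 1; }
    }

    public void Close()
    {
        // Only the first caller sees 0 and actually runs the action.
        if (Interlocked.Exchange(ref _requested, 1) == 0)
        {
            _close();
        }
    }
}

public static class ShutdownSketch
{
    // runApp: the application's main loop (assumption: it blocks until exit).
    // closeConnection: e.g. () => connection.Close() in a real application.
    public static void RunWithCleanup(Action runApp, Action closeConnection)
    {
        var closer = new CloseOnce(closeConnection);

        // Raised for exceptions that no handler caught.
        UnhandledExceptionEventHandler onCrash = (sender, e) => closer.Close();
        AppDomain.CurrentDomain.UnhandledException += onCrash;
        try
        {
            runApp();
        }
        finally
        {
            // Normal exit, or an exception propagating out of runApp.
            closer.Close();
            AppDomain.CurrentDomain.UnhandledException -= onCrash;
        }
    }
}
```

The once-only guard matters because a normal exit and a crash handler can both try to close the same connection, and the second attempt should be a no-op rather than a second close.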

P Meier

Apr 7, 2017, 11:44:29 AM
to rabbitmq-users
Thank you both Karl and Michael for your fast replies.  I have spent a bit of time on this problem and have to move onto something else for the time being.  I am sure I will get back into this in the near future.