How to gracefully interrupt message consumer in rabbitpy?

125 views
Skip to first unread message

vitaly numenta

unread,
May 12, 2015, 7:57:29 PM5/12/15
to rabbitm...@googlegroups.com
With pika, I used to be able to set a timer before entering the consumer generator. My timer would run periodically and check an application-specific variable to determine if the consumer should be aborted. When it was time to abort the consumer, the timer callback would raise an application-specific exception, thus causing the consumer to exit the loop. After this, the channel and connection were still usable, and I could continue using them or perform graceful close.

How can I accomplish the equivalent with rabbitpy?

Many thanks,
Vitaly

Gavin M. Roy

unread,
May 12, 2015, 8:53:30 PM5/12/15
to vitaly numenta, rabbitm...@googlegroups.com
I would probably:

1 - have said application specific variable be a threading.Event
2 - create a watcher thread that has access to the rabbitpy.Queue object instance; 
     that when the threading.Event is set, calls queue.stop_consuming()

Here's an example using a timer to set the threading.Event instance:


Gavin

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

vitaly numenta

unread,
May 12, 2015, 10:09:55 PM5/12/15
to rabbitm...@googlegroups.com, vitaly.kru...@gmail.com
Thanks Gavin, but I am a bit worried about race conditions that might result from calling queue.stop_consuming from another thread, because the rabbitmq channel code is not thread-safe in general.

I think that control will end up in the following function, which doesn't see to be thread-safe:

<?python
    def _cancel_consumer(self, obj, consumer_tag=None, nowait=False):
        """Cancel the consuming of a queue.
        :param rabbitpy.amqp_queue.Queue obj: The queue to cancel
        """
        consumer_tag = consumer_tag or obj.consumer_tag
        self._interrupt_wait_on_frame()
        if consumer_tag in self._consumers:
            del self._consumers[consumer_tag]
        self.write_frame(spec.Basic.Cancel(consumer_tag=consumer_tag))
        if not nowait and not self.closed:
            self._wait_on_frame(spec.Basic.CancelOk)
?>
Reply all
Reply to author
Forward
0 new messages