Thanks Gavin, but I am a bit worried about race conditions that might result from calling queue.stop_consuming from another thread, because the RabbitMQ channel code is not thread-safe in general.
I think that control will end up in the following function, which doesn't seem to be thread-safe:
def _cancel_consumer(self, obj, consumer_tag=None, nowait=False):
    """Cancel the consuming of a queue.
    :param rabbitpy.amqp_queue.Queue obj: The queue to cancel
    """
    consumer_tag = consumer_tag or obj.consumer_tag
    self._interrupt_wait_on_frame()
    if consumer_tag in self._consumers:
        del self._consumers[consumer_tag]
    self.write_frame(spec.Basic.Cancel(consumer_tag=consumer_tag))
    if not nowait and not self.closed:
        self._wait_on_frame(spec.Basic.CancelOk)
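
To make the concern concrete, here is a minimal sketch of one way a check-then-delete like the one above could go wrong when two threads reach it at the same time. ToyChannel, cancel_unsafe, cancel_locked and run_pair are hypothetical names made up for this illustration; this is not rabbitpy's code, and the barrier is only there to force the unlucky interleaving deterministically. It also ignores the frame write and wait steps, which may have ordering problems of their own.

```python
import threading


class ToyChannel:
    """Hypothetical stand-in for a channel; NOT rabbitpy's actual class."""

    def __init__(self):
        self._consumers = {"ctag-1": object()}
        self._lock = threading.Lock()

    def cancel_unsafe(self, consumer_tag, barrier):
        # Same check-then-delete shape as the pasted function.
        if consumer_tag in self._consumers:
            # Force both threads past the membership check before either deletes.
            barrier.wait()
            del self._consumers[consumer_tag]

    def cancel_locked(self, consumer_tag):
        # Holding a lock makes the check and the delete one atomic step.
        with self._lock:
            if consumer_tag in self._consumers:
                del self._consumers[consumer_tag]


def run_pair(target, *args):
    """Call target(*args) from two threads and collect any KeyError raised."""
    errors = []

    def worker():
        try:
            target(*args)
        except KeyError as exc:
            errors.append(exc)

    threads = [threading.Thread(target=worker) for _ in range(2)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return errors


# Both threads see the tag, then both try to delete it: one of them fails.
unsafe_errors = run_pair(ToyChannel().cancel_unsafe, "ctag-1", threading.Barrier(2))

# With the lock, the second thread finds the tag already gone and does nothing.
locked_errors = run_pair(ToyChannel().cancel_locked, "ctag-1")
```

A lock like this would only cover the dictionary update. Whether the surrounding frame handling is safe to call from a second thread is a separate question that the sketch does not answer.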