How to unblock channel.start_consuming() from another thread?


Povilas Balciunas

Oct 7, 2015, 5:44:31 AM
to Pika
The problem is pretty much the same as http://stackoverflow.com/questions/32220057/interrupt-thread-with-start-consuming-method-of-pika

Basically, I have a consumer running in a separate thread. I want to terminate it from the main thread (https://github.com/povilasb/rabbitmq-python-sample/blob/master/src/consumer.py).

I've tried calling `channel.stop_consuming()` and `channel.basic_cancel()` from the main thread; neither call worked.

The only solution that worked was to stop consuming from the same thread on which pika started consuming:


    # Thread 2 starts consuming
    channel.basic_consume(on_msg_receive, queue=queue_name, no_ack=True)
    channel.start_consuming()

    def on_msg_receive(channel, method, properties, body):
        print("Received %r" % body)
        if body == 'END':
            channel.stop_consuming()

    # Thread 1 sends terminating message
    channel.basic_publish(exchange="my_msgs", routing_key="consumer1", body='END')


Also, I've read that pika's Channel is not thread-safe, so this solution might not be the proper one.

Guys, any other ideas?

Thanks.

Stuart Longland

Oct 7, 2015, 5:49:25 AM
to pika-...@googlegroups.com
On 07/10/15 19:44, Povilas Balciunas wrote:
> The problem is pretty much the same
> as http://stackoverflow.com/questions/32220057/interrupt-thread-with-start-consuming-method-of-pika
>
> Basically, I have a consumer running in a separate thread. I want to
> terminate it from the main thread
> (https://github.com/povilasb/rabbitmq-python-sample/blob/master/src/consumer.py).
>
> I've tried to call `channel.stop_consuming()`, `channel.basic_cancel()`
> from the main thread - none of these calls worked.
>
> The only solution that worked was to stop consuming from the same thread
> pika started consuming:
>
> # Thread 2 starts consuming
> channel.basic_consume(on_msg_receive, queue=queue_name, no_ack=True)
> channel.start_consuming()

This is a bit of a kludge, but I see in the Pika docs:
> | add_timeout(self, deadline, callback_method)
> | Add the callback_method to the IOLoop timer to fire after deadline
> | seconds. Returns a handle to the timeout. Do not confuse with
> | Tornado's timeout where you pass in the time you want to have your
> | callback called. Only pass in the seconds until it's to be called.
> |
> | :param int deadline: The number of seconds to wait to call callback
> | :param method callback_method: The callback method
> | :rtype: str

So maybe in MainThread you could do this:

    def kill():
        channel.stop_consuming()

    connection.add_timeout(0, kill)

That would, in theory, schedule a call to the 'kill' method (which has a
reference to the channel) that can then call stop_consuming on that channel.
--
Stuart Longland (aka Redhatter, VK4MSL)

I haven't lost my mind...
...it's backed up on a tape somewhere.

Povilas Balciunas

Oct 7, 2015, 7:01:55 AM
to Pika
Thanks, this did work.

Although, the timeout does not trigger immediately. It takes about a second :)

vitaly numenta

Oct 8, 2015, 1:16:11 PM
to Pika
Povilas, although it might seem to work, connection.add_timeout() is NOT thread-safe, and the logic will likely run into problems sooner or later. Take a look at the implementation and you will see why.

A safe way would be to maintain a repeating timer on the consuming thread: start the timer on the consumer thread before it calls start_consuming, and keep rescheduling it each time it fires (you will get a new timer id each time; save it so that you can cancel the timer when start_consuming exits). Make the timer period large, e.g. half a second or more, to avoid performance implications. The timer callback would check a flag and, if the flag is set, call channel.stop_consuming(). Your other thread would set that flag when it wants the consumer to exit.

When start_consuming returns, be sure to cancel the timer if it's still active. Remember that start_consuming may also return when there are no more consumers left (if something deletes a queue, RabbitMQ may cancel consumers on that queue).
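
The repeating-timer approach described above could be sketched roughly as follows. This is only an illustration, not tested against a live broker: the function name `consume_until_stopped`, the `stop_event` flag (a `threading.Event`) and the `period` parameter are made up for the example, and it assumes the connection exposes `add_timeout()` and `remove_timeout()` the way the pika 0.10.x `BlockingConnection` does (other pika versions may name these differently).

```python
import threading


def consume_until_stopped(connection, channel, stop_event, period=0.5):
    """Run channel.start_consuming() until another thread sets stop_event.

    Must be called on the thread that owns `connection` and `channel`.
    Assumption: `connection` offers add_timeout()/remove_timeout() as in
    the pika 0.10.x BlockingConnection; all names here are illustrative.
    """
    timer = {"id": None}  # id of the currently scheduled timer, if any

    def check_stop():
        timer["id"] = None  # this timer has just fired
        if stop_event.is_set():
            # Safe: we are running on the consumer thread's I/O loop.
            channel.stop_consuming()
        else:
            # Re-arm the timer; a new id is returned each time.
            timer["id"] = connection.add_timeout(period, check_stop)

    # Schedule the first check before entering the consume loop.
    timer["id"] = connection.add_timeout(period, check_stop)
    try:
        # May also return early, e.g. if the broker cancels the consumer.
        channel.start_consuming()
    finally:
        # Cancel the timer if it is still pending.
        if timer["id"] is not None:
            connection.remove_timeout(timer["id"])
            timer["id"] = None
```

The other thread only ever touches `stop_event` (calling `stop_event.set()`), so no pika object is used from more than one thread. A `threading.Event` is used instead of a bare global flag simply because it makes the cross-thread intent explicit.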

If you are using pika 0.10.x and only have a single consumer, then pika 0.10.0 offers a simpler solution via the inactivity_timeout argument of channel.consume (no need to maintain your own timer). For example, pass inactivity_timeout=1. This causes the generator to yield None whenever there is no activity for the specified time. Then, as you iterate over the generator, check the flag that is set by your other thread and break out of the iteration loop if the flag is set. See the documentation at https://github.com/pika/pika/blob/87d41009cd53a529939a508a41f5f7e148d0b729/pika/adapters/blocking_connection.py#L1723.

For example:

for msg in channel.consume("my-queue-name", inactivity_timeout=1):
    if g_stop_consuming:
        # the other thread wants us to stop consuming
        break
    if msg is None:
        # got None due to inactivity timeout
        continue

    method, properties, body = msg
    # Process the message ...

vitaly numenta

Oct 8, 2015, 1:23:24 PM
to Pika
... and, when the loop terminates, don't forget to cancel the consumer that you created via channel.consume(...) by calling channel.cancel().
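
Putting the consume loop and the final cancel together, a minimal sketch might look like this. The wrapper name `consume_with_stop_flag`, the `stop_event` flag (a `threading.Event` standing in for the global flag) and the `handle_message` callback are hypothetical names for illustration; the sketch assumes, as described in this thread for pika 0.10.x, that the generator yields None on an inactivity timeout.

```python
import threading


def consume_with_stop_flag(channel, queue_name, stop_event, handle_message):
    """Iterate over channel.consume() until another thread sets stop_event.

    Assumption: channel.consume(..., inactivity_timeout=1) yields None when
    idle, as described for pika 0.10.x; names here are illustrative only.
    """
    try:
        for msg in channel.consume(queue_name, inactivity_timeout=1):
            if stop_event.is_set():
                # The other thread wants us to stop consuming.
                break
            if msg is None:
                # Nothing arrived within the inactivity timeout.
                continue
            method, properties, body = msg
            handle_message(method, properties, body)
    finally:
        # Always cancel the consumer created by channel.consume().
        channel.cancel()
```

The consumer thread would call `consume_with_stop_flag(channel, "my-queue-name", stop_event, callback)`, and the main thread would call `stop_event.set()` when it wants the loop to end; with inactivity_timeout=1 the loop should notice within about a second.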