Help catching an interrupt to cleanly close the connection within a class

Mountainman John

Mar 16, 2018, 9:44:52 AM
to rabbitmq-users
In Python, using Pika and the multiprocessing module, I wrote a class that inherits from multiprocessing.Process. It creates a process and, within it, starts a consumer on the RabbitMQ bus that pulls messages off a direct queue. The parent program spawns one process to handle each unique queue. To handle Ctrl-C (KeyboardInterrupt), the parent sends SIGTERM to the child processes (i.e., objects of my class) via os.kill; each child catches SIGTERM and in turn raises an exception, so that the try/except in the consumer can catch it, stop consuming from the bus, and close the connection. This works fine in a single-threaded scheme where multiprocessing is not used, but it fails in this case. The failure messages I get when I run my program (source code below) are:

020
sighandler_HUP() called on signal 1  by process  30431
 [x] Process 30431 received 'Message 1 .....' on queue red_q2
^CCaught Ctrl-C
Process Consumer-1:
Traceback (most recent call last):
  File "/usr/local/python/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "./rabbit_runit5.py", line 35, in run
    self.channel.start_consuming()
  File "/usr/local/python/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1756, in start_consuming
    self.connection.process_data_events(time_limit=None)
  File "/usr/local/python/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 716, in process_data_events
    self._dispatch_channel_events()
  File "/usr/local/python/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 518, in _dispatch_channel_events
    impl_channel._get_cookie()._dispatch_events()
  File "/usr/local/python/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1385, in _dispatch_events
    evt.body)
  File "./rabbit_runit5.py", line 44, in callback
    time.sleep( body.count('.') )
KeyboardInterrupt
ALL CHILDREN ARE GONE


It appears that within the consumer (implemented by the run() method of my class), the exception is not getting caught. I've been banging my head against a wall on this for over a day and am turning to the experts for help. Thanks!
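
One possible reading of the traceback, offered as a sketch rather than a confirmed diagnosis: Ctrl-C typed at a terminal sends SIGINT to every process in the foreground process group, so the child can receive KeyboardInterrupt directly, before the parent's SIGTERM arrives. KeyboardInterrupt derives from BaseException rather than Exception, so a plain `except Exception` clause, like the one in run() in the code below, does not catch it. The helper name `caught_by_except_exception` is hypothetical, and the snippet is written for Python 3, although the class hierarchy is the same on Python 2.7:

```python
def caught_by_except_exception(exc):
    """Return True if a plain `except Exception` clause catches exc.

    Hypothetical helper, used only to illustrate the exception hierarchy.
    """
    try:
        raise exc
    except Exception:
        return True
    except BaseException:
        # KeyboardInterrupt and SystemExit land here instead.
        return False


print(caught_by_except_exception(Exception("CAUGHT SIGTERM")))  # True
print(caught_by_except_exception(KeyboardInterrupt()))          # False
print(issubclass(KeyboardInterrupt, Exception))                 # False
```

If this is what happens, the KeyboardInterrupt raised inside callback() would pass straight through the try/except in run(), which matches the uncaught traceback shown above.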

--john

-----   CODE   ---------

#!/usr/bin/env python
################################################################################
# from multiprocessing import Process, Manager
import multiprocessing
import os
import sys
import time
import signal
import pika


class Consumer(multiprocessing.Process):
  def __init__(self,qname,priority):
    multiprocessing.Process.__init__(self)
    # self.proc = multiprocessing.Process(target=self.consumer, args=(qname,))
    self.queue = qname
    self.priority = priority
    self.connection = None
    self.channel = None
    self.qinfo = None
    
    signal.signal(signal.SIGTERM, self.sighandler_TERM)

  def run(self):
    print 'Spawned process %d for queue %s (priority = %d)' % (self.pid,self.queue,self.priority)
    # self.proc.start()
    self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    self.channel = self.connection.channel()
    self.qinfo = self.channel.queue_declare(queue=self.queue)
    self.channel.basic_qos(prefetch_count=1)
    self.channel.basic_consume(self.callback, queue=self.queue)
    print 'Process %d starts consuming on queue %s' % (self.pid,self.queue)
    try:
      self.channel.start_consuming()
    except Exception, exc:
      print 'run(): Exception: %s' % (exc)
      self.channel.stop_consuming()
      self.connection.close() 


  def callback(self, ch, method, properties, body):
    print ' [x] Process %d received %r on queue %s' % (self.pid, body, method.routing_key)
    time.sleep( body.count('.') )
    ch.basic_ack(delivery_tag = method.delivery_tag)


  def sighandler_TERM(self,signum,frame):
    # Signal handler
    print 'sighandler_TERM() called on signal', signum, ' by process ', self.pid
    raise Exception("CAUGHT SIGTERM")


################################################################################
def main():
   # msg_queues = ['red_q2','blue_q2','yellow_q2']
   msg_queues = ['red_q2']
   children = dict()

   # Create the child processes, each handling its own queue
   for qname in msg_queues:
     p = Consumer(qname,1)
     p.start()
     children[p.pid] = p       # Keep track of the kids by their PID
     print 'Spawned process %d for queue %s' % (p.pid,qname)

   print 'Entering main infinite loop'
   try:
      n = 1
      while True:
        print '%03d' % (n)
        time.sleep(1)
        n = n + 1

   except KeyboardInterrupt:
      print 'Caught Ctrl-C'
      for pid in children.keys():
        os.kill(pid, signal.SIGTERM)
      for pid in children.keys():
        children[pid].join()

   print 'ALL CHILDREN ARE GONE'


if __name__ == "__main__":
  main()
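
One way the child side could be arranged, as a minimal sketch under stated assumptions rather than a tested fix for the program above: the worker ignores SIGINT so that only the parent reacts to Ctrl-C, converts SIGTERM into a dedicated exception, and runs its cleanup in a finally block. `ShutdownRequested`, `run_worker`, `consume`, and `cleanup` are hypothetical names; `consume` stands in for channel.start_consuming() and `cleanup` for connection.close(). The sketch uses only the standard library, targets Python 3 on a POSIX system, and has to run in the main thread of the process, since Python only allows signal handlers to be installed there:

```python
import signal


class ShutdownRequested(Exception):
    """Hypothetical exception raised when SIGTERM asks the worker to stop."""


def _raise_shutdown(signum, frame):
    # The exception surfaces in whatever code the main thread was
    # executing when the signal arrived (for example, a message callback).
    raise ShutdownRequested("received signal %d" % signum)


def run_worker(consume, cleanup):
    """Call consume() until SIGTERM arrives; always call cleanup().

    Intended to be called from the code that runs inside the child
    process, because it changes that process's signal handlers.
    """
    # Ctrl-C reaches the whole foreground process group, so the worker
    # ignores SIGINT and waits for the parent's SIGTERM instead.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, _raise_shutdown)
    try:
        consume()      # stands in for channel.start_consuming()
        return "finished"
    except ShutdownRequested:
        return "terminated"
    finally:
        cleanup()      # stands in for connection.close()
```

In the Consumer class, the two signal.signal() calls would presumably go at the top of run(), which executes in the child, rather than in __init__(), which executes in the parent and therefore also changes the parent's own SIGTERM handling.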

Luke Bakken

Mar 19, 2018, 10:17:30 AM
to rabbitmq-users
Hi John,

You might be running into this issue - https://github.com/pika/pika/pull/991

Try using an asynchronous consumer rather than a blocking one. I realize this is more work, but a KeyboardInterrupt exception should work in that case.


Thanks,
Luke