My connection is re-established but the consumers are not re-connected to the queue [rabbitmq]


Félix López

May 21, 2014, 4:12:35 AM
to carrot...@googlegroups.com
Hi all,

I'm not sure whether this is a bug or whether I'm not using kombu correctly. I have a connection with one consumer, and I'm also using heartbeats. The problem happens when "Too many heartbeats missed" is reached while a message is being processed. Once the message is processed, the eventloop tries to process a new event; at that moment the connection is re-established internally, but the consumer is not reconnected. Is there a way to get a callback, or something similar, when the connection is re-established so I can reconnect the consumers? This is my code:

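    import logging
    import weakref
    from contextlib import nested  # Python 2 only; removed in Python 3

    from eventlet import spawn_after  # assumed: the post does not show this import
    from kombu import Connection
    from kombu.common import eventloop

    logger = logging.getLogger(__name__)

    def monitor_heartbeats(self, connection):
        """Function to send heartbeat checks to RabbitMQ. This keeps the
        connection alive over long-running processes."""
        if not connection.heartbeat:
            logger.info("No heartbeat set for connection: %s", connection)
            return
        interval = connection.heartbeat
        # Hold only a weak reference so the timer does not keep the
        # connection object alive on its own.
        cref = weakref.ref(connection)
        logger.info("Starting heartbeat monitor.")

        def heartbeat_check():
            conn = cref()
            if conn is not None and conn.connected:
                try:
                    conn.heartbeat_check(rate=interval)
                    logger.info("Ran heartbeat check.")
                except Exception as e:
                    logger.warning("Error checking the heartbeat: %r", e)
                # Re-schedule only while the connection is alive; once it
                # drops, heartbeat monitoring stops.
                spawn_after(interval / 2, heartbeat_check)
        return spawn_after(interval / 2, heartbeat_check)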

    def run(self):
        self.connection = Connection(self.amqp_url, heartbeat=30)
        logger.info("Starting worker.")
        self.monitor_heartbeats(self.connection)
        while 1:
            try:
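                # Consumers are recreated on every pass of the outer loop,
                # which only happens if an exception escapes eventloop().
                self.consumers = []
                self.consumers.extend([self.connection.Consumer(q, callbacks=[self.on_message]) for q in self.queues])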
                self.__apply_qos(self.consumers)
                with nested(*self.consumers):
                    for _ in eventloop(self.connection, timeout=1, ignore_timeouts=True):
                        pass
            except KeyboardInterrupt:
                print('bye bye')
                break
            except self.connection.connection_errors + self.connection.channel_errors as exc:
                print('Connection lost: {0!r}'.format(exc))
            except Exception as e:
                print('Error: {0!r}'.format(e))

I've been digging into the code, and this is what I think is happening:
  - A message arrives and it takes quite a long time to process.
  - The connection is closed due to "Too many heartbeats missed".
  - The message is finally processed.
  - The eventloop tries to process a new event.
  - The connection is closed, so an exception is raised (error: error(32, 'Broken pipe')), but it's caught here https://github.com/celery/kombu/blob/master/kombu/common.py#L184
  - Then the eventloop tries to process a new event again.
  - At this moment, when it tries to use the connection, the connection is re-established: https://github.com/celery/kombu/blob/master/kombu/connection.py#L740
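To make the last few steps concrete, here is a small simulation. It assumes the `kombu.common.eventloop` linked above handled errors with a timeout check followed by a bare `except socket.error: pass`; `eventloop_2014` is a hypothetical paraphrase of that behaviour, not kombu's code, and `FlakyConnection` is a made-up stand-in that fails once with EPIPE and afterwards only times out (as a freshly re-established connection with no consumers would).

```python
import errno
import socket
from itertools import count


def eventloop_2014(conn, limit=None, timeout=None, ignore_timeouts=False):
    """Hypothetical paraphrase of the eventloop linked above (assumption)."""
    for _ in (range(limit) if limit else count()):
        try:
            yield conn.drain_events(timeout=timeout)
        except socket.timeout:
            if timeout and not ignore_timeouts:
                raise
        except socket.error:
            pass  # e.g. EPIPE after the broker dropped us: silently ignored


class FlakyConnection:
    """Made-up stand-in: the first drain hits a dead socket, every later
    drain just times out because nothing is consuming any more."""

    def __init__(self):
        self.calls = 0

    def drain_events(self, timeout=None):
        self.calls += 1
        if self.calls == 1:
            raise socket.error(errno.EPIPE, "Broken pipe")
        raise socket.timeout("timed out")


conn = FlakyConnection()
events = list(eventloop_2014(conn, limit=3, timeout=1, ignore_timeouts=True))
print(events, conn.calls)  # prints: [] 3
```

The broken pipe never reaches the caller, so the `except self.connection.connection_errors ...` branch in `run()` never runs and the consumers are never recreated.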

As the first exception is caught internally and the connection is also re-established internally, I can't reconnect the consumer.

Should I use the eventloop in a different way? Otherwise, I was thinking of forking kombu and adding a callback to notify me when the connection is re-established.

Thanks a lot! I know it's a long read.
 

Ask Solem

May 23, 2014, 9:33:47 AM
to carrot...@googlegroups.com
On May 21, 2014, at 9:12 AM, Félix López <felix...@shuttlecloud.com> wrote:

> - The connection is closed, so an exception is raised (error: error(32, 'Broken pipe')), but it's caught here https://github.com/celery/kombu/blob/master/kombu/common.py#L184

Thanks!

I’m not sure why it catches `socket.error` there. I would say it definitely shouldn’t and that it’s a bug.
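If `socket.error` were not swallowed, a dead socket would propagate out of the loop and into the caller's `except connection_errors` handler (with the py-amqp transport, `socket.error` should be among `connection.connection_errors`). Below is a sketch of such a variant, assuming only the `drain_events(timeout=...)` method that kombu's `Connection` provides; `strict_eventloop` is a hypothetical name, not part of kombu.

```python
import socket
from itertools import count


def strict_eventloop(conn, limit=None, timeout=None, ignore_timeouts=False):
    """Like kombu.common.eventloop, but only timeouts are handled here;
    any other socket error (e.g. EPIPE) propagates to the caller."""
    for _ in (range(limit) if limit else count()):
        try:
            yield conn.drain_events(timeout=timeout)
        except socket.timeout:
            # No event within `timeout` seconds: keep looping unless the
            # caller asked to be told about timeouts.
            if timeout and not ignore_timeouts:
                raise
```

In the question's `run()`, `for _ in strict_eventloop(self.connection, timeout=1, ignore_timeouts=True): pass` would let a broken pipe reach the `except self.connection.connection_errors + ...` branch, after which the outer `while 1` recreates the consumers.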



Your code is unlikely to reconnect anyway, since you don’t use ensure_connection().

You can call it at the start of the `while 1` loop to make sure the program is connected from the moment it starts, or in the `except connection_errors…` handler to make sure it reconnects after a connection error.

The Celery worker reconnects like this:

The error handler is just a way to report the event to the user; you can also specify a retry policy that controls when to give up, how often to retry, and so on:

