Hi all,
I'm not sure whether this is a bug or whether I'm using kombu incorrectly. I have a connection with one consumer, and I'm also using heartbeats. The problem happens when "Too many heartbeats missed" is reached while a message is being processed. Once the message has been processed, the eventloop tries to process a new event. At that moment the connection is re-established internally, but the consumer is not reconnected. Is there a way to get a callback (or something similar) when the connection is re-established, so that I can reconnect the consumers? This is my code:
import logging
import weakref
from contextlib import nested  # Python 2 only; on Python 3 use contextlib.ExitStack

from eventlet import spawn_after  # assumed: spawn_after comes from eventlet
from kombu import Connection
from kombu.common import eventloop

logger = logging.getLogger(__name__)


def monitor_heartbeats(self, connection):
    """Function to send heartbeat checks to RabbitMQ. This keeps the
    connection alive over long-running processes."""
    if not connection.heartbeat:
        logger.info("No heartbeat set for connection: %s", connection)
        return
    interval = connection.heartbeat
    # Weak reference, so this timer does not keep the connection alive.
    cref = weakref.ref(connection)

    def heartbeat_check():
        conn = cref()
        if conn is None:
            return  # connection was garbage-collected: stop checking
        if conn.connected:
            try:
                # Called every interval/2 seconds, so the rate is 2.
                conn.heartbeat_check(rate=2)
            except Exception as e:
                logger.warning("Error checking the heartbeat: %r", e)
        # Reschedule the next check.
        spawn_after(interval / 2, heartbeat_check)

    return spawn_after(interval / 2, heartbeat_check)


def run(self):
    self.connection = Connection(self.amqp_url, heartbeat=30)
    self.monitor_heartbeats(self.connection)
    while 1:
        try:
            # Recreate the consumers on every pass through the loop.
            self.consumers = [self.connection.Consumer(q, callbacks=[self.on_message])
                              for q in self.queues]
            self.__apply_qos(self.consumers)
            with nested(*self.consumers):
                for _ in eventloop(self.connection, timeout=1, ignore_timeouts=True):
                    pass
        except KeyboardInterrupt:
            print('bye bye')
            break
        except self.connection.connection_errors + self.connection.channel_errors as exc:
            print('Connection lost: {0!r}'.format(exc))
        except Exception as e:
            print('Error: {0!r}'.format(e))
I've been digging into the code, and this is what I think is happening:
- A message arrives and takes quite a long time to process.
- The connection is closed due to "Too many heartbeats missed".
- The message is finally processed.
- The eventloop tries to process a new event
- Then the eventloop tries to process a new event again.
Because the first exception is caught internally and the connection is also re-established internally, I can't reconnect the consumer.
Should I be using the eventloop in a different way? Otherwise, I was thinking of forking kombu and adding a callback that notifies me when the connection is re-established.
Thanks a lot! I know it's a long read.