I'm using kombu with rabbitmq to process some tasks that can take a while. I send the ack after the tasks it's done, if the connection is closed or whatever, for example IOError: Socket closed, I call ensure_connection to establish it again. The problem with this is that the message is processed twice, because once the connection is closed, the message is requeue automatically.
I can fix easily sending the ack as the message arrive, but what would happen if the process dies? The task would be lost. I was wondering what people normally do for this problems.
Note that I receive every now and then an "IOError: Socket closed" even though I'm using heartbeats. This my code if it helps to understand my problem:
def __apply_qos(self, consumers):
for c in consumers:
c.qos(prefetch_count=1)
def monitor_heartbeats(self, connection):
"""Function to send heartbeat checks to RabbitMQ. This keeps the
connection alive over long-running processes."""
if not connection.heartbeat:
logger.info("No heartbeat set for connection: %s" % connection.heartbeat)
return
interval = connection.heartbeat
cref = weakref.ref(connection)
def heartbeat_check():
conn = cref()
if conn is not None and conn.connected:
conn.heartbeat_check(rate=interval)
spawn_after(interval/2, heartbeat_check)
return spawn_after(interval/2, heartbeat_check)
def run(self):
self.connection = Connection(self.amqp_url, heartbeat=100)
self.connection.ensure_connection()
self.monitor_heartbeats(self.connection)
self.consumers.extend([self.connection.Consumer(q, callbacks=[self.on_message]) for q in self.queues])
self.__apply_qos(self.consumers)
while 1:
try:
with nested(*self.consumers):
for _ in eventloop(self.connection, timeout=1, ignore_timeouts=True):
pass
except KeyboardInterrupt:
print('bye bye')
except self.connection.connection_errors + self.connection.channel_errors as exc:
print('Connection lost: {0!r}'.format(exc))