Best practices for long running processes

468 views
Skip to first unread message

Félix López

unread,
Feb 24, 2014, 2:47:28 AM2/24/14
to carrot...@googlegroups.com
Hi everyone,

I'm using kombu with rabbitmq to process some tasks that can take a while. I send the ack after the tasks it's done,  if the connection is closed or whatever, for example IOError: Socket closed, I call ensure_connection to establish it again. The problem with this is that the message is processed twice, because once the connection is closed, the message is requeue automatically. 
I can fix easily sending the ack as the message arrive, but what would happen if the process dies? The task would be lost. I was wondering what people normally do for this problems.

Note that I receive every now and then an "IOError: Socket closed" even though I'm using heartbeats. This my code if it helps to understand my problem:

class Consumer(object):
    def __init__(self, amqp_url, queues):
        self.queues = queues
        self.amqp_url = amqp_url
        self.consumers = []
    def __apply_qos(self, consumers):
        for c in consumers:
            c.qos(prefetch_count=1)

    def monitor_heartbeats(self, connection):
        """Function to send heartbeat checks to RabbitMQ. This keeps the
        connection alive over long-running processes."""
        if not connection.heartbeat:
            logger.info("No heartbeat set for connection: %s" % connection.heartbeat)
            return
        interval = connection.heartbeat
        cref = weakref.ref(connection)
        logger.info("Starting heartbeat monitor.")

        def heartbeat_check():
            conn = cref()
            if conn is not None and conn.connected:
                conn.heartbeat_check(rate=interval)
                logger.info("Ran heartbeat check.")
                spawn_after(interval/2, heartbeat_check)
        return spawn_after(interval/2, heartbeat_check)

    def run(self):
        self.connection = Connection(self.amqp_url, heartbeat=100)
        self.connection.ensure_connection()
        self.monitor_heartbeats(self.connection)
        logger.info("Starting worker.")
        self.consumers.extend([self.connection.Consumer(q, callbacks=[self.on_message]) for q in self.queues])
        self.__apply_qos(self.consumers)
        while 1:
            try:
                with nested(*self.consumers):
                    for _ in eventloop(self.connection, timeout=1, ignore_timeouts=True):
                        pass
            except KeyboardInterrupt:
                print('bye bye')
            except self.connection.connection_errors + self.connection.channel_errors as exc:
                print('Connection lost: {0!r}'.format(exc))

Thanks!

Ask Solem

unread,
Feb 25, 2014, 10:49:47 AM2/25/14
to carrot...@googlegroups.com

On Feb 24, 2014, at 7:47 AM, Félix López <jaaael...@gmail.com> wrote:

> Hi everyone,
>
> I'm using kombu with rabbitmq to process some tasks that can take a while. I send the ack after the tasks it's done, if the connection is closed or whatever, for example IOError: Socket closed, I call ensure_connection to establish it again. The problem with this is that the message is processed twice, because once the connection is closed, the message is requeue automatically.
> I can fix easily sending the ack as the message arrive, but what would happen if the process dies? The task would be lost. I was wondering what people normally do for this problems.
>
> Note that I receive every now and then an "IOError: Socket closed" even though I'm using heartbeats. This my code if it helps to understand my problem:
>
>

Exactly-once semantics is not feasible so if you use “late ack” (as Celery calls it) you must
make sure your task is idempotent or use deduplication to filter out messages
you have already seen, but means having a centralized list of message ids.

The best solution is to have idempotent tasks and protect side effects at the application layer.
E.g. if the task is performing a money transaction in a database
then the task must be responsible for making sure the transaction did not already happen,
not the messaging service.

Somewhat relevant information here:
http://docs.celeryproject.org/en/latest/faq.html#should-i-use-retry-or-acks-late



Félix López

unread,
Feb 25, 2014, 10:59:54 AM2/25/14
to carrot...@googlegroups.com
thanks ;)
Reply all
Reply to author
Forward
0 new messages