RabbitMQ resubmits tasks that are still being processed


Mohammed Mahdi Akkouh

Oct 18, 2019, 1:17:19 AM
to rabbitmq-users
Hello,
I have very long tasks that are processed by workers, and a manual acknowledgment is sent at the end of each task. While those tasks are being processed, if I start a new worker, the new worker receives one of the tasks that is still being processed rather than the next one in the queue.
I couldn't find any explanation for this behavior. I can only assume there is some sort of timeout for acks, and that tasks are resubmitted once this timeout is reached.
I could send a positive ack as soon as I receive the task, but that defeats the whole purpose of manual acking. Any thoughts?
Regards,

rajesh sethi

Oct 18, 2019, 2:25:08 AM
to rabbitmq-users
What is the prefetch count for the workers?

Mohammed Mahdi Akkouh

Oct 18, 2019, 3:34:03 AM
to rabbitm...@googlegroups.com
It is 1.


Luke Bakken

Oct 18, 2019, 1:28:05 PM
to rabbitmq-users
Hi Mohammed,

RabbitMQ won't redeliver a message while it is waiting for an acknowledgement, nor is there a timeout after which unacked messages are re-enqueued. These messages will be in the "Unacked" state in the management interface. If your client closes its connection, those messages will be re-enqueued, and the closed connection will be logged.

There is a bug in your code, or perhaps duplicate messages are being enqueued. Without more information we'd just be guessing.

Thanks,
Luke

Mohammed Mahdi Akkouh

Oct 19, 2019, 1:28:48 AM
to rabbitmq-users
Hi Luke,
My code is really simple:

send.py
    def publish(self, data, close_connection=True, head_data=False):
        """
        Send tasks to workers
        :param data: list
        :param close_connection: boolean
        :param head_data: boolean, log only the first HEAD_DATA_BITS of each message
        :return: None
        """
        for message in data:
            self.channel.basic_publish(
                exchange='',
                routing_key=self.queue,
                body=message,
                properties=pika.BasicProperties(
                    delivery_mode=2
                ))
            if head_data:
                logger.info(" [x] Sent %r" % message[0:env.HEAD_DATA_BITS])
            else:
                logger.info(" [x] Sent %r" % message)
        if close_connection:
            self.close()

receive.py
class SimulationWorker(Worker):
    """
    Class for distributing tasks among simulation workers
    """

    def __init__(self):
        super().__init__(env.RABBITMQ_SIMULATOR_QUEUE_NAME)

    def consume(self):
        """
        Consume tasks from the queue and dispatch each one to the callback
        :return: None
        """
        logger.info(' [*] Waiting for messages. To exit press CTRL+C')
        self.channel.basic_qos(prefetch_count=1) # send task to available worker
        self.channel.basic_consume(queue=self.queue, on_message_callback=self.simulate)
        self.channel.start_consuming()

    def simulate(self, ch, method, properties, body):
        """
        Callback function called for each task
        :param ch:
        :param method:
        :param properties:
        :param body:
        :return: None
        """
        logger.info(" [*] Running simulation %r" % body)
        s = Simulator()
        data = json.loads(body)
        cycle = data["cycle"]
        phase = data["phase"]
        s.simulate(cycle, phase) # very expensive function call
        logger.info(" [x] Done")
        ch.basic_ack(delivery_tag=method.delivery_tag)

The connection is never closed by the consumer, and the tasks are all different. Each task message is really simple: a dictionary of (cycle, phase), both integers, and I can see in the logs that they are all different. What sometimes happens is that I kill some or all workers to fix the code and then rerun them. Messages are persistent in RabbitMQ.
Maybe if I could monitor the tasks in the queue, that would help.
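[Editor's note: one way to monitor a queue from Python is a passive queue_declare, which only inspects the queue (and fails if it doesn't exist) rather than creating it. This is a sketch, not code from the thread; the function name, host parameter, and lazy import are illustrative choices.]

```python
def queue_depth(queue_name, host="localhost"):
    """Return (message_count, consumer_count) for an existing queue.

    Illustrative helper: uses a passive queue_declare so the queue is
    only inspected, never created or modified.
    """
    import pika  # imported lazily; requires a reachable RabbitMQ broker

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    try:
        channel = connection.channel()
        ok = channel.queue_declare(queue=queue_name, passive=True)
        return ok.method.message_count, ok.method.consumer_count
    finally:
        connection.close()
```

Note that message_count counts only messages ready for delivery: messages sitting in the "Unacked" state (delivered to a worker but not yet acked) are not included, so a busy queue can report a depth of 0.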

Luke Bakken

Oct 19, 2019, 4:30:06 PM
to rabbitmq-users
> The connection is never closed by the consumer, and the tasks are all different. Each task message is really simple: a dictionary of (cycle, phase), both integers, and I can see in the logs that they are all different. What sometimes happens is that I kill some or all workers to fix the code and then rerun them. Messages are persistent in RabbitMQ.
> Maybe if I could monitor the tasks in the queue, that would help.

When you kill workers, their connections are closed and any un-acked message associated with the connection will be re-enqueued, as I explained in my previous reply. You can read about the "redelivered" flag in the RabbitMQ docs. This blog post has some good information: https://blog.forma-pro.com/rabbitmq-redelivery-pitfalls-440e0347f4e0

Just FYI, if your simulate method takes a long time, you should run it on a separate thread and use a thread-safe callback (in Pika, BlockingConnection.add_callback_threadsafe) to ack the message from the connection's thread.

Otherwise, simulate will block Pika's I/O loop and prevent it from sending heartbeat messages.

Thanks,
Luke
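[Editor's note: to make the threading pattern concrete without needing a broker, the sketch below models the handoff using only the standard library. The callback_queue drain stands in for Pika's I/O loop, and add_callback_threadsafe models pika.BlockingConnection.add_callback_threadsafe; all names here are illustrative, not from the thread. The key point is that channels are not thread-safe, so the worker thread never acks directly.]

```python
import threading
import queue

# Stand-in for Pika's I/O loop: worker threads never touch the channel
# directly; they post callbacks that the connection thread runs later.
callback_queue = queue.Queue()
acked = []

def basic_ack(delivery_tag):
    # Real code would call ch.basic_ack(delivery_tag=delivery_tag) here,
    # on the connection's own thread.
    acked.append(delivery_tag)

def add_callback_threadsafe(cb):
    # Models pika.BlockingConnection.add_callback_threadsafe: schedule cb
    # to run on the connection thread, callable from any other thread.
    callback_queue.put(cb)

def simulate(delivery_tag):
    # The expensive simulation runs off the I/O loop, so heartbeats can
    # keep flowing while it works...
    _ = sum(range(10_000))  # placeholder for s.simulate(cycle, phase)
    # ...and the ack is scheduled back onto the connection thread.
    add_callback_threadsafe(lambda: basic_ack(delivery_tag))

worker = threading.Thread(target=simulate, args=(1,))
worker.start()
worker.join()

# The "I/O loop" drains scheduled callbacks between network events.
while not callback_queue.empty():
    callback_queue.get()()

print(acked)  # [1]
```

With Pika itself, the worker thread would call connection.add_callback_threadsafe(lambda: ch.basic_ack(delivery_tag=method.delivery_tag)), and the blocking connection runs the callback while it processes events on the main thread.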