how to requeue unacked messages

333 views
Skip to first unread message

Dhruv Gupta

unread,
Mar 22, 2024, 9:34:07 AM3/22/24
to rabbitmq-users
Hello,

I am trying to publish messages to a task-queue from where multiple consumers connect and fetch the data according to the round robin policy.

I am having an issue with my consumer that while processing a message it sometimes die or disconnects abruptly. This makes the messages from the consumer to be unacked in the queue.

I want to requeue the unacked messages as soon as the consumer dies or disconnects. How can I achieve that ?

Thanks,
Dhruv

Luke Bakken

unread,
Mar 22, 2024, 2:45:06 PM3/22/24
to rabbitmq-users
Hi Dhruv,

If your consumer dies or disconnects, RabbitMQ will re-enqueue the message in the "Ready" state. This is RabbitMQ's behavior.

If you're not seeing that, you should double-check that the consumer really is disconnecting like you think.

If you can provide a reliable way to demonstrate what you're seeing, we will take a look.

Thanks,
Luke

Dhruv Gupta

unread,
Mar 25, 2024, 5:58:53 AM3/25/24
to rabbitmq-users
Hello Luke,

Thank you for your message. I will try to explain my problem in a much more clear way. So the consumer code is below:

import pika
import numpy as np
import time
import json
import base64
import threading

def on_reply(ch, method, properties, body):

    print(body)

host = 'localhost'
# rabbitmq for task queueing
parameters = pika.ConnectionParameters(host=host, port=5672, heartbeat=0)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

print('Connection Successful')
# create/check the task queue
task_queue = channel.queue_declare(queue="task-queue", arguments={'x-consumer-timeout' : 60000})

channel.basic_qos(prefetch_count=1)
# auto_ack is handled manually
channel.basic_consume(queue='task-queue',
                      auto_ack=False,
                      on_message_callback=on_reply
                      )
channel.start_consuming()


Also, find the screenshot attached. If I abrupt the above python script when it is consuming, the consumer still remains in the task-queue and as soon as I run the script again a second instance of the consumer is created.

Also, I changed the consumer timeout to 1 Minute, even then after 1 minute the consumer which had died, doesn't get removed automatically, is my understanding of consumer timeout correct ?

Related to the requeuing of the messages, the messages which are unacked doesnt get requeue even for the new consumer instance that is created.

I hope the explanation helps you in finding out what is going wrong.

Thanks,
Dhruv
Screenshot 2024-03-25 at 10.51.19.png

Luke Bakken

unread,
Mar 26, 2024, 10:05:03 AM3/26/24
to rabbitmq-users
Hi again,

How are you stopping your python script? Are you sure the python process is actually stopped? Have you checked the process list? (ps -ef | grep -F python)?

Dhruv Gupta

unread,
Mar 27, 2024, 5:02:27 AM3/27/24
to rabbitmq-users
Hi Luke,

I am using Jupyter Notebook to start the kernel. I am stopping the kernel abruptly by shutting it down completely to test some error handling scenarios.

Thanks,
Dhruv

Luke Bakken

unread,
Mar 27, 2024, 10:20:06 AM3/27/24
to rabbitmq-users
Hello,

I can't explain what is going on in this case. If you can provide a way for me to reproduce it, I would appreciate it.

Reply all
Reply to author
Forward
0 new messages