How to set x-consumer-timeout value?

499 views
Skip to first unread message

Imanuelis Nekantas

unread,
Sep 4, 2024, 4:53:00 PM9/4/24
to rabbitmq-users
The problem: RabbitMQ enforces a delivery acknowledgment timeout. If the message isn't acknowledged within the default 30-minute window, the channel closes and the message gets requeued.

What I need is to keep the consumer alive and prevent the channel from closing during long-running message processing, while ensuring that the message gets acknowledged after processing is complete.

I'm trying to set a consumer timeout when creating a queue in RabbitMQ using the Kombu library, but it seems like no matter what I pass as an argument, the queue is created with the default value of 1800000, and the x-consumer-timeout doesn't take effect.

Here is the relevant part of my code:

 Queue(**consumer_settings,
queue_arguments={'x-consumer-timeout'3600000})
 ], on_message=self.on_message(self.process_name), )]

I'm trying to set the x-consumer-timeout argument, but when I check the RabbitMQ management UI, the queue is created without applying the timeout value.

RabbitMQ Version: I'm running RabbitMQ 3.12.2, which supports x-consumer-timeout. Kombu Version: I'm using Kombu 5.4.0.

What am I missing? Why isn't the x-consumer-timeout being applied when the queue is created?

Thanks for the help :)

Hank Birkdale

unread,
Sep 4, 2024, 5:13:42 PM9/4/24
to rabbitmq-users
In my celeryconfig file I have a task_queues entry and it looks like this (its under Message Routing, now when you create the queues )

task_queues = (
Queue(cu.BLOCKCHAIN_Q, Exchange(cu.BLOCKCHAIN_Q), routing_key=cu.BLOCKCHAIN_Q),
Queue(cu.DB_Q, Exchange(cu.DB_Q), routing_key=cu.DB_Q),
Queue(cu.DEFAULT_Q, Exchange(cu.DEFAULT_Q), routing_key=cu.DEFAULT_Q,queue_arguments={'x-consumer-timeout':43200000}),
)
It seems to be working fine when I connect to AWS Rabbit MQ  - I can see the x-consumer-timeout value and its what I expect. 
Reply all
Reply to author
Forward
0 new messages