Cleaning up failed RPC queues with TTL or another option?

130 views
Skip to first unread message

Matteius Davis

unread,
Jun 17, 2013, 10:53:28 AM6/17/13
to carrot...@googlegroups.com
Greetings,
 
Per some threads a while back, I have implemented Producer and ProducerRPC classes as well as a Consumer class based on the ConsumerMixin.
 
I have an issue where sometimes the Rabbit server bounces or other issue causing there to be left behind some RPC response queues.   These queues have the auto delete option enabled, however it is possible that the consumer never connects to it, as these queues get left behind and have to be manually deleted.
 
I really need a way to expire these queues after a certain time limit, say 400 seconds.   I have searched and browsed the kombu documentation but I haven't seen how to configure this with a TTL or other option.
 
 
Also are there other options to timeout a Consumer?   It seems like sometimes a Consumer is just waiting and waiting and this takes up our web request workers.  The primary question is about TTL of Response queues, but also if there are some other options I am overlooking to help mitigate errors when working with RPC?
 
 
Thank you,

Ask Solem

unread,
Jun 17, 2013, 11:35:54 AM6/17/13
to carrot...@googlegroups.com
Have you seen the x-expires option to queue_declare? See Queue TTL at the bottom of this page: https://www.rabbitmq.com/ttl.html


--
Ask Solem
twitter.com/asksol

Matteius Davis

unread,
Jun 17, 2013, 1:23:42 PM6/17/13
to carrot...@googlegroups.com
I have heard of it, I know we use it in Celery for some things.
 
How does that get passed in with Kombu?   I am guessing it gets passed in with queue_arguments when creating the Queue()
 
So I will be testing this out, but I suspect this is the relevant addition to pass in the x-expires:
 
        queue_arguments = {"x-expires": self.callback_queue_ttl}
        self.callback_queue = Queue(self.reply_id, self.reply_exchange,
                                    routing_key=self.reply_id, auto_delete=True,
                                    queue_arguments=queue_arguments)

Ask Solem

unread,
Jun 18, 2013, 12:18:10 PM6/18/13
to carrot...@googlegroups.com

On Jun 17, 2013, at 6:23 PM, Matteius Davis <matt...@gmail.com> wrote:

> I have heard of it, I know we use it in Celery for some things.
>
> How does that get passed in with Kombu? I am guessing it gets passed in with queue_arguments when creating the Queue()
>
> So I will be testing this out, but I suspect this is the relevant addition to pass in the x-expires:
>
> queue_arguments = {"x-expires": self.callback_queue_ttl}
> self.callback_queue = Queue(self.reply_id, self.reply_exchange,
> routing_key=self.reply_id, auto_delete=True,
> queue_arguments=queue_arguments)
>
>

Yeah, that example should work!


--
Ask Solem
twitter.com/asksol

Matteius Davis

unread,
Jun 19, 2013, 3:41:08 PM6/19/13
to carrot...@googlegroups.com
Thanks Ask,
 
It does appear to be working for us!




--
Ask Solem
twitter.com/asksol

--
You received this message because you are subscribed to a topic in the Google Groups "carrot-users" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/carrot-users/SsbL0IwdZcQ/unsubscribe.
To unsubscribe from this group and all its topics, send an email to carrot-users...@googlegroups.com.
To post to this group, send email to carrot...@googlegroups.com.
Visit this group at http://groups.google.com/group/carrot-users.
For more options, visit https://groups.google.com/groups/opt_out.



Reply all
Reply to author
Forward
0 new messages