Consuming queues created at runtime on restart

Rafael Novello

Dec 16, 2013, 9:39:17 PM
to celery...@googlegroups.com
Hi!

I'm doing some tests and have succeeded in creating queues and starting to consume them on the fly. But I have a problem: if the worker that consumes these queues stops in the middle of processing and restarts, it loses the configuration of which queues it should consume.

So I would like to know: is there a configuration option that makes the worker resume consuming queues that were created at runtime after a restart? Should I use an exchange for this? And how can I delete a queue once it is empty?

Thanks for the help!!
Rafael Novello

Rafael Novello

Dec 18, 2013, 8:35:24 AM
to celery...@googlegroups.com
Hi guys!

Any ideas about my question?

I believe I can't delete empty queues with Celery; maybe I'll need to use pika for that, right?

The other problem is consuming dynamic queues when the worker starts. For example:

1) I have a task that creates N queues and puts messages in them. These queues can use a prefix or any other naming pattern;
2) The same task adds consumers for each queue;
3) The workers start consuming each queue, but for some reason they stop before the queues are empty;
4) When I restart the workers, the consumer configuration is lost.

The question is: is there some way to configure Celery workers to consume these queues created at runtime?

Thanks for the help!
Rafael Novello

Ask Solem

Dec 18, 2013, 9:51:28 AM
to celery...@googlegroups.com

On Dec 18, 2013, at 1:35 PM, Rafael Novello <rafa.rei...@gmail.com> wrote:

> Hi guys!
>
> Any ideas about my question?
>
> I believe I can't delete empty queues with Celery; maybe I'll need to use pika for that, right?


If the queue is configured in CELERY_QUEUES (or uses automatic routing):

    with app.pool.acquire() as conn:
        app.amqp.queues[name](conn).delete(if_empty=True)


Or, at a lower level:

    with app.pool.acquire() as conn:
        conn.default_channel.queue_delete(name, if_empty=True)


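There's also the auto_delete option, which means the broker will delete
the queue once it is unused, i.e. after its last consumer unsubscribes.
Note that on RabbitMQ this happens even if messages remain in the queue:

    from kombu import Exchange, Queue
    Queue('foo', Exchange('foo'), routing_key='foo', auto_delete=True)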


>
> The other problem is consuming dynamic queues when the worker starts. For example:
>
> 1) I have a task that creates N queues and puts messages in them. These queues can use a prefix or any other naming pattern;
> 2) The same task adds consumers for each queue;
> 3) The workers start consuming each queue, but for some reason they stop before the queues are empty;
> 4) When I restart the workers, the consumer configuration is lost.
>
> The question is: is there some way to configure Celery workers to consume these queues created at runtime?
>

No. You would have to add that capability to the worker yourself, using signals
to store the configuration somewhere and load it again at startup,
e.g. in the statedb (which is optional and enabled with --statedb=worker.db).
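
As a rough illustration of the "store the configuration somewhere and load it at startup" idea, here is a minimal sketch that keeps the names of runtime-created queues in a JSON file. The JSON file stands in for the statedb, and the QueueRegistry class, its methods, and the file name are all hypothetical, not part of Celery. The sketch covers only the persistence side and takes the worker hook-up as a plain callback.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Callable, List


class QueueRegistry:
    """Hypothetical helper: remembers runtime-created queue names in a JSON file."""

    def __init__(self, path):
        self.path = Path(path)

    def load(self) -> List[str]:
        """Return the stored queue names, or an empty list on first run."""
        if not self.path.exists():
            return []
        with self.path.open() as fh:
            return json.load(fh)

    def _save(self, names: List[str]) -> None:
        # Write to a temporary file, then rename it over the real one, so a
        # crash in the middle of a write cannot leave a truncated registry.
        fd, tmp = tempfile.mkstemp(dir=str(self.path.parent))
        with os.fdopen(fd, "w") as fh:
            json.dump(sorted(names), fh)
        os.replace(tmp, self.path)

    def add(self, name: str) -> None:
        """Record a queue; call this wherever the consumer is added."""
        names = set(self.load())
        names.add(name)
        self._save(list(names))

    def discard(self, name: str) -> None:
        """Forget a queue, e.g. after it has been deleted on the broker."""
        names = set(self.load())
        names.discard(name)
        self._save(list(names))

    def restore(self, subscribe: Callable[[str], object]) -> List[str]:
        """Call subscribe(name) for every stored queue; return the names."""
        names = self.load()
        for name in names:
            subscribe(name)
        return names
```

In a worker, one possible hook-up (an assumption, not something confirmed in this thread) is a celeryd_after_setup signal handler that calls registry.restore(instance.app.amqp.queues.select_add), with the task that creates a queue calling registry.add(name) next to its add_consumer call. The sketch does no locking, so several processes updating the same file at once could overwrite each other's changes.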


--
Ask Solem
twitter.com/asksol | +44 07454281208

Rafael Novello

Dec 18, 2013, 11:21:02 AM
to celery...@googlegroups.com
Solem, thanks for the help!