Specifying a Queue pattern for the worker


Keshava Bharadwaj HP

Feb 23, 2020, 11:43:30 PM
to celery-users
Hello,

Is there a way to specify a queue pattern for a single worker to listen to?

E.g., we currently run this command:
celery worker -c 5 -Q document_scan


We need a way to make the worker listen to a queue pattern, something like this:
celery worker -c 5 -Q document_scan.*


Context:
We have a queue for each organization, e.g. document_scan.1, document_scan.2, document_scan.3 ... document_scan.1000.
We have a single worker and it should listen to all of these queues, from document_scan.1 up to document_scan.1000, plus new ones that get created dynamically.

Is there a way to configure celery workers like this?


Thanks

Ing. Josue Balandrano Coronel

Feb 24, 2020, 5:26:00 PM
to celery...@googlegroups.com
It's not exactly what you're asking for, but the way it works is that a queue will receive tasks sent to `document_scan.*`, and a worker will then listen on that queue.
So in your code you can use `document_scan.1`, `document_scan.2`, ... `document_scan.n` as routing keys, and the tasks will be received by the same queue.

I don't think AMQP supports using a pattern for a worker's queue. The way to solve it is to route tasks from the producer, which is your main application.

Ing. Josue Balandrano Coronel


--
You received this message because you are subscribed to the Google Groups "celery-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to celery-users...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/celery-users/088fa3f8-2705-484c-b35e-7533f9eaeb95%40googlegroups.com.

Keshava Bharadwaj HP

Feb 24, 2020, 11:02:02 PM
to celery-users
Thanks for your response.
Yes, I have taken a look at exchanges.

Our use case is something like this: we have various organizations (tenants) and they are unbounded (they keep increasing every day).
So, a message from each organization will go to its own queue (Queue.1 ... Queue.1000 and so on).
We purposefully want this segregation (for quality of service/priority). But now I need to have my worker listen to all these queues.
I cannot reconfigure or restart workers each time an organization is onboarded to add it to the list of queues.
Hence I need a mechanism where a worker can listen to a queue pattern.

Thanks.

Ing. Josue Balandrano Coronel

Feb 25, 2020, 4:37:50 PM
to celery...@googlegroups.com
If you need separate queues for quality of service or priority, I think it can also work with exchanges, but that's something very specific to your application.

There are, I think, two different options to achieve what you want. Celery, and I believe AMQP itself, do not allow wildcards when subscribing to queues.
I believe the reason is mainly that it would be an issue to figure out all the queues to pull/push messages from/to on every clock tick.
Anyhow, here are your two options:

1. Dynamically add workers to queues.
You don't have to reconfigure and/or restart workers to tell them to start listening on specific queues. You can use a broadcast command to do this: https://docs.celeryproject.org/en/stable/userguide/workers.html#queues-adding-consumers
You'd have to implement some logic whenever you onboard a new client, as well as whenever you restart workers.

2. You can create your own workers.
Celery workers leverage Kombu, an abstraction layer on top of AMQP. You can use Kombu's ConsumerMixin (https://docs.celeryproject.org/projects/kombu/en/stable/reference/kombu.mixins.html#kombu.mixins.ConsumerMixin) to create a custom worker which periodically checks a table (or something like that) for new queues and adds them to the list of queues the consumer is listening on. Everything else in your setup should stay the same. https://docs.celeryproject.org/en/stable/userguide/extending.html

The first option is the path of least resistance. The second option requires you to dive deep into some advanced topics, and you might not have time for that. A third bonus option is possible, but it also means diving deep into Celery's internals: you could create/update a blueprint step that modifies how the worker retrieves the list of queues it's going to pull messages from. This would also mean keeping the list of possible queues in a table or something like that. https://docs.celeryproject.org/en/stable/userguide/extending.html#installing-bootsteps

Hope this gives you some ideas on how to go forward.

Ing. Josue Balandrano Coronel



Keshava Bharadwaj HP

Feb 25, 2020, 7:25:46 PM
to celery-users
Thanks for the response.

A couple of questions on this:
1. Where is this data persisted? Is it persisted in RabbitMQ? One need not run this control command every time a worker starts, right?
2. If there are workers already running (20 nodes, concurrency 15 on each node), would running `celery -A proj control add_consumer foo`
take care of notifying all of them, and on restarts, will they automatically consume from all the queues?
The reason I ask is that our celery workers are part of an AWS ASG and they scale up and down all the time. When they scale up and down,
is this configuration of which queues to listen to preserved?

Ing. Josue Balandrano Coronel

Feb 26, 2020, 3:31:39 PM
to celery...@googlegroups.com
You can use the command to tell every worker, or you can specify a worker's hostname to target only one worker.
Now, the command adds workers to queues dynamically, and a worker does not keep state. This means that whenever you restart workers you'll have to re-add all workers to all your queues.
This might even be better for your setup, but it depends. What I would do is create another service (a consumer bind service) which only keeps tabs on the queues you have and adds workers to those queues. Whenever there's a new customer, a new queue is created and this service adds every worker to that new queue.
Whenever there's a new worker, you can have the consumer bind service listen for the worker-online event and add that worker to every queue necessary.

Now, if every worker is always listening on every queue you have, then creating a custom bootstep for your workers might make sense, since the same code needs to run for every worker. You can query RabbitMQ or a custom database to get the list of possible queues and add them to the worker's queues.

Ing. Josue Balandrano Coronel

