Priority is an AMQP feature where messages carry a priority from 0 to 9,
with higher numbers meaning higher priority.
Sadly, priorities have not been implemented in RabbitMQ yet.
See discussion here:
http://www.trapexit.org/forum/viewtopic.php?p=48220
They say it might be better to create separate queues, e.g. one high
priority queue and one low priority queue, and then let e.g. two machines
consume from the high priority queue and one machine from the low
priority queue. This is what we do at Opera.
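To make the two-queue idea concrete, here is a minimal in-process sketch. It does not use celery or RabbitMQ at all: the queues are plain Python queues, the "machines" are threads, and run_workers is a made-up name for illustration only.

```python
import queue
import threading


def run_workers(high_jobs, low_jobs, high_workers=2, low_workers=1):
    """Drain two separate queues, each with its own dedicated workers."""
    queues = {"high": queue.Queue(), "low": queue.Queue()}
    for job in high_jobs:
        queues["high"].put(job)
    for job in low_jobs:
        queues["low"].put(job)

    done = []                  # (queue name, job) pairs in completion order
    lock = threading.Lock()

    def consume(name):
        # A worker only ever looks at the one queue it was assigned to.
        q = queues[name]
        while True:
            try:
                job = q.get_nowait()
            except queue.Empty:
                return         # queue drained, worker exits
            with lock:
                done.append((name, job))

    threads = [threading.Thread(target=consume, args=("high",))
               for _ in range(high_workers)]
    threads += [threading.Thread(target=consume, args=("low",))
                for _ in range(low_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return done
```

The point is only that the high priority queue gets more consumers, so a backlog of low priority work never sits in front of the important tasks.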
Also, there is a rate limiting feature coming in 1.0. The unit tests
already pass, and the current code can be found in the tokenbucket branch:
http://github.com/ask/celery/tree/tokenbucket/
With rate limits you can rate limit the lower priority tasks, thus
getting a better throughput of important tasks.
>
> Thanks for the response.
>
> If I only have one machine, is there any way to configure celery to
> assign certain workers to perform only specific tasks or process
> specific queues?
>
>
> Could you briefly explain how rate limiting works, or point me to some
> documentation?
Only documentation so far is the code, as it's an experimental feature.
You set a rate limit for a task type with the Task.rate_limit attribute.
    class MyTask(Task):
        rate_limit = "10/s"
This task will have a rate limit of 10 tasks a second. Bursts of activity
are taken into account, so it doesn't mean it takes 1 second to process
10 tasks.
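Since the branch is named tokenbucket, here is a rough sketch of how a token bucket can enforce a limit like "10/s" while still allowing bursts. This is not the code from the branch: TokenBucket, parse_rate, and the "m"/"h" units are assumptions made up for this example.

```python
import time


def parse_rate(rate):
    """Turn a string like "10/s" into tokens per second (assumed format)."""
    count, _, unit = rate.partition("/")
    seconds = {"s": 1, "m": 60, "h": 3600}[unit or "s"]
    return float(count) / seconds


class TokenBucket:
    """Hypothetical token bucket; one token is spent per task."""

    def __init__(self, rate, capacity=None, clock=time.monotonic):
        self.fill_rate = parse_rate(rate)
        # Assumption: the bucket holds about one second's worth of tokens,
        # and never less than one, so a single task can always run eventually.
        if capacity is None:
            capacity = max(1.0, self.fill_rate)
        self.capacity = float(capacity)
        self.tokens = self.capacity      # start full, so a burst is allowed
        self.clock = clock
        self.timestamp = clock()

    def _refill(self):
        # Add tokens for the time elapsed since the last check, up to capacity.
        now = self.clock()
        elapsed = now - self.timestamp
        self.tokens = min(self.capacity, self.tokens + elapsed * self.fill_rate)
        self.timestamp = now

    def can_consume(self, tokens=1):
        """Spend tokens and return True if the task may run right now."""
        self._refill()
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

    def expected_time(self, tokens=1):
        """Seconds to wait until the requested tokens are available."""
        self._refill()
        missing = max(0.0, tokens - self.tokens)
        return missing / self.fill_rate
```

With a full bucket, ten tasks can be let through back to back, and after that new tasks are admitted as tokens trickle back in at the fill rate. That is the sense in which bursts are taken into account.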
> Will rate limiting work if I only have one worker?
Indeed!
> My
> impression is that each worker takes one task from the queue, and
> stays with it until it is completed, before taking the next one from
> the queue.
That's not right. This would normally be the case with a polling
architecture. With AMQP QoS the worker tries to prefetch as many messages
as there are pool workers (the concurrency setting). Think about the
countdown/ETA options, which let you specify a date and time in the
future when the task should be executed. These ETA messages stay in the
worker's memory until the ETA is met. Due to AMQP's acknowledgement
workflow it doesn't matter if the worker crashes with these in memory;
the broker will resend the messages in that case.
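A toy model may help show why holding prefetched messages in memory is safe. This is an in-memory stand-in, not AMQP or RabbitMQ code: ToyBroker and its methods are invented for illustration, and a real broker does not promise this exact redelivery order.

```python
from collections import deque


class ToyBroker:
    """In-memory stand-in for a broker with acknowledgements."""

    def __init__(self, messages):
        self.ready = deque(messages)   # messages waiting for delivery
        self.unacked = {}              # consumer name -> delivered, unacked

    def deliver(self, consumer, prefetch_count):
        """Hand out messages until the consumer holds prefetch_count unacked."""
        held = self.unacked.setdefault(consumer, [])
        while self.ready and len(held) < prefetch_count:
            held.append(self.ready.popleft())
        return list(held)

    def ack(self, consumer, message):
        """The consumer finished the task; the broker can forget the message."""
        self.unacked[consumer].remove(message)

    def connection_lost(self, consumer):
        """Requeue everything the consumer never acknowledged."""
        for message in reversed(self.unacked.pop(consumer, [])):
            self.ready.appendleft(message)
```

A message only disappears from the broker once it has been acknowledged, so anything a crashed worker was still holding (ETA tasks included) simply becomes available to the next consumer.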
There are two internal queues in celery. By internal I mean each is a
Python Queue.Queue object.
* bucket_queue
    This is the rate limited queue. Tasks that should be executed
    immediately are put on this queue.
* hold_queue
    Tasks that are paused (because they have an ETA) are put into
    this queue.
There are three threads:
* Mediator
    Moves tasks from the bucket_queue to the worker pool for
    execution.
* PeriodicTaskController
    Wakes up every second to schedule periodic tasks, and moves
    tasks that are ready from the hold_queue onto the bucket_queue.
* AMQPListener
    Consumes messages from AMQP and puts them into the bucket_queue,
    or into the hold_queue if they have an ETA.
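The flow between these pieces can be sketched roughly like this. It is not celery's actual code. It assumes the routing that the queue descriptions imply: tasks without an ETA go straight to the bucket_queue, ETA tasks wait in the hold_queue, and the mediator drains the bucket_queue. The function names are made up, each function stands in for one pass of the corresponding thread, and it uses the Python 3 queue module (Queue in Python 2).

```python
import queue

bucket_queue = queue.Queue()   # tasks that are ready to run now
hold_queue = queue.Queue()     # (eta, task) pairs waiting for their ETA


def listener_receive(task, eta=None):
    """AMQPListener role: file an incoming message into one of the queues."""
    if eta is None:
        bucket_queue.put(task)
    else:
        hold_queue.put((eta, task))


def release_due_tasks(now):
    """PeriodicTaskController role: move tasks whose ETA has passed."""
    still_waiting = []
    while True:
        try:
            eta, task = hold_queue.get_nowait()
        except queue.Empty:
            break
        if eta <= now:
            bucket_queue.put(task)
        else:
            still_waiting.append((eta, task))
    # Put back everything that is not due yet.
    for item in still_waiting:
        hold_queue.put(item)


def mediator_drain(execute):
    """Mediator role: hand every ready task to the pool (here a callable)."""
    while True:
        try:
            task = bucket_queue.get_nowait()
        except queue.Empty:
            return
        execute(task)
```

In the real worker each of these would loop in its own thread. Calling them by hand like this just makes the order of hand-offs easy to follow.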
Maybe TMI (but it's good to have it in writing). As you can see, there
can be lots of messages flowing through these components, and that's not
scary, all thanks to AMQP's reliable messaging.