Run a certain task serially, everything else in parallel. Possible?

Salvatore Iovene

unread,
Nov 1, 2011, 7:49:07 AM11/1/11
to celery...@googlegroups.com
Hello,
I have a Django app that has several celery tasks. One of these tasks I'd like to always run serially (one at a time), so that I never have two instances of it running simultaneously.

I could use only one worker and set CELERYD_CONCURRENCY=1, but that would make all the tasks serial.

Is there a way to do what I want (assuming I explained it well)?

Thanks!
  Salvatore Iovene.

Alexander Koval

unread,
Nov 1, 2011, 9:11:50 AM11/1/11
to celery-users
You can manually manage locks:
http://ask.github.com/celery/cookbook/tasks.html#ensuring-a-task-is-only-executed-one-at-a-time
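
The recipe boils down to something like this (a minimal sketch using the Django cache; the task name, lock id and timeout are just examples):

from celery.task import task
from django.core.cache import cache

LOCK_EXPIRE = 60 * 5  # let the lock expire eventually, in case the task dies

@task
def my_exclusive_task():
    lock_id = "my_exclusive_task-lock"
    # cache.add is atomic (at least on memcached): it fails if the key exists.
    if not cache.add(lock_id, "true", LOCK_EXPIRE):
        return  # another instance is already running, skip this run
    try:
        pass  # ... the actual work ...
    finally:
        cache.delete(lock_id)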

Or create 2 queues (start one with concurrency 1) and configure routes to send that task to the needed queue.

On Nov 1, 13:49, Salvatore Iovene <salvatore.iov...@gmail.com> wrote:
> Hello,
> I have a Django app that has several celery tasks. One of these tasks I'd
> like to always run serially (one at a time), so that I never have two
> instances of it running simultaneously.
>
> I could use only one worker and set CELERYD_CONCURRENCY=1, but that would
> make *all* the tasks serial.

Salvatore Iovene

unread,
Nov 2, 2011, 4:24:10 AM11/2/11
to celery...@googlegroups.com
On Tuesday, November 1, 2011 3:11:50 PM UTC+2, Alexander Koval wrote:
You can manually manage locks:
http://ask.github.com/celery/cookbook/tasks.html#ensuring-a-task-is-only-executed-one-at-a-time
 
Thanks, that did it for me.

  Salvatore. 

Salvatore Iovene

unread,
Nov 2, 2011, 12:11:31 PM11/2/11
to celery...@googlegroups.com
Hi again,


On Tuesday, November 1, 2011 3:11:50 PM UTC+2, Alexander Koval wrote:
You can manually manage locks:
http://ask.github.com/celery/cookbook/tasks.html#ensuring-a-task-is-only-executed-one-at-a-time

I have found that this doesn't really work for me. The reason is that I have some really fast tasks and one task that is always very time consuming. I have modified the code you linked so that there is only one lock, and when a task is started, it will sleep() until it is able to acquire the lock.
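
Roughly, the modified version looks like this (a simplified sketch; the lock id and timings are illustrative):

import time
from celery.task import task
from django.core.cache import cache

GLOBAL_LOCK_ID = "global-task-lock"

@task
def slow_task():
    # Busy-wait: the worker slot stays occupied the whole time we sleep.
    while not cache.add(GLOBAL_LOCK_ID, "true", 60 * 5):
        time.sleep(5)
    try:
        pass  # ... the really time consuming work ...
    finally:
        cache.delete(GLOBAL_LOCK_ID)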

I was satisfied with that until I realized that I can have the following situation:

Worker 1: really time consuming task
Worker 2: really time consuming task, sleeping waiting to acquire lock
Worker 3: really time consuming task, sleeping waiting to acquire lock
Worker 4: really time consuming task, sleeping waiting to acquire lock
 
From the point of view of celery, those four tasks are all running: celery doesn't care that workers 2, 3 and 4 are just repeatedly calling time.sleep(5) until the lock is free. The problem with this solution is that all my fast-running tasks are starved.

So I need something else.

Or create 2 queues (start one with concurrency 1) and configure routes to send that task to the needed queue.

I have tried this, but I have some trouble with the configuration. I have added the following to my Django settings:

CELERY_QUEUES = {"default" : {"exchange":"default", "binding_key":"default"},                                   
                 "plate_solve": {"exchange":"plate_solve", "binding_key":"plate_solve"}                         
                }                                                                                               
CELERY_DEFAULT_QUEUE = "default"                                                                                 
CELERY_ROUTES = {"astrobin.tasks.solve_image" : {"queue":"plate_solve", "routing_key":"solve_image"}}  

The two queues are created, as I can read from the log, but the task astrobin.tasks.solve_image, destined for the plate_solve queue, is never executed.

I have tried changing the "routing_key" to "plate_solve" (same value as the binding_key) and all the tasks worked that way, but I was back at the problem above: the fast tasks were being starved by the slow ones.

Can you tell me where my config is wrong, and what else I'm doing wrong, please?

Thanks!
  Salvatore. 
 

Salvatore Iovene

unread,
Nov 2, 2011, 2:13:02 PM11/2/11
to celery...@googlegroups.com
On Wednesday, November 2, 2011 6:11:31 PM UTC+2, Salvatore Iovene wrote:
CELERY_QUEUES = {"default" : {"exchange":"default", "binding_key":"default"},                                   
                 "plate_solve": {"exchange":"plate_solve", "binding_key":"plate_solve"}                         
                }                                                                                               
CELERY_DEFAULT_QUEUE = "default"                                                                                 
CELERY_ROUTES = {"astrobin.tasks.solve_image" : {"queue":"plate_solve", "routing_key":"solve_image"}}  

The two queues are created, as I can read from the log, but the task astrobin.tasks.solve_image, destined to the plate_solve queue, is never executed.

I have tried changing the "routing_key" to "plate_solve" (same value as the binding_key) and all the tasks worked that way, but I was back at the problem above: the fast tasks were being starved by the slow ones.
 
 
I've solved my problem. The binding_key and the routing_key needed to be the same, alright (with a direct exchange, a message whose routing key matches no binding key is simply dropped), but I also needed to start two different instances of celeryd, each with a different queue assigned to it.
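
For the record, I'm starting the two instances roughly like this (the concurrency values are just what I happen to use):

python manage.py celeryd --queues=default --concurrency=4
python manage.py celeryd --queues=plate_solve --concurrency=1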

Thanks!

Alexander Koval

unread,
Nov 4, 2011, 7:44:31 AM11/4/11
to celery...@googlegroups.com
You can define the queue directly in the task decorator:

@task(queue="plate_solve")

or

@task(exchange="plate_solve", routing_key="plate_solve")


Also, you can create a special queue for all tasks that should run one at a time:
CELERY_QUEUES = {"default" : {"exchange":"default", "binding_key":"default"},
                                   
                 "single": {"exchange":"single", "exchange_type":"topic", "binding_key":"single.#"}                         
                }

@task(
exchange=="single", routing_key="single.solve_image")
# or @task(queue="single")
def solve_image():
pass

@task(exchange=="single", routing_key="single.another_single_task")
# or @task(queue="single")
def another_single_task():
pass



Owais Lone

unread,
Dec 17, 2011, 12:43:19 PM12/17/11
to celery...@googlegroups.com
I have a similar problem, but in my case I have to decide dynamically whether the task should wait or not, based on the value of its arguments.

So even 4 instances of the same task can run at once, but only one may exist with the same argument.

I need something like this,

    def task(arg):
        if lock:               # a per-argument lock is already held
            task.requeue(arg)  # hypothetical API: put the task back on the queue

One ugly hack would be to use retry(), but I don't want to steal retry attempts from the task.
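
What I have in mind is something along these lines (just a sketch; the task name, lock id and countdown are made up):

from celery.task import task
from django.core.cache import cache

@task
def process(arg):
    # One lock per argument value, so different args can run in parallel.
    lock_id = "process-lock-%s" % arg
    if not cache.add(lock_id, "true", 60 * 5):
        # Same argument already running: re-enqueue with apply_async
        # instead of retry(), so no retry attempts are consumed.
        process.apply_async(args=[arg], countdown=10)
        return
    try:
        pass  # ... work on arg ...
    finally:
        cache.delete(lock_id)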

Kirill Panshin

unread,
Dec 22, 2011, 7:38:20 AM12/22/11
to celery-users
Owais, I'd solve your problem by writing a custom Router and using 2 queues (one with a single worker, the other with many). In my project I have 4 queues for different types of tasks.
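
A router here is just a class with a route_for_task method; a rough sketch (the class, task and queue names are only examples):

class SingleQueueRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        # Send the heavy task to its own queue, consumed by one worker.
        if task == "myapp.tasks.heavy_task":
            return {"queue": "single"}
        return None  # fall through to other routers / the default queue

# in settings:
# CELERY_ROUTES = (SingleQueueRouter(),)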

Another good solution, if the task is fired by a user in the UI: you might use a `pending` attribute on the project the task is related to. If the project is pending, disable any actions. Then ideally you're done (of course assuming that tasks are fired only by the UI).