How to dynamically add / remove periodic tasks


Jamie Forrest

Apr 18, 2012, 7:30:44 PM
to celery-users
Apologies for cross-posting from Stack Overflow
(http://stackoverflow.com/questions/10194975/how-to-dynamically-add-remove-periodic-tasks-to-celery-celerybeat).
I haven't had any luck getting a response over there, so I figured I'd
try here.

If I have a function defined as follows:

def add(x, y):
    return x + y

Is there a way to dynamically add this function as a celery
PeriodicTask and kick it off at runtime? I'd like to be able to do
something like (pseudocode):

some_unique_task_id = celery.beat.schedule_task(add,
run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)

I would also want to stop or remove that task dynamically with
something like (pseudocode):

celery.beat.remove_task(some_unique_task_id)
-or-
celery.beat.stop(some_unique_task_id)

FYI I am not using djcelery, which lets you manage periodic tasks via
the django admin.

Thanks,
Jamie Forrest

Ask Solem

Apr 19, 2012, 9:14:58 AM
to celery...@googlegroups.com

No, I'm sorry, this is not possible with the regular celerybeat.

But it's easily extensible to do what you want, e.g. the django-celery
scheduler is just a subclass reading and writing the schedule to the database
(with some optimizations on top).
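
To illustrate the idea of a scheduler that reads its schedule from a database, here is a minimal sketch in plain Python and SQLite. It is not Celery's scheduler API and not django-celery's implementation; the class `ToyDatabaseScheduler`, its methods, and the table layout are all hypothetical. The point is only that when the schedule is re-read from storage on every tick, adding or removing a row takes effect at runtime.

```python
import sqlite3


class ToyDatabaseScheduler:
    """Toy scheduler that re-reads its schedule from SQLite on every tick.

    Illustration only: this is not Celery's Scheduler API.
    """

    def __init__(self, conn):
        self.conn = conn
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS schedule ("
            "name TEXT PRIMARY KEY, every REAL, last_run REAL, enabled INTEGER)"
        )

    def add(self, name, every, now=0.0):
        # Adding a row is all it takes to schedule a task at runtime.
        self.conn.execute(
            "INSERT INTO schedule VALUES (?, ?, ?, 1)", (name, every, now)
        )

    def remove(self, name):
        # Removing the row means the next tick no longer sees the task.
        self.conn.execute("DELETE FROM schedule WHERE name = ?", (name,))

    def tick(self, now):
        """Return the names of tasks due at time `now` and mark them as run."""
        rows = self.conn.execute(
            "SELECT name FROM schedule "
            "WHERE enabled = 1 AND ? - last_run >= every ORDER BY name",
            (now,),
        ).fetchall()
        due = [name for (name,) in rows]
        for name in due:
            self.conn.execute(
                "UPDATE schedule SET last_run = ? WHERE name = ?", (now, name)
            )
        return due


scheduler = ToyDatabaseScheduler(sqlite3.connect(":memory:"))
scheduler.add("add-numbers", every=30)
first = scheduler.tick(now=30)    # 30 seconds have elapsed, so the task is due
second = scheduler.tick(now=45)   # only 15 seconds since the last run
scheduler.remove("add-numbers")
third = scheduler.tick(now=120)   # the task was removed, so nothing is due
```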

Also you can use the django-celery scheduler even for non-Django projects.

Something like this:

- Install django + django-celery:

$ pip install -U django django-celery

- Add the following settings to your celeryconfig:

DATABASES = {"default": {"NAME": "celerybeat.db",
                         "ENGINE": "django.db.backends.sqlite3"}}
INSTALLED_APPS = ("djcelery", )

- Create the database tables:

$ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig

- Start celerybeat with the database scheduler:
$ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \
-S djcelery.schedulers.DatabaseScheduler


There's also the `djcelerymon` command, which non-Django projects can use
to start celerycam and a Django Admin web server in the same process. You can
use it to edit your periodic tasks in a nice web interface:

$ djcelerymon


(Note: for some reason djcelerymon can't be stopped with Ctrl+C; you
have to use Ctrl+Z followed by kill %1.)

--
Ask Solem
twitter.com/asksol | +44 (0)7713357179


Jean-Mark

Apr 27, 2012, 2:40:43 PM
to celery...@googlegroups.com
Is there a way that this can be done using djcelery but not using the Admin interface? I'm able to add tasks successfully but even when I delete them from the database beat still supplies them to the workers. How can I stop that?

Jean-Mark

Apr 29, 2012, 9:48:01 AM
to celery...@googlegroups.com
Just replying since I found a solution for this. When you're deleting a periodic task, set its enabled property to False first, save it, and then delete it. Otherwise the broker will hold on to it, it seems.

Jean-Mark

Apr 29, 2012, 10:54:05 AM
to celery...@googlegroups.com
Here's a proper solution for this, confirmed working. In my scenario, I subclassed PeriodicTask and created a model out of it, since that lets me add other fields to the model as I need them and also add the "terminate" method. You have to set the periodic task's enabled property to False and save it before you delete it. The subclassing is not a must; the schedule_every method is the one that really does the work. When you're ready to terminate your task (if you didn't subclass it), you can just use PeriodicTask.objects.filter(name=...) to search for your task, disable it, then delete it.

Hope this helps!

import json
from datetime import datetime

from django.db import models
from djcelery.models import PeriodicTask, IntervalSchedule


class TaskScheduler(models.Model):

    periodic_task = models.ForeignKey(PeriodicTask)

    @staticmethod
    def schedule_every(task_name, period, every, args=None, kwargs=None):
        """Schedule a task by name to run every "every" "period".

        An example call would be:
            TaskScheduler.schedule_every('mycustomtask', 'seconds', 30, [1, 2, 3])
        That would schedule your custom task to run every 30 seconds with
        the arguments 1, 2 and 3 passed to the actual task.
        """
        permissible_periods = ['days', 'hours', 'minutes', 'seconds']
        if period not in permissible_periods:
            raise Exception('Invalid period specified')
        # Create some unique name for the periodic task.
        ptask_name = "%s_%s" % (task_name, datetime.now())
        # Check whether a matching interval schedule exists already and reuse it.
        interval_schedules = IntervalSchedule.objects.filter(period=period, every=every)
        if interval_schedules:
            interval_schedule = interval_schedules[0]
        else:  # create a brand new interval schedule
            interval_schedule = IntervalSchedule()
            interval_schedule.every = every  # should check to make sure this is a positive int
            interval_schedule.period = period
            interval_schedule.save()
        ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule)
        # PeriodicTask stores args and kwargs as JSON-encoded text.
        if args:
            ptask.args = json.dumps(args)
        if kwargs:
            ptask.kwargs = json.dumps(kwargs)
        ptask.save()
        return TaskScheduler.objects.create(periodic_task=ptask)

    def stop(self):
        """Pause the task."""
        ptask = self.periodic_task
        ptask.enabled = False
        ptask.save()

    def start(self):
        ptask = self.periodic_task
        ptask.enabled = True
        ptask.save()

    def terminate(self):
        self.stop()
        ptask = self.periodic_task
        self.delete()
        ptask.delete()
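
For the case without the wrapper model, the disable, save, delete order can be sketched as a small helper. This is only an illustration: `disable_then_delete` and `FakeTask` are hypothetical names, and the helper is written against any object that has an `enabled` attribute plus `save()` and `delete()` methods, so it can be tried without Django installed.

```python
def disable_then_delete(tasks):
    """Disable, save, and then delete each task; return how many were removed."""
    removed = 0
    for task in list(tasks):
        task.enabled = False
        task.save()      # persist the disabled state first
        task.delete()    # only then remove the row
        removed += 1
    return removed


class FakeTask:
    """Hypothetical stand-in for a PeriodicTask row that records each call."""

    def __init__(self):
        self.enabled = True
        self.calls = []

    def save(self):
        self.calls.append(("save", self.enabled))

    def delete(self):
        self.calls.append(("delete", self.enabled))


fake = FakeTask()
count = disable_then_delete([fake])
```

With djcelery, the same helper would presumably be called as disable_then_delete(PeriodicTask.objects.filter(name=...)), matching the lookup described above.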

Johnson Thomas

Aug 26, 2013, 3:15:33 PM
to celery...@googlegroups.com
Hi Jean-Mark,

Thank you for posting the solution to this problem. I was wondering, is there a reason why you defined the schedule_every method as a staticmethod, especially since you are creating an instance of TaskScheduler within it?

Thanks,
Johnson
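
As general Python background to this question (not an answer from the original author), the practical difference between a staticmethod and a classmethod factory shows up with subclasses. The classes below are hypothetical and unrelated to djcelery; this is a sketch of the language behavior only.

```python
class Schedule:
    def __init__(self, name):
        self.name = name

    @staticmethod
    def make_static(name):
        # The class name is hard-coded, so subclasses still get a Schedule.
        return Schedule(name)

    @classmethod
    def make(cls, name):
        # `cls` is whichever class the method was called on.
        return cls(name)


class WeeklySchedule(Schedule):
    pass


static_result = WeeklySchedule.make_static("report")
class_result = WeeklySchedule.make("report")
```

If TaskScheduler is never subclassed, the two forms behave the same, so the staticmethod in the posted code works as written.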

Nilesh Sutar

Feb 9, 2016, 7:01:34 AM
to celery-users
Hi Jean,
Thank you for your approach.
The TaskScheduler model doesn't support weekly tasks, and the task interval doesn't support periods other than those listed.
Do you have any idea how to run a task weekly or monthly using django-celery?
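
One possible direction, offered as an assumption rather than a confirmed answer: weekly and monthly runs are usually expressed as crontab fields instead of intervals. The sketch below shows what such field values could look like and checks them against dates in plain Python. The dictionaries and the `matches` helper are hypothetical; the field names follow Celery's crontab arguments, and the helper only understands "*" or a single integer.

```python
from datetime import datetime

# Hypothetical crontab field values (names follow Celery's crontab arguments).
WEEKLY = {"minute": "0", "hour": "0", "day_of_week": "1",
          "day_of_month": "*", "month_of_year": "*"}   # Mondays at 00:00
MONTHLY = {"minute": "0", "hour": "0", "day_of_week": "*",
           "day_of_month": "1", "month_of_year": "*"}  # 1st of each month at 00:00


def matches(fields, when):
    """Return True if `when` matches; handles only '*' or a single integer."""
    actual = {
        "minute": when.minute,
        "hour": when.hour,
        # Celery's crontab numbers Sunday as 0; isoweekday() gives Sunday as 7.
        "day_of_week": when.isoweekday() % 7,
        "day_of_month": when.day,
        "month_of_year": when.month,
    }
    return all(spec == "*" or int(spec) == actual[key]
               for key, spec in fields.items())


monday_hit = matches(WEEKLY, datetime(2016, 2, 8, 0, 0))    # a Monday
tuesday_hit = matches(WEEKLY, datetime(2016, 2, 9, 0, 0))   # a Tuesday
first_hit = matches(MONTHLY, datetime(2016, 3, 1, 0, 0))    # 1st of the month
```

In django-celery these values would presumably go into a CrontabSchedule row attached to the PeriodicTask through its crontab field instead of interval. Whether day_of_month and month_of_year are available depends on the installed django-celery version, so treat that part as an assumption to verify.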

RON MICHAEL

Mar 10, 2017, 9:44:52 AM
to celery-users
Thanks for this! Do you by any chance know how to get the current schedule that's being run by cron? Also, what happens if crons with the same time are created? 