Best practice using scheduler as a task queue?

Michael Toomim

Jun 13, 2012, 1:24:15 AM
to web...@googlegroups.com
Here's a common scenario. I'm looking for the best implementation using the scheduler.

I want to support a set of background tasks (task1, task2...), where each task:
  • processes a queue of items
  • waits a few seconds

It's safe to have task1 and task2 running in parallel, but I cannot have two task1s running in parallel: they would both process the same queue of items and duplicate the work.

I found the scheduler supports this nicely with parameters like:

# (assuming, e.g., from datetime import timedelta and now = request.now)
db.scheduler_task.insert(function_name='task1',
                         task_name='task1',
                         stop_time=now + timedelta(days=90000),
                         repeats=0,
                         period=10)

I can launch 3 workers, and they coordinate amongst themselves to make sure that only one will run the task at a time. Great! This task will last forever...

...but now we encounter my problem...

What happens if it crashes, or passes stop_time? Then the task will turn off, and the queue is no longer processed. Or what happens if I reset the database, or install this code on a new server? It isn't nice if I have to re-run the insert function by hand.

So how can I ensure there is always EXACTLY ONE of each task in the database?

I tried putting this code into models:

from datetime import datetime, timedelta

def initialize_task_queue(task_name, period=10):
    num_tasks = db((db.scheduler_task.function_name == task_name)
                   & ((db.scheduler_task.status == 'QUEUED')
                      | (db.scheduler_task.status == 'ASSIGNED')
                      | (db.scheduler_task.status == 'RUNNING')
                      | (db.scheduler_task.status == 'ACTIVE'))).count()

    # Add a task if there isn't one already
    if num_tasks < 1:
        db.scheduler_task.insert(function_name=task_name,
                                 task_name=task_name,
                                 stop_time=datetime.now() + timedelta(days=90000),
                                 repeats=0,
                                 period=period)
        db.commit()

initialize_task_queue('task1')
initialize_task_queue('task2')
initialize_task_queue('task3')

This worked, except it introduces a race condition! If you start three web2py processes simultaneously (e.g., for three scheduler processes), they will insert duplicate tasks:
    process 1: count number of 'task1' tasks
    process 2: count number of 'task1' tasks
    process 1: there are less than 1, insert a 'task1' task
    process 2: there are less than 1, insert a 'task1' task

I was counting on PostgreSQL's MVCC transaction support to make each of these check-and-insert sequences atomic. Unfortunately, that's not how it works, and I don't fully understand why. As a workaround, I'm currently wrapping the code inside initialize_task_queue with a PostgreSQL advisory lock:

    if not db.executesql('select pg_try_advisory_lock(1);')[0][0]:
        return

    ... count tasks, add one if needed ...

    db.executesql('select pg_advisory_unlock(1);')

But this sucks.
What's a better way to ensure there is always 1 infinite-repeat task in the scheduler? Or... am I using the wrong design entirely?

Niphlod

Jun 13, 2012, 10:16:56 AM
to web...@googlegroups.com
Maybe I didn't get exactly what you need, but... you have 3 tasks that need to be unique.
Also, you want to be sure that if a task crashes it doesn't remain "hung".

This should never happen with the scheduler... the worst case is that a worker crashes (here "crashes" means it disconnects from the database) and leaves the task status as RUNNING, but as soon as another scheduler notices that worker has stopped sending heartbeats, it removes the dead worker and requeues the task.
If your task hits its timeout and it's a repeating task, the best practice is to raise the timeout.

Given that, you only need to initialize the database if someone truncates the scheduler_task table, inserting the 3 records in one transaction.

If you need to be sure, why all the hassle when you can make the task_name column unique and then do db.scheduler_task.update_or_insert(db.scheduler_task.task_name == myuniquetaskname, **task_record)?
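
Roughly, an untested sketch (this assumes task_name has actually been made unique at the database level, e.g. with unique=True, since update_or_insert by itself still does a select-then-insert):

from datetime import datetime, timedelta

def ensure_task(task_name, period=10):
    # inserts the task if no row matches the key query, otherwise updates it;
    # the unique constraint is what ultimately guarantees a single row
    db.scheduler_task.update_or_insert(
        db.scheduler_task.task_name == task_name,
        function_name=task_name,
        task_name=task_name,
        stop_time=datetime.now() + timedelta(days=90000),
        repeats=0,
        period=period)
    db.commit()

ensure_task('task1')
ensure_task('task2')
ensure_task('task3')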

PS: code in models gets executed on every request. What if you have no users accessing the site but still need initialize_task_queue to run? Isn't it better to insert the values once and then start the workers?

BTW: a task that needs to run "forever" but can't be launched in two instances seems to suffer from some design issues, but hey, everyone should be able to do what they want ;-)

Michael Toomim

Jun 17, 2012, 7:20:12 PM
to web...@googlegroups.com
Thanks for the response, niphlod! Let me explain:

The task can be marked FAILED or EXPIRED if:
  • The code in the task throws an exception
  • A run of the task exceeds the timeout
  • The system clock goes past stop_time

And it just plain won't exist if:
  • You have just set up the code
  • You install the code on a new database

In order to implement a perpetually processing task queue, we need to ensure that there is always an active "process queue" task ready in the scheduler. So what's the best way to do this without creating race conditions?

This method creates a race condition:
   1. Check if task exists
   2. Insert if not
...if multiple processes are checking and inserting at the same time. The only solution I've found is to wrap that code in a PostgreSQL advisory lock, but that only works on Postgres. (And the update_or_insert() function you mentioned does the same thing internally, so it still suffers from the race condition.)

Michael Toomim

Jun 17, 2012, 7:29:56 PM
to web...@googlegroups.com
To respond to your last two points:

You're right that model code only runs when there's a request... I figured if my website isn't getting any usage then the tasks don't matter anyway. :P
 
Yes, I think there are design issues here, but I haven't found a better solution. I'm very interested in hearing better overall solutions! The obvious alternative is to write a standalone script that loops forever, and launch it separately using something like "python web2py.py -S app/controller -M -N -R background_work.py -A foo". But this requires solving the following problems that are already solved by the scheduler:
  • During development, restarting & reloading models as I change code
  • Killing these background processes when I quit the server
  • Ensuring that no more than one background process runs at a time

Niphlod

Jun 18, 2012, 4:22:02 AM
to web...@googlegroups.com
I'll get back to you this evening, but for now, maybe this is a fix...

You're afraid that if you start n web2py processes, your code in "initialize" will duplicate the inserts. That is normal behaviour for db transactions... if you're running three processes and you're unlucky, all three will "see" a clean slate and will try to insert data.

But what if you schedule "initialize_task_queue" itself as a repeating task?

Then you can be sure it is never processed concurrently, and that function will be the one in charge of reinitializing the task queue.
I use a similar method in w2p_tvseries to enable groups of tasks that are already scheduled: not quite the same thing, but it should work nonetheless.
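
For example, an untested sketch of that idea: the maintenance task below is inserted only once (by hand, or from cron @reboot), and because the scheduler guarantees a single worker runs it at a time, its own check-and-insert is free of race conditions (the task names, period and the maintain_task_queue function are just placeholders):

from datetime import datetime, timedelta

def maintain_task_queue():
    # re-insert any of the permanent tasks that has gone missing
    for name in ('task1', 'task2', 'task3'):
        active = db((db.scheduler_task.function_name == name)
                    & db.scheduler_task.status.belongs(
                        ('QUEUED', 'ASSIGNED', 'RUNNING', 'ACTIVE'))).count()
        if active < 1:
            db.scheduler_task.insert(function_name=name,
                                     task_name=name,
                                     stop_time=datetime.now() + timedelta(days=90000),
                                     repeats=0,
                                     period=10)
    return 'ok'

# queued a single time, e.g. by hand or from cron @reboot:
# db.scheduler_task.insert(function_name='maintain_task_queue',
#                          task_name='maintain_task_queue',
#                          repeats=0, period=40)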

Michael Toomim

Jun 25, 2012, 7:54:30 PM
to web...@googlegroups.com
This scenario is working out worse and worse.

Now I'm getting tasks stuck in the 'RUNNING' state... even when there aren't any scheduler processes running behind them! I'm guessing the server got killed mid-task, and now it doesn't know how to recover. Looks like a bug in the scheduler.

I don't recommend using the scheduler as a task queue to anybody.

Michael Toomim

Jun 25, 2012, 7:57:25 PM
to web...@googlegroups.com
Er, let me rephrase: I don't recommend using the scheduler for infinitely looping background tasks.

pbreit

Jun 25, 2012, 9:23:58 PM
to web...@googlegroups.com
I see Rails is adding some sort of queue:

Niphlod

Jun 26, 2012, 3:48:24 AM
to web...@googlegroups.com
Well, after all the time you've spent on it, that's quite a bold statement.
The current scheduler (there has been a problem in trunk; the corrected file is attached) reassigns RUNNING tasks if a worker is killed in action, but you may want to check whether the scheduler_worker table has entries for dead workers too (only active schedulers should be in it).

PS: bugs do eventually get fixed, if you can point to them :D
PS 2: I use the scheduler in production both for one-time-only tasks and for recurring ones, and have never run into such problems.
scheduler.py

Andrew

Jun 26, 2012, 2:10:46 PM
to web...@googlegroups.com
Michael, I intend to use the scheduler for infinitely looping background tasks. From what I have read in this thread, the issue is that you are most concerned about race conditions.

I have yet to do this with the web2py scheduler, but why not have something in cron at reboot do the insert, and perhaps a regular task to check, if you like? We implement mutex functions for this, and they can be as fancy as you like: http://en.wikipedia.org/wiki/Mutual_exclusion
Either enforce uniqueness at the database level, or have something in your mutex function that only allows one process at a time to attempt an insert. A lock-token file, perhaps (a sketch of one follows below).
Web2py should enforce uniqueness, but you can also implement your own mutex function.
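
As a rough, untested illustration of the lock-token idea (the token path is arbitrary, and real code would also want a staleness check, since a crashed process leaves the token behind):

import os

def try_acquire(token_path):
    # os.open with O_CREAT | O_EXCL is atomic: only one process can
    # create the token file, so only one gets to attempt the insert
    try:
        fd = os.open(token_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        os.write(fd, str(os.getpid()).encode())
        os.close(fd)
        return True
    except OSError:
        return False   # another process holds the token

def release(token_path):
    try:
        os.remove(token_path)
    except OSError:
        pass

if try_acquire('/tmp/initialize_task_queue.lock'):
    try:
        pass   # count tasks, insert the missing ones
    finally:
        release('/tmp/initialize_task_queue.lock')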

As for the bugs, these aren't design flaws, they just need to get fixed.

I have not given up on the scheduler.

Alec Taylor

Jun 26, 2012, 2:26:04 PM
to web...@googlegroups.com
Wouldn't lock-free mechanisms be more useful?

Niphlod

Jun 26, 2012, 3:13:32 PM
to web...@googlegroups.com
The problem here started as "I can't ensure that my app inserts only one task per function". That is not a scheduler problem per se: it's a common database problem. It would have been the same if someone created a
db.define_table('mytable',
     Field('name'),
     Field('uniquecostraint')
)
and had to ensure, without specifying Field('uniquecostraint', unique=True), that there are no records with the same value in the uniquecostraint column.

From there to "now I have tasks stuck in RUNNING status, please avoid using the scheduler" without any further details, the leap is quite "undocumented".

And please note that the scheduler in trunk has undergone some changes: there was a point in time when abnormally killed schedulers (e.g. kill -SIGKILL on the process) left tasks in RUNNING status, and those would not be picked up by subsequent scheduler processes.

That was a design issue: if a task is RUNNING and you kill the scheduler while the task is being processed, you have absolutely no way to tell what the function did (say, send a batch of 500 emails) before it was killed.
If the task was not written carefully it could send, say, 359 mails, be killed, and then, if it were picked up again by another scheduler after that first killed round, 359 of your recipients would get 2 identical mails.
It has been decided to requeue RUNNING tasks that have no active worker processing them (i.e. leaving it to the function to check what has already been done), so now RUNNING tasks assigned to a dead worker get requeued.

With other changes (soon in trunk; the previously attached file) you're able to stop workers, so they can be killed "ungracefully" while being sure they're not processing tasks.

If you need more details, as always, I'm happy to help, and other developers too, I'm sure :D

Michael Toomim

Jun 26, 2012, 10:57:25 PM
to web...@googlegroups.com
All, thank you for the excellent discussion!

I should explain why I posted that recommendation. The "vision" of using the scheduler for background tasks was:

"Woohoo, this scheduler will automatically handle locks—so I don't need to worry about stray background processes running in parallel automatically, and it will automatically start/stop the processes with the web2py server with -K, which makes it much easier to deploy the code!"
 
It turned out:
  • Setting up scheduler tasks was complicated in itself.
     • 3 static tasks had to be inserted into every new db.
       This requires new installations of my software to run a setup routine. Yuck.
     • When I made that automatic in models/, it required locks to avoid a db race condition.
       (I used PostgreSQL advisory locks. Not cross-platform, but I dunno a better solution.)
     • The goal was to avoid locks in the first place!
  • When things go wrong, it's harder to debug.
    • The scheduler adds a new layer of complexity.
    • Because now I have to make sure my tasks are there properly.
    • And then look for the scheduler_run instances to see how they went.

I must admit that this second problem would probably go away if we fixed all the scheduler's bugs! But it still leaves me uneasy. And I don't like having 40,000 scheduler_run instances build up over time.

At this point, I realized that what I really want is a new feature in web2py that:
  • Runs a function in models (akin to scheduler's executor function) in a subprocess repeatedly
  • Ensures, with locks etc., that:
     • Only one is running at a time
     • That it dies if the parent web2py process dies

And it seems better to just implement this as a web2py feature than to shoehorn the scheduler into a different design.
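
Concretely, something with roughly this shape (just a standard-library sketch of the idea, not web2py code; the daemon flag makes the child die with the parent, and the module-level handle keeps one loop per parent process, so exclusion across separate parent processes would still need a lock):

import time
from multiprocessing import Process

_daemon = None

def _loop(func, period):
    # run the given function forever, pausing between iterations
    while True:
        func()
        time.sleep(period)

def start_daemon_task(func, period=10):
    # start the background loop at most once per parent process
    global _daemon
    if _daemon is None or not _daemon.is_alive():
        _daemon = Process(target=_loop, args=(func, period))
        _daemon.daemon = True   # child is killed when the parent exits
        _daemon.start()
    return _daemon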

Cron's @reboot is very close to this. I used to use it. The problems:
  • I still had to implement my own locks and kills. (what I was trying to avoid)
  • It spawns 2 python subprocesses for each cron task (ugly, but not horrible)
  • It was really buggy. @reboot didn't work. I think massimo fixed this.
  • Syntax is gross.
I basically just got scared of cron.
Now I guess I'm scared of everything. :/

Hopefully this detailed report of my experience will be of help to somebody. I'm sure that fixing the bugs will make things 5x better. I will try your new scheduler.py Niphlod!

Michael Toomim

Jun 26, 2012, 11:01:38 PM
to web...@googlegroups.com
In case it is useful to someone, here is the full code I used for locking, with PostgreSQL advisory locks. The benefits of using PostgreSQL's locks are that:
  • It locks at the database level, so it works across multiple clients
  • The locks are automatically released if a client disconnects from the db
  • I think it's fast

import logging
from datetime import datetime, timedelta

now = datetime.now()     # stand-in: 'now' is defined elsewhere in my models
debug = logging.debug    # stand-in: 'debug' is my logging helper

def check_daemon(task_name, period=None):
    period = period or 4

    tasks_query = ((db.scheduler_task.function_name == task_name)
                   & db.scheduler_task.status.belongs(('QUEUED',
                                                       'ASSIGNED',
                                                       'RUNNING',
                                                       'ACTIVE')))

    # Launch a launch_queue task if there isn't one already
    tasks = db(tasks_query).select()
    if len(tasks) > 1:          #  Check for error
        raise Exception('Too many open %s tasks!!!  Noooo, there are %s'
                        % (task_name, len(tasks)))
    if len(tasks) < 1:
        if not db.executesql('select pg_try_advisory_lock(1);')[0][0]:
            debug('Tasks table is already locked.')
            return

        # Check again now that we're locked
        if db(tasks_query).count() >= 1:
            debug('Caught a race condition! Glad we got outa there!')
            db.executesql('select pg_advisory_unlock(1);')
            return

        debug('Adding a %s task!', task_name)
        db.scheduler_task.insert(function_name=task_name,
                                 application_name='utility/utiliscope',
                                 task_name=task_name,
                                 stop_time=now + timedelta(days=90000),
                                 repeats=0, period=period)
        db.commit()
        db.executesql('select pg_advisory_unlock(1);')

    elif tasks[0].period != period:
        debug('Updating period for task %s', task_name)
        tasks[0].update_record(period=period)
        db.commit()

check_daemon('process_launch_queue_task')
check_daemon('refresh_hit_status')
check_daemon('process_bonus_queue')

Niphlod

Jun 27, 2012, 4:01:05 AM
to web...@googlegroups.com
I don't know if continuing to give you fixes and alternative implementations counts as harassment at this point; stop me if you're not interested in them.

There is a very biiig problem in your statements: if your vision is

Woohoo, this scheduler will automatically handle locks, so I don't need to worry about stray background processes running in parallel, and it will automatically start and stop the processes with the web2py server via -K, which makes it much easier to deploy the code!

then the scheduler is the right tool for you. It's your app that doesn't handle locks, because of the initialization code you put into models.

At least 2 of your problems (initialization, and the 40,000 scheduler_run records) could be fixed by a "recurring maintenance" task that does check_daemon() without advisory locks and prunes the scheduler_run table.
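
The pruning half could be as simple as this (an untested sketch; the 7-day cutoff is arbitrary, and the column and status names are those of the stock scheduler_run table):

from datetime import datetime, timedelta

def prune_scheduler_run(days=7):
    # delete records of finished runs older than the cutoff
    cutoff = datetime.now() - timedelta(days=days)
    deleted = db(db.scheduler_run.status.belongs(('COMPLETED', 'FAILED', 'TIMEOUT'))
                 & (db.scheduler_run.stop_time < cutoff)).delete()
    db.commit()
    return deleted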

BTW: I'm pretty sure that when you say "the scheduler should be terminated alongside web2py" you're not quite grasping how web development in production works. If you're running "standalone", i.e. not mounted on a web server, you can start your instances as web2py -a mypassword & web2py -K myapp, and I'm pretty sure both will shut down when you hit Ctrl+C.

Michael Toomim

Jun 27, 2012, 2:52:33 PM
to web...@googlegroups.com
I'm totally interested in solutions! It's a big problem I need to solve.

The recurring maintenance task does not fix the initialization problem, because now you need to initialize the recurring maintenance task, which leads to the same race condition. It does take care of the 40,000-records problem. But it's a lot of complexity we're introducing to solve a simple problem (looping tasks) with a complex solution (the scheduler).

I'd still love to find a clean way to do this. Maybe we should extend the scheduler like this:
  • Add a daemon_tasks parameter when you call it from models "Scheduler(db, daemon_tasks=[func1, func2])"
  • When the scheduler boots up, it handles locks and everything and ensures there are two tasks that just call these functions
  • Then it dispatches the worker processes as usual

...ah, shoot, looking in widget.py, it looks like the code that starts schedulers doesn't have access to the parameters passed to Scheduler() because models haven't been run yet. Hmph.

Michael Toomim

Jun 27, 2012, 3:18:06 PM
to web...@googlegroups.com
The problem with terminating the processes is:
  • sometimes they don't respond to control-c, and need a kill -9
  • or sometimes that doesn't work, maybe the os is messed up
  • or sometimes the developer might run two instances simultaneously, forgetting that one was already running

You're right that usually I can shut them both down with control-c, but I need a safeguard. My application spends money on Mechanical Turk, and I'll spend money erroneously and upset my users if it goes wrong by accident.

Niphlod

Jun 27, 2012, 3:25:38 PM
to web...@googlegroups.com
I don't find this to be a common scenario; anyway, my idea was to put the insertion/update of the task in cron @reboot, wrap that code in a try/except-pass block, and let the maintenance task do the honours of checking that the other 3 functions are OK. Maintenance would run with a period of, let's say, 40 secs, so no problem there.

BTW, the scheduler may need to be run outside web2py -K, so there is no "without locks" solution; we'd be forced to add all the scheduler_task parameters (except for repeats=0) to that call too, to maximize flexibility.

Maybe adding a unique column to scheduler_task that defaults to a uuid would solve the problem... e.g.
db.scheduler_task.insert(function_name='abcd', repeats=0, ....., unique_column='abcd') would close the deal for your requirements.
You'd have to wrap that in a try/except call for the obvious reason.

As for parameters passed to the scheduler: if you have Scheduler(parameters) in models, they're correctly evaluated.
For greater flexibility you'd have to hack something... but it's not reeeeeally difficult.
For w2p_tvseries I needed to define a Scheduler() in models and run the same Scheduler with other parameters for a cron task executed outside web2py, like a "normal cron".
Check https://github.com/niphlod/w2p_tvseries/blob/master/private/w2p_tvseries.py#L40

Niphlod

Jun 27, 2012, 3:37:27 PM
to web...@googlegroups.com
Uhm... why not have them started by systemd or upstart or supervisord?

The scheduler is by design allowed to run with multiple instances (to process a longer queue you may want to start more of them), but if you're really losing money, why not rely on those services to be sure that there's only one instance running?
There are a looooot of nice implementations out there, and the ones I mentioned are pretty much state of the art :D (while we contribute fixes for the current issues)

BTW: - "responding to Ctrl+C": fixed in trunk recently
     - "OS messed up maybe": requires you to check the OS; Python programs can't be omniscient :D
     - "messy developers": no easy fix for that either

ptressel

Jun 27, 2012, 8:02:26 PM
to web...@googlegroups.com
Michael --

This won't solve your installation / setup issue, but I wonder if it would help with the overrun and timeout problems...  Instead of scheduling a periodic task, what about having the task reschedule itself?  When it's done with the queue, it schedules itself for later.  Remove the time limit so it can take whatever time it needs to finish the queue.  Or maybe launch a process on startup outside of the scheduler -- when it exhausts the queue, have it sleep and either wake periodically to check the queue, or be woken when something is inserted.
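
A rough, untested sketch of the self-rescheduling variant (process_items() stands in for the actual queue-processing code; start_time is a real scheduler_task column, and each run leaves a completed record behind):

from datetime import datetime, timedelta

def process_queue_once():
    process_items()   # drain whatever is in the queue right now
    # queue the next run of this same function a few seconds from now
    db.scheduler_task.insert(function_name='process_queue_once',
                             task_name='process_queue_once',
                             start_time=datetime.now() + timedelta(seconds=10),
                             repeats=1)
    return 'rescheduled'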

Is the transaction processing issue you encountered with PostgreSQL preventing you from setting up your queue as a real producer consumer queue, where you could have multiple workers?

Re. inserting tasks only once:  We have a "first run" check in our models to assure that setup code only runs once -- this only runs if the database is empty -- but that's not adequate if you update code on a running system and add a new task.  We added an "update check" using a version number -- we write a breadcrumb file into the models directory with the current version, and then check that against a version in the code that is changed by the developers when some update code needs to run or the site needs to take some action -- you might do something like that to insert new tasks just once.  (Details:  The breadcrumb file is named so it's run first before other models, and contains one statement that sets a global with the version number found during the previous models run.  The first "real" model file compares that last version against the current version.  If the breadcrumb file didn't exist or the version is different, it runs some update code and rewrites the breadcrumb file.  IIRC we open the breadcrumb file for exclusive access and spin if it's locked -- will need to make sure I did that...)
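
For instance, a very rough sketch of that breadcrumb check as it might look at the top of the first "real" model file (000_version.py is a hypothetical breadcrumb model that only sets LAST_RUN_VERSION, and run_update_code() stands in for whatever one-time setup, such as inserting new tasks, has to happen; the file locking mentioned above is not shown):

import os

CURRENT_VERSION = 7   # bumped by developers whenever one-time setup must run

breadcrumb = os.path.join(request.folder, 'models', '000_version.py')
if globals().get('LAST_RUN_VERSION') != CURRENT_VERSION:
    run_update_code()   # e.g. insert any newly added scheduler tasks
    with open(breadcrumb, 'w') as f:
        f.write('LAST_RUN_VERSION = %d\n' % CURRENT_VERSION)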

I don't think this would help with your case, but will mention...  I'm working on chaining scheduler tasks -- letting one task conditionally release held tasks or insert new ones.  Our need was different from yours -- we didn't know which task(s) we wanted to run until we read remote data (via a task for that purpose).  So our reader task fetches the data, figures out what needs to run and puts work in queues, releases previously scheduled tasks.  Since this mod made changes like having unique names for all tasks independent of the task function, there may be some issues with having a task reschedule itself using an unmodified scheduler that I'm not thinking of.

As an aside, there's always a problem with processing items in a queue (at least if the items are consumable rather than a persistent to-do list) namely, how do you assure that each item is completely processed, and the work within one item gets done only once, if the worker processing them might fail in the middle of processing an item?  If the worker takes the item out of the queue before starting work, then the item is lost if the worker dies.  If it leaves the item in the queue but marks it as being worked on by itself, another worker can redo it, but encounters the issue of picking up where the previous one left off.  For a database, that might be solved with transactions and rollback (assuming that's working...).  This isn't a problem with the scheduler per se -- it's a generic queue processing issue.

I'm probably missing some aspect of your situation, so let me say sorry! in advance if this isn't relevant.

-- Pat

Michael Toomim

Jun 28, 2012, 12:37:21 AM
to web...@googlegroups.com
:)

Because I'm a perfectionist, and I want other developers to be able to install my system by just unzipping the code, running ./serve, and have it just work. So I want to use the built-in webserver and scheduler. There's no reason they shouldn't be able to manage these race conditions correctly.

I'm super excited that the Ctrl-C bug was fixed!

Your idea of putting the initializer in cron @reboot is very appealing! I will think about this and see if I can come up with a nice solution. Ideally I could re-use this "daemon_task" setup for other projects as well, since I find it to be a quite common scenario. I understand you do not find it common; I am not sure why our experiences differ.

Would portalocker be a good thing to use for this situation? I would like to be cross-platform instead of relying on Postgres locks.
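
Something like this sketch is what I'm imagining (untested, and assuming web2py's bundled gluon.portalocker module with its lock/unlock functions; the lock-file path is arbitrary):

import os
from gluon import portalocker

def with_task_lock(do_check):
    # serialize the check-and-insert on an exclusive file lock instead of
    # a PostgreSQL advisory lock; the OS releases it if the process dies
    lockfile = os.path.join(request.folder, 'private', 'task_queue.lock')
    f = open(lockfile, 'a')
    try:
        portalocker.lock(f, portalocker.LOCK_EX)   # blocks until acquired
        do_check()   # count tasks, insert any that are missing
    finally:
        portalocker.unlock(f)
        f.close()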

Michael Toomim

Jun 29, 2012, 12:47:15 AM
to web...@googlegroups.com
On Wednesday, June 27, 2012 5:02:26 PM UTC-7, ptressel wrote:
This won't solve your installation / setup issue, but I wonder if it would help with the overrun and timeout problems...  Instead of scheduling a periodic task, what about having the task reschedule itself?  When it's done with the queue, schedule itself for later.  Remove the time limit so it can take whatever time it needs to finish the queue.  Or maybe launch a process on startup outside of the scheduler -- when it exhausts the queue, have it sleep and either wake periodically to check the queue, or have it waked when something is inserted.

I don't see why we'd do this instead of just setting stop_time=infinity and repeats=0.
 
Is the transaction processing issue you encountered with PostgreSQL preventing you from setting up your queue as a real producer consumer queue, where you could have multiple workers?

No, I only want one worker. The scheduler itself works great as a producer/consumer queue.

I may have misled you with the title of this thread: I'm trying to set up a general repeating background process, not a task queue in particular. Processing a task queue was just one use of the repeating background process.

nick name

Jul 5, 2012, 5:49:36 PM
to web...@googlegroups.com
This might have been solved earlier this week, but in case it wasn't:

You're tackling a general database problem, not a specific task-queue or web2py problem. So you need to solve it with the database: set up another table that refers to the task table, such as:

db.define_table('tasks_that_were_set_up',
   Field('name', 'string', unique=True),
   Field('scheduler', db.scheduler_task, notnull=True, required=True, unique=True, ondelete='CASCADE'),
)

Make your insert code insert a task to this table as well:

try:
    rid = db.scheduler_task.insert(task_name='task7', ....)
    db.tasks_that_were_set_up.insert(name='task7', scheduler=rid)
    print 'Task was just added to db'
except db.integrity_error_class():
    db.rollback()   # the failed insert aborted the transaction, so roll it back
    print 'Task was already in db'

Now, if the task gets removed from the scheduler_task table, because of the cascading on_delete (which is the default - I just put it there for emphasis), the record in tasks_that_were_set_up will be removed as well.
And otherwise, because of the unique constraint, the insert into tasks_that_were_set_up can only succeed once -- and thanks to the transactional nature, so can the scheduler_task.insert.

Michael Toomim

Jul 6, 2012, 5:48:25 PM
to web...@googlegroups.com
This is a nice solution, and clever, thanks!

The upside (compared to postgres locks, as discussed above) is this works for any database. The downside is it creates a whole new table.