db.scheduler_task.insert(function_name='task1', task_name='task1',
                         stop_time=now + timedelta(days=90000),
                         repeats=0, period=10)

def initialize_task_queue(task_name, period=10):
    num_tasks = db((db.scheduler_task.function_name == task_name)
                   & ((db.scheduler_task.status == 'QUEUED')
                      | (db.scheduler_task.status == 'ASSIGNED')
                      | (db.scheduler_task.status == 'RUNNING')
                      | (db.scheduler_task.status == 'ACTIVE'))).count()
    # Add a task if there isn't one already
    if num_tasks < 1:
        db.scheduler_task.insert(function_name=task_name,
                                 task_name=task_name,
                                 stop_time=now + timedelta(days=90000),
                                 repeats=0, period=period)
        db.commit()

initialize_task_queue('task1')
initialize_task_queue('task2')
initialize_task_queue('task3')
if not db.executesql('select pg_try_advisory_lock(1);')[0][0]:
    return
... count tasks, add one if needed ...
db.executesql('select pg_advisory_unlock(1);')

I have yet to do this with the web2py scheduler, but why not have something in cron at reboot to do the insert, and perhaps a regular task to check, if you like. We implement mutex functions for this, and they can be as fancy as you like: http://en.wikipedia.org/wiki/Mutual_exclusion
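For the cron-at-reboot idea, a minimal sketch, assuming the app is called "app", that initialize_task_queue is defined in a model file, and that the script lives in the app's private folder. web2py's -S/-M/-R flags run a script inside the app's environment with its models (db, scheduler) loaded:

# Hypothetical applications/app/private/init_tasks.py, run at boot via a
# system crontab entry such as:
#   @reboot cd /path/to/web2py && python web2py.py -S app -M -R applications/app/private/init_tasks.py
# -S picks the app, -M loads its models, -R runs this script.
initialize_task_queue('task1')
initialize_task_queue('task2')
initialize_task_queue('task3')
db.commit()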
Either enforce uniqueness at the database level, or have something in your mutex function that allows only one process at a time to attempt the insert. A lock token file, perhaps.
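A lock token file can be as simple as an exclusive flock on a well-known path. A minimal sketch (the function name and lock path are made up for illustration; Unix only):

import fcntl

def insert_with_mutex(callback, lockfile='/tmp/task_insert.lock'):
    # Only the process holding the exclusive lock runs the callback;
    # the others skip the insert attempt instead of blocking.
    with open(lockfile, 'w') as f:
        try:
            fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:  # another process holds the lock
            return False
        callback()  # e.g. lambda: initialize_task_queue('task1')
        return True  # lock is released when the file closes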
Web2py should enforce uniqueness, but you can also implement your own mutex function.
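Enforcing it at the database is arguably the most robust route. A sketch, assuming PostgreSQL: a partial unique index allows at most one non-finished row per task name, so a racing duplicate insert fails with an integrity error instead of silently succeeding:

# One-time setup (e.g. from a migration script or a psql session).
db.executesql(
    "CREATE UNIQUE INDEX scheduler_task_one_active "
    "ON scheduler_task (task_name) "
    "WHERE status IN ('QUEUED', 'ASSIGNED', 'RUNNING', 'ACTIVE');")
db.commit()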
As for the bugs, these aren't design flaws; they just need to be fixed.
I have not given up on the scheduler.
"Woohoo, this scheduler will automatically handle locks—so I don't need to worry about stray background processes running in parallel automatically, and it will automatically start/stop the processes with the web2py server with -K, which makes it much easier to deploy the code!"
def check_daemon(task_name, period=None):
    period = period or 4
    tasks_query = ((db.scheduler_task.function_name == task_name)
                   & db.scheduler_task.status.belongs(
                       ('QUEUED', 'ASSIGNED', 'RUNNING', 'ACTIVE')))
    # Launch a task if there isn't one already
    tasks = db(tasks_query).select()
    if len(tasks) > 1:
        # Check for error
        raise Exception('Too many open %s tasks!!! Noooo, there are %s'
                        % (task_name, len(tasks)))
    if len(tasks) < 1:
        if not db.executesql('select pg_try_advisory_lock(1);')[0][0]:
            debug('Tasks table is already locked.')
            return
        # Check again now that we're locked
        if db(tasks_query).count() >= 1:
            debug('Caught a race condition! Glad we got outa there!')
            db.executesql('select pg_advisory_unlock(1);')
            return
        debug('Adding a %s task!', task_name)
        db.scheduler_task.insert(function_name=task_name,
                                 application_name='utility/utiliscope',
                                 task_name=task_name,
                                 stop_time=now + timedelta(days=90000),
                                 repeats=0, period=period)
        db.commit()
        db.executesql('select pg_advisory_unlock(1);')
    elif tasks[0].period != period:
        debug('Updating period for task %s', task_name)
        tasks[0].update_record(period=period)
        db.commit()

check_daemon('process_launch_queue_task')
check_daemon('refresh_hit_status')
check_daemon('process_bonus_queue')
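One detail worth flagging in the code above: pg_try_advisory_lock(1) uses a single global key, so every task contends for the same lock. A per-task key avoids that. A sketch with a hypothetical helper, assuming the DAL's executesql placeholders argument with the psycopg2 driver:

import zlib

def task_lock_key(task_name):
    # Hypothetical helper: CRC32 of the name fits in PostgreSQL's
    # advisory-lock key space, giving each task its own lock.
    return zlib.crc32(task_name.encode('utf-8'))

locked = db.executesql('select pg_try_advisory_lock(%s);',
                       placeholders=[task_lock_key(task_name)])[0][0]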
I don't know if continuing to give you fixes and alternative implementations counts as harassment at this point; stop me if you're not interested in them.
There is a very biiig problem in your statement: if your vision is

"Woohoo, this scheduler will automatically handle locks, so I don't need to worry about stray background processes running in parallel, and it will automatically start and stop the processes with the web2py server via -K, which makes it much easier to deploy the code!"
Here's a common scenario. I'm looking for the best implementation using the scheduler. I want to support a set of background tasks (task1, task2, ...), where each task:
• processes a queue of items
• waits a few seconds
It's safe to have task1 and task2 running in parallel, but I cannot have two task1s running in parallel: they would process the same queue of items in duplicate.
....
So how can I ensure there is always EXACTLY ONE of each task in the database?
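For reference, the task body this implies is tiny. A sketch, where process_items is a hypothetical handler for the task's queue; with repeats=0 and a short period, the scheduler itself supplies the "wait a few seconds" between runs:

def task1():
    # One scheduler run: drain the queue once, then return and let the
    # scheduler's period provide the pause before the next pass.
    process_items('task1_queue')  # hypothetical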
This won't solve your installation/setup issue, but I wonder if it would help with the overrun and timeout problems... Instead of scheduling a periodic task, what about having the task reschedule itself? When it's done with the queue, it schedules itself for later. Remove the time limit so it can take whatever time it needs to finish the queue. Or maybe launch a process on startup outside of the scheduler -- when it exhausts the queue, have it sleep and either wake periodically to check the queue, or be woken when something is inserted.
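The reschedule-itself idea could look like this with Scheduler.queue_task; a sketch, assuming a scheduler instance named scheduler and a hypothetical process_queue helper:

from datetime import datetime, timedelta

def task1():
    process_queue()  # hypothetical: work through everything pending
    # Queue the next run only after this one finishes, so a long run
    # can never overlap with its successor.
    scheduler.queue_task('task1',
                         start_time=datetime.now() + timedelta(seconds=10),
                         timeout=24 * 3600)  # generous per-run limit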
Is the transaction-processing issue you encountered with PostgreSQL preventing you from setting up your queue as a real producer-consumer queue, where you could have multiple workers?
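If that issue can be worked around, the claim step of a producer-consumer queue can be made atomic in PostgreSQL itself. A sketch, assuming a hypothetical work_item table and PostgreSQL 9.5+ for SKIP LOCKED:

# Each worker atomically claims one pending row; SKIP LOCKED makes
# parallel workers pass over rows already being claimed.
rows = db.executesql(
    "UPDATE work_item SET status = 'claimed' "
    "WHERE id = (SELECT id FROM work_item "
    "            WHERE status = 'pending' "
    "            ORDER BY id LIMIT 1 "
    "            FOR UPDATE SKIP LOCKED) "
    "RETURNING id;")
db.commit()
if rows:
    process_item(rows[0][0])  # hypothetical per-item handler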