improvements to scheduler


Niphlod

Jun 16, 2014, 6:21:22 PM
to web2py-d...@googlegroups.com
Hi @all,
    in the last week I've been busy coding out two major features for the scheduler:
A) workers coordination with redis
B) task dependencies

A little bit of introduction....
A) workers coordination with redis (and assignment)
Given that the only way for workers to pass coordination messages around is the database, as soon as you add ~20 workers the db pressure gets pretty high... unfortunately I can't "simplify" the logic enough to require fewer queries to the backend...
Given the statement that "web2py's scheduler is good enough to process tasks when task dispatching times are irrelevant compared to the actual task execution time" (which translates roughly to "if you need sub-second dispatching use something else" OR "it was not built to process 1k tasks every second"), I thought about moving just the workers' coordination process to redis. This means that the scheduler_worker table gets replaced by a few keys in redis. Redis has recently been "officially supported" by Microsoft (big statements were made regarding redis availability in Windows Azure VMs through the msopentech builds) and I haven't had problems running it on Windows Server (> 2008R2)... so it's a viable solution on all platforms.
A nice addition: with the redis-backed scheduler the background thread still checks every 5 * heartbeat seconds for new tasks, but the actual worker process blocks waiting for assigned tasks coming into the queue... which indeed translates to sub-second dispatching for queue_task(..., immediate=True). Assignment is also easier (and lighter), because instead of shuffling tasks from QUEUED to ASSIGNED to worker_names, the task id gets pushed onto a queue (named after the relevant group_name) that every worker can pop from atomically (again, roughly, there's no need for ASSIGNED anymore).
The only limitation is that to achieve sub-second dispatching there must be a worker listening on that group_name... multiple group names are allowed, but then the "listening" process must cycle through the group_names serially, i.e. wait 30 seconds for a task assigned to group1, then 30 seconds for a task assigned to group2, and so on... Given that with redis the number of workers can easily be 50 or more, this doesn't seem like a big limit.
Given that workers' coordination and task assignment are - hopefully - something users don't fiddle with, the redis-backed scheduler can be treated as a drop-in replacement with no change in features whatsoever. I thought about moving the scheduler_run table to redis too (it's not that difficult), but then figured that analyzing e.g. the duration of various tasks during the day (i.e. using the scheduler_run table as a "log" of what happened) was too difficult to achieve by querying just redis. We can talk about that further in the future (maybe using redis for a fast "give me the result of this task" and then persisting the records to the scheduler_run table... ^_^)
All of this was implemented by subclassing Scheduler and overriding what was needed.
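To make the dispatching model concrete, here's a rough in-process sketch of the idea: `queue.Queue` stands in for a redis list (its blocking `get` mimics `BLPOP`), and the group names and task ids are made up for illustration - this is not the actual scheduler code.

```python
import queue
import threading

# One in-process queue per group_name stands in for a redis list;
# Queue.get(timeout=...) mimics a blocking BLPOP on that list.
group_queues = {'group1': queue.Queue(), 'group2': queue.Queue()}

processed = []

def worker(groups, rounds):
    # Cycle through group_names serially, waiting a short while on each,
    # exactly like the limitation described above.
    for _ in range(rounds):
        for g in groups:
            try:
                task_id = group_queues[g].get(timeout=0.1)
            except queue.Empty:
                continue
            processed.append((g, task_id))  # "execute" the task

# queue_task(..., immediate=True) reduces to a single atomic push
group_queues['group1'].put(42)
group_queues['group2'].put(43)

t = threading.Thread(target=worker, args=(['group1', 'group2'], 2))
t.start()
t.join()
```

The key point is that the push is a single atomic operation and the waiting worker wakes up immediately, which is where the sub-second dispatching for immediate=True comes from.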

B) task dependencies
this has been a good thing to work on... but it needed an extra table. Basically the goal is to declare dependencies among tasks in a DAG (directed acyclic graph) fashion. I initially steered towards a meta-task (as the assignment is) that enabled/disabled tasks on demand with a DAG topological-sort algorithm. Then I figured out a carefully tailored query that discards tasks with unresolved dependencies, and all the previous limitations went away. Users can now do whatever they like with their tasks, and connect them with as many edges as they want.
Of course the only "limit to imagination" is that you can't schedule recurring tasks expecting them to work with dependencies that self-heal as soon as a complete job is finished (TBH it could be done, but it seems highly counter-intuitive).
Basically a "dependency" is resolved as soon as the task is marked as COMPLETED.
A "job" (a set of tasks that can be represented as a DAG) can be "validated" running a topological-sort (just to be sure beforehand that the scheduler will actually finish the damn thing ^_^).
The new table is defined as follows
db.define_table(
    'scheduler_task_deps',
    Field('job_name', default='job_0'),
    Field('task_parent', 'reference scheduler_task'),
    Field('task_child', 'reference scheduler_task'),
    Field('is_pending', 'boolean', default=False)
)
Here, job_name is just for validation and representation's sake. Column names are pretty self-explanatory, but to be precise: a task 1 that must run after task 2 translates to "task 1 depends on task 2", which translates to a row where task_parent == 1 and task_child == 2.
Every row in this table represents an edge in the DAG. is_pending == True means that the scheduler can "walk that path".
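As a toy illustration of what that "carefully tailored query" has to compute (pure Python, not the actual SQL; the is_pending bookkeeping is modeled here by a set of completed tasks):

```python
# Toy model of dependency-aware assignment (not the real scheduler query).
# An edge (task_parent, task_child) means task_parent depends on task_child.
edges = [
    (2, 1),   # task 2 waits for task 1
    (3, 1),   # task 3 waits for task 1
    (4, 2),   # task 4 waits for task 2
]

def runnable(queued, completed):
    """A queued task can be picked up once none of its edges
    point at a dependency that is still incomplete."""
    blocked = {parent for (parent, child) in edges if child not in completed}
    return sorted(t for t in queued if t not in blocked)

# Nothing completed yet: only task 1 has no unresolved dependencies.
first = runnable({1, 2, 3, 4}, completed=set())
# After task 1 completes, tasks 2 and 3 unblock; task 4 still waits for 2.
second = runnable({2, 3, 4}, completed={1})
```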
To validate a job, users just need to queue all the tasks (with enabled=False to be on the safe side, but again, not required), add the edges, and pass the job_name to the validation algorithm. If the job is a DAG, then it can be finished ("safe" users will at this point turn enabled back to True for the relevant tasks). To enable all of this, only ONE additional query is needed once a task is COMPLETED. I don't think I can accommodate the new feature with less than that :-P

I need to polish a few corners and test it better, but here's a list of questions to @all developers....
1) I don't want to release this into the wild (surely not as stable-something), but at the same time I need users to test it. How should we go about that ?
2) Should we include the redis-backed scheduler in gluon.scheduler or in a separate module ?
3) the comfy scheduler-monitor plugin has been adapted to work with the redis-backed scheduler too. I'll probably add a new branch and merge it into master as soon as the new scheduler lands in web2py's master... does everybody agree that that's the best method ?
4) Should we provide some facility class to add edges, or rely just on db.scheduler_task_deps.insert(....) ? The "crafted query" that assigns tasks doesn't need strong validation (i.e. there can be two rows describing the same directed edge without the scheduler even noticing...). At the same time, all I can think of is just a
g = JobGraph(job_name)
g.add_edge(from_id, to_id)
g.add_edge(from_id2, to_id2) # validates that the edge has not been drawn yet...
......
g.validate() # returns True if the job is a DAG, else False, and prunes all the records.
but it doesn't seem to bring much of a shortcut. Any idea is welcome.
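For what it's worth, such a facility class could be little more than a duplicate-edge check plus Kahn's topological sort. A sketch under those assumptions (class and method names are illustrative; the real helper would insert rows into scheduler_task_deps instead of keeping an in-memory set):

```python
from collections import defaultdict, deque

class JobGraphSketch:
    """Toy version of the proposed helper: add_edge rejects duplicate
    edges, validate() runs Kahn's algorithm to check the job is a DAG."""
    def __init__(self, job_name='job_0'):
        self.job_name = job_name
        self.edges = set()

    def add_edge(self, from_id, to_id):
        # an edge (from_id, to_id) means from_id depends on to_id
        if (from_id, to_id) in self.edges:
            raise ValueError('edge already drawn')
        self.edges.add((from_id, to_id))

    def validate(self):
        indegree = defaultdict(int)
        dependents = defaultdict(list)
        nodes = set()
        for parent, child in self.edges:
            nodes.update((parent, child))
            dependents[child].append(parent)  # child completes before parent
            indegree[parent] += 1
        ready = deque(n for n in nodes if indegree[n] == 0)
        seen = 0
        while ready:
            n = ready.popleft()
            seen += 1
            for m in dependents[n]:
                indegree[m] -= 1
                if indegree[m] == 0:
                    ready.append(m)
        return seen == len(nodes)  # True iff no cycle

g = JobGraphSketch('job_1')
g.add_edge(2, 1)      # task 2 depends on task 1
g.add_edge(3, 2)      # task 3 depends on task 2
ok = g.validate()     # 1 -> 2 -> 3 is a DAG
g.add_edge(1, 3)      # this closes a cycle
broken = g.validate()
```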

Niphlod

Jul 30, 2014, 4:58:15 PM
to web2py-d...@googlegroups.com
no one ... ^_^'

Okay, I'm going to push by the middle of next week at the latest B).
I'm updating w2p_scheduler_tests accordingly to showcase the new feature and add a few tests (automatically testing multiprocessing still escapes my poor brain).
Right now I'm quite satisfied: without dropping any performance, the scheduler can automatically process tasks with dependencies.
There's a new helper class "JobGraph" that, e.g., turns the classic "getting dressed" dependency graph (the original image is not preserved here) into this "api":

from gluon.scheduler import JobGraph

watch = s.queue_task(fname, task_name='watch')
jacket = s.queue_task(fname, task_name='jacket')
shirt = s.queue_task(fname, task_name='shirt')
tie = s.queue_task(fname, task_name='tie')
pants = s.queue_task(fname, task_name='pants')
undershorts = s.queue_task(fname, task_name='undershorts')
belt = s.queue_task(fname, task_name='belt')
shoes = s.queue_task(fname, task_name='shoes')
socks = s.queue_task(fname, task_name='socks')

myjob = JobGraph(db, 'job_1')
# before the tie comes the shirt
myjob.add_deps(tie.id, shirt.id)
# before the belt, too, comes the shirt
myjob.add_deps(belt.id, shirt.id)
# before the jacket comes the tie
myjob.add_deps(jacket.id, tie.id)
# before the belt come the pants
myjob.add_deps(belt.id, pants.id)
# before the shoes come the pants
myjob.add_deps(shoes.id, pants.id)
# before the pants come the undershorts
myjob.add_deps(pants.id, undershorts.id)
# before the shoes come the undershorts
myjob.add_deps(shoes.id, undershorts.id)
# before the jacket comes the belt
myjob.add_deps(jacket.id, belt.id)
# before the shoes come the socks
myjob.add_deps(shoes.id, socks.id)
myjob.validate('job_1')  # raises an exception if the job is not a DAG


To be fair, the scheduler code is transparent (i.e. it processes exactly what it can, rolling down the dependency graph as far as it can, without paying attention to job_name), but JobGraph is a nice helper to see immediately whether the job (as a set of tasks with dependencies) is achievable in the first place. I'm using it in production and I haven't spotted any issues, but hey, what'd you expect ? I'm the creator :°°°°°D

w2p_scheduler_monitor will eventually be updated (probably by late August at the latest) to deal with graphs (I have a POC that works but is hardly stable ^_^).

I'm pushing A) a little further down the road (even though in production it seems pretty rock-solid), just because I'd need to update w2p_scheduler_monitor accordingly.

Niphlod

Aug 4, 2014, 3:51:03 PM
to web2py-d...@googlegroups.com
PR is ready (https://github.com/web2py/web2py/pull/477) for B)

I also updated https://github.com/niphlod/w2p_scheduler_tests accordingly (as always, to clarify new experimental features before they reach "stable" status).

As soon as the PR is merged I'll make an announcement in web2py-users.

Massimo DiPierro

Aug 4, 2014, 4:08:40 PM
to web2py-d...@googlegroups.com
Thanks. I will merge later today. :-)

--
-- mail from:GoogleGroups "web2py-developers" mailing list
make speech: web2py-d...@googlegroups.com
unsubscribe: web2py-develop...@googlegroups.com
details : http://groups.google.com/group/web2py-developers
the project: http://code.google.com/p/web2py/
official : http://www.web2py.com/
---
You received this message because you are subscribed to the Google Groups "web2py-developers" group.
To unsubscribe from this group and stop receiving emails from it, send an email to web2py-develop...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Leonel Câmara

Aug 4, 2014, 6:39:05 PM
to web2py-d...@googlegroups.com
In your get_workers, why do you have a dict inside the Storage declaration?

        for row in workers:
            all_workers[row.worker_name] = Storage(dict(
                status=row.status,
                first_heartbeat=row.first_heartbeat,
                last_heartbeat=row.last_heartbeat,
                group_names=row.group_names,
                is_ticker=row.is_ticker,
                worker_stats=row.worker_stats
                )
            )

I would probably change this to:

        for row in workers:
            all_workers[row.pop('worker_name')] = row

niphlod

Aug 4, 2014, 6:51:46 PM
to web2py-d...@googlegroups.com

The implementation is clearer in case of overrides


Leonel Câmara

Aug 5, 2014, 8:20:49 AM
to web2py-d...@googlegroups.com
What do you mean? In case pop gets overridden?

Niphlod

Aug 5, 2014, 8:29:37 AM
to web2py-d...@googlegroups.com
nope, in case get_workers() does.
Also, I don't want to carry around silly record-updater and record-deleter methods, etc etc etc.
In short, I don't want Row(s) to be there. Just values.

Leonel Câmara

Aug 5, 2014, 9:03:00 AM
to web2py-d...@googlegroups.com
Fair enough, however, why:

            all_workers[row.worker_name] = Storage(dict(
                status=row.status,
                first_heartbeat=row.first_heartbeat,
                last_heartbeat=row.last_heartbeat,
                group_names=row.group_names,
                is_ticker=row.is_ticker,
                worker_stats=row.worker_stats
                )
            )

Instead of just:

            all_workers[row.worker_name] = Storage(

Niphlod

Aug 5, 2014, 10:12:43 AM
to web2py-d...@googlegroups.com
it's a typo, I guess out of habit ^_^'.
Feel free to send a patch, I'm away from the pc.

Leonel Câmara

Aug 5, 2014, 10:41:07 AM
to web2py-d...@googlegroups.com
Done. I have added it to my current pull request. BTW I hope I'm not being too nitpicky.

Niphlod

Aug 5, 2014, 11:07:38 AM
to web2py-d...@googlegroups.com
no problem at all. It's good to have an additional set of eyes reviewing code.

Niphlod

Oct 23, 2014, 5:37:32 PM
to web2py-d...@googlegroups.com
always late........ but hey, better late than never :-P
After a few weeks in production (and noticing a general increase in responsiveness and a general decrease in cpu and disk usage), we might be days away from a redis-backed scheduler.
Coughing up a bit of stats clearly indicates that the main goal of the new scheduler (reducing db pressure) has been reached....

A few numbers (with the help of postgresql statistics).....
5 workers, processing 3000 "add(a,b): return a+b" tasks, quick burst of ~3 minutes
type     #commits   #tuples returned   fetched   inserted   updated   deleted
legacy      15478            9800450     53920       3019     12709         6
redis       15108             195444     51954       3012     12128        12
diff          370            9605006      1966          7       581         6



5 workers, lying around with no tasks, 60 seconds

type     #commits   #tuples returned   fetched   inserted   updated   deleted
legacy        241             354664      4150          5       111         5
redis          52              19272       578          0         0         0
diff          189             335392      3572          5       111         5



The difference in raw speed (i.e. what it takes to process the whole set of 3000 simple tasks) is barely noticeable with this number of workers (legacy 195 seconds, redis 175), but, e.g., with 30 of them... legacy finishes in 127 seconds, while redis takes 50.
Well, on commodity hardware (4gb ram, 5400rpm disk, 1.2Ghz intel dual core), reaching 15 tasks per second with legacy and 60 per second with redis sounds **almost** too good to be true... and all of that clocks in at around 600 LOC for legacy and 400 LOC for redis.

Of course there's a limit to what the current implementation can handle in terms of concurrency and raw speed, and I can't imagine having a fleet of 150 workers processing tasks with web2py's scheduler (for concurrency that high, far better - and more complicated - implementations are out there), but at least we can push the limits a bit further without changing any app code, and scale without hurting the database.

What do you think ?

Massimo DiPierro

Oct 23, 2014, 5:44:18 PM
to web2py-d...@googlegroups.com
+1


Niphlod

Dec 2, 2014, 5:07:40 PM
to web2py-d...@googlegroups.com
after further headaches, I still can't decide what

sched.queue_task(...., immediate=True)

really should do.
In the current implementation, where things are relatively slow, it just "wakes up" a worker and issues a round of assignments. With redis, though, the assignment round can be "smartly" circumvented by just pushing the relevant task onto the queue the workers are listening on.

The crux of it all is that I'd like to achieve sub-second assignment (and execution).
Making immediate=True behave like the current implementation can only be as good as waiting ``heartbeat`` seconds before the task is processed. To make the wait even shorter, ATM there's a facility that lets you push the id of the task onto a (carefully) crafted redis key... and available workers will pick it up right away, in milliseconds...

But this poses a few issues (stemming from the original implementation), namely:
- sched.queue_task(start_time=date_far_in_the_future, immediate=True) won't respect start_time. The same goes for quite a few parameters, not just start_time (dependencies, times_run, stop_time, enabled, etc.)
- the task must already be in the database... pushing the id of a task that isn't committed yet - of course - results in no valid task being picked up by the worker (yep, redis is THAT fast)

Assuming for a second that "we're all consenting adults" and that with the redis scheduler:
- you NEED to pass a ready-to-run task if you use immediate=True (translation: it will be processed no matter what start_time, etc. parameters say)
and that this can be taken care of with proper documentation and big fat warnings, the second point is still bugging me.

Let's assume for a second that queue_task(..., immediate=True) really did a commit before pushing the key to redis..... in that case transactions are not honored: given that there's no way to use savepoints in DAL... something like

def a_controller_queuing_stuff():
    .....
    db.thetable.insert(whatever='abc')
    ......
    sched.queue_task(process_this, ......)
    ......
    db.othertable.insert(whatever2='abc')  ## exception because whatever2 is an integer field
    .......

is "safe" because the task isn't queued ("automatic" rollback())

if the redis scheduler instead enforces a commit() "inside" queue_task(immediate=True) to circumvent the aforementioned issue,

def a_controller_queuing_stuff():
    .....
    db.thetable.insert(whatever='abc')
    ......
    sched.queue_task(process_this, ...., immediate=True)  ## 'abc' makes it into thetable, task is queued and processed
    ......
    db.othertable.insert(whatever2='abc')  ## exception because whatever2 is an integer field

is not safe anymore. To be more precise, that behaviour would force a non-atomic behaviour onto the controller flow.

To me, ATM it really seems like a snake biting its own tail... does anyone have an idea on how to solve the puzzle ?

Leonel Câmara

Dec 3, 2014, 7:05:38 AM
to web2py-d...@googlegroups.com
The only alternative I can see is to make the scheduler add the tasks after the controller has finished. Maybe immediate=True could make the scheduler queue the task by appending a function that does it to current.response.postprocessing instead of really adding it immediately.

Niphlod

Dec 4, 2014, 2:50:41 PM
to web2py-d...@googlegroups.com
uhm. didn't even know about response.postprocessing ..... let's try it and see where it leads :P

Niphlod

Dec 10, 2014, 3:47:24 PM
to web2py-d...@googlegroups.com
nope, response.postprocessing takes place before the actual commit.
It got me headed in the right direction (or so I thought)... response.custom_commit. But damn! it's not documented (something that usually worries me sick) and it's not a list, so it would override what skilled users may already have there............

Jonathan Lundell

Dec 10, 2014, 4:00:32 PM
to web2py-d...@googlegroups.com

You could save & call the previous contents, I suppose.
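The save-and-call pattern could look something like this (a sketch with invented names and a stand-in response object, not actual web2py API):

```python
def wrap_callback(holder, attr, extra):
    """Replace holder.<attr> with a function that first calls whatever
    was already there (if anything), then calls `extra`."""
    previous = getattr(holder, attr, None)
    def combined(*args, **kwargs):
        if previous is not None:
            previous(*args, **kwargs)
        extra(*args, **kwargs)
    setattr(holder, attr, combined)
    return combined

# tiny demonstration with a stand-in for `response`
class FakeResponse:
    pass

calls = []
resp = FakeResponse()
resp.custom_commit = lambda: calls.append('user commit hook')
wrap_callback(resp, 'custom_commit', lambda: calls.append('scheduler push'))
resp.custom_commit()  # runs the user's hook first, then the scheduler's
```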

Massimo DiPierro

Dec 10, 2014, 11:53:34 PM
to web2py-d...@googlegroups.com
The problem is that I am not sure anything done in custom_commit or the deprecated postprocessing occurs, as you say, before commit and therefore also before the data is returned to the user. It had good intentions but does not work as one would want. That is why we have the scheduler. They are not documented because I could not in good conscience find a good use for them.


Niphlod

Dec 11, 2014, 5:00:04 AM
to web2py-d...@googlegroups.com
to me these sound like things we should get rid of. response.postprocessing can still live as a hook for different templating systems and it's quite straightforward... custom_commit (or _custom_commit) should be deleted, the logic is too convoluted, and the same goes for _custom_callback (btw, a very inconsistent naming scheme across all these things...).
Anyway, a callback after the commit is surely something this specific use case could benefit from.

Anthony

Dec 11, 2014, 8:35:35 AM
to web2py-d...@googlegroups.com
Is there no good use for response.postprocessing? Other frameworks seem to have similar functionality (e.g., Flask's app.after_request and Django's middleware).



Leonel Câmara

Dec 11, 2014, 1:20:08 PM
to web2py-d...@googlegroups.com
I think it's useful; I use it for things that I want to run after all my controller code. It avoids code duplication in the controllers. You can easily put anything you want to run before all controllers in the models, but the only place to put code that runs after the controller is postprocessing.

The reason you may want to put code after the controller is that you may want to change the response depending on the variables the controller has set. For instance, you may want to check for a header in the response and do the same thing for all controllers that have set that header.

Niphlod

Dec 11, 2014, 2:49:07 PM
to web2py-d...@googlegroups.com
rephrasing... response.postprocessing is documented and quite useful if you want to run code between controller and view execution. It is good for hooking up templating systems, running tracing, caching, etc. It's _custom_commit, custom_commit and _custom_rollback that are too convoluted, not documented, and quite frankly a bit hackish.
We have some of those pieces running in core, slipped in probably by someone who could make use of them but didn't think it entirely through when implementing them (and performance-wise some are a bummer, e.g. copystream_progress).
The thing is, "automatic commit and rollback" are meant to ensure that if any piece of code (models, controllers, views) raises an exception, it's caught and rolled back. That's why response.postprocessing runs before commit.... But we don't have anything that acts AFTER the commit or the rollback takes place.
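A post-commit hook along these lines could be as simple as a list of callbacks that a commit wrapper drains only after the commit succeeds (a sketch with invented names, not existing web2py code):

```python
class CommitWrapper:
    """Runs registered callbacks only after a successful commit;
    a rollback drops them, preserving the all-or-nothing flow."""
    def __init__(self, db):
        self.db = db
        self.after_commit = []

    def commit(self):
        self.db.commit()
        hooks, self.after_commit = self.after_commit, []
        for hook in hooks:
            hook()  # e.g. push the task id onto the redis queue

    def rollback(self):
        self.db.rollback()
        self.after_commit = []  # nothing committed, nothing pushed

# stand-in db object for demonstration
class FakeDB:
    def __init__(self):
        self.log = []
    def commit(self):
        self.log.append('commit')
    def rollback(self):
        self.log.append('rollback')

pushed = []
w = CommitWrapper(FakeDB())
w.after_commit.append(lambda: pushed.append('task 1'))
w.commit()      # task 1 is pushed only now, after the commit
w.after_commit.append(lambda: pushed.append('task 2'))
w.rollback()    # task 2 is silently dropped
```

With something like this, queue_task(..., immediate=True) could register the redis push as an after-commit hook, so a failing controller still rolls everything back and the task id never reaches the queue.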