Scheduling priority


Sergei Iakhnin

Dec 11, 2015, 10:55:49 AM
to Airflow
I'm struggling a little to implement priority scheduling the way I want and would welcome anyone's help.

My workflow is as in the attached image. I always trigger these manually in batches of several hundred runs. Because new tasks are only scheduled when their upstream tasks have succeeded, my release_sample tasks are not in the queue until all of the regenotype tasks have completed. As a result, my workflows always run unnecessarily long: the queue is always full of regenotype tasks, and the release_sample tasks only get to run at the very end. This is not ideal, as I would prefer to finish each individual workflow run as soon as possible.

My ideal scheduling scenario would be to have tasks increase in priority the farther along the DAG they are (I think this makes sense - you want to finish things that are close to finishing). But this seems to be impossible given the current priority_weight assignment, which adds up all the downstream priorities.

Is there a solution that will help me get the scheduling I am after?

Thanks in advance,

Sergei.
Screen Shot 2015-12-11 at 4.46.47 PM.png

Maxime Beauchemin

Dec 11, 2015, 11:15:19 AM
to Airflow
The only official way to set priorities for tasks is based on the `priority_weight` param in BaseOperator.

The sum of the subtree of the task gets computed and stored in the `task_instance` table at queuing time. From that point, the priority_weight is what is used for prioritization; you can look at it in the "Task Instances" CRUD view.
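
As a rough illustration of the subtree sum described above, here is a minimal sketch in plain Python (no Airflow import). The DAG shape and the task names `lock_sample`, `regenotype_a` and `regenotype_b` are assumptions made up for the example, and it assumes each downstream task is counted once and every task has `priority_weight=1`.

```python
# Toy three-layer DAG loosely modelled on the workflow in this thread.
# Task names other than release_sample are hypothetical.
DOWNSTREAM = {
    "lock_sample": ["regenotype_a", "regenotype_b"],
    "regenotype_a": ["release_sample"],
    "regenotype_b": ["release_sample"],
    "release_sample": [],
}
WEIGHTS = {task: 1 for task in DOWNSTREAM}  # assume priority_weight=1 everywhere


def descendants(task, downstream):
    """Return the set of all tasks reachable downstream of `task`."""
    seen = set()
    stack = list(downstream[task])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(downstream[current])
    return seen


def subtree_priority(task, downstream, weights):
    """Own weight plus the weight of every distinct downstream task."""
    return weights[task] + sum(weights[t] for t in descendants(task, downstream))


subtree = {t: subtree_priority(t, DOWNSTREAM, WEIGHTS) for t in DOWNSTREAM}
for name, value in subtree.items():
    print(name, value)
```

Under these assumptions the first-layer task gets the largest value and `release_sample` the smallest, which matches the behaviour reported in the original question.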

If you want more dynamic prioritization rules, it might make sense to compute and alter the priority_weight of queued tasks in the `task_instance` table based on the dynamic rules you have. You'd have to write an external program, or perhaps a PythonOperator or MySqlOperator, to do that.
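
A minimal sketch of that kind of external program, run here against an in-memory SQLite table rather than a real Airflow metadata database. The table below is a simplified stand-in for `task_instance` (the real table has more columns), and the `BOOSTED` rule, the `boost_queued` helper and the `sample_dag` name are hypothetical. Whether a given Airflow version picks up a rewritten value for already-queued tasks is not verified here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for the task_instance table; not the full schema.
conn.execute(
    "CREATE TABLE task_instance ("
    " task_id TEXT, dag_id TEXT, execution_date TEXT,"
    " state TEXT, priority_weight INTEGER)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?)",
    [
        ("regenotype", "sample_dag", "2015-12-11T00:00:00", "queued", 2),
        ("release_sample", "sample_dag", "2015-12-11T00:00:00", "queued", 1),
        ("release_sample", "sample_dag", "2015-12-10T00:00:00", "success", 1),
    ],
)

# Hypothetical dynamic rule: these tasks get a fixed, higher priority while queued.
BOOSTED = {"release_sample": 10}


def boost_queued(conn, boosted):
    """Overwrite priority_weight for queued rows whose task_id is in `boosted`."""
    changed = 0
    for task_id, weight in boosted.items():
        cur = conn.execute(
            "UPDATE task_instance SET priority_weight = ?"
            " WHERE state = 'queued' AND task_id = ?",
            (weight, task_id),
        )
        changed += cur.rowcount
    conn.commit()
    return changed


changed = boost_queued(conn, BOOSTED)
rows = conn.execute(
    "SELECT task_id, state, priority_weight FROM task_instance"
    " ORDER BY execution_date DESC, task_id"
).fetchall()
print(changed, rows)
```

Only rows in the `queued` state are touched, so finished task instances keep the value that was stored when they were queued.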

Max

Sergei Iakhnin

Dec 11, 2015, 3:44:37 PM
to Airflow
What do you think about having a Strategy there with multiple implementations like subtree, ancestors, or nominal priority weight?

Maxime Beauchemin

Dec 14, 2015, 12:35:57 PM
to Airflow
It seems reasonable. I'd like to see use cases for it before injecting more complexity in that area.

Max

Sergei Iakhnin

Dec 14, 2015, 3:30:15 PM
to Airflow
Hi Maxime,

The use case is as described above. When there are many DAG runs at the same time, a downstream task will not run until all upstream tasks in all runs have finished.

In my case, the workflow has three layers. The first layer has a task that locks an object, the object is then worked on by several tasks in parallel in the second layer, and the object is unlocked in the third layer. Once the work is completed, I'd want the object to be unlocked as soon as possible. Instead, the last task never gets scheduled until all objects (across all DAG runs) have finished processing, and then they all get unlocked.

If you think about priority as a proxy for value, then it makes sense to me that priority should increase the deeper you go into a DAG. By completing some tasks along a path you have done work towards a goal, but you have not yet achieved the final goal (the end of the run). So you should want to reach the end of the run, which delivers that run's value, before you schedule tasks from other runs that are not as far along. This makes each run as short as possible, which I think is desirable in a distributed system, both to limit the exposure of intermediate results to adverse external events and to minimize resource locks.

You would get this kind of execution if, instead of adding up the priority weights in a task's subtree, you added up the weights of its ancestors further up the tree.
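
To make the difference concrete, here is a small sketch that computes ancestor-based priorities for a toy three-layer DAG and then replays three concurrent runs through a single worker slot under both weightings. Everything in it is an assumption for illustration: the task names `lock_sample`, `regenotype_a` and `regenotype_b`, unit task durations, one worker, and the `simulate` helper, which is a simplification and not how the Airflow scheduler is implemented.

```python
import heapq

# Hypothetical three-layer DAG: lock, parallel work, unlock.
UPSTREAM = {
    "lock_sample": [],
    "regenotype_a": ["lock_sample"],
    "regenotype_b": ["lock_sample"],
    "release_sample": ["regenotype_a", "regenotype_b"],
}
WEIGHTS = {task: 1 for task in UPSTREAM}
# Subtree-sum priorities for the same DAG (own weight + all downstream tasks).
BY_SUBTREE = {"lock_sample": 4, "regenotype_a": 2,
              "regenotype_b": 2, "release_sample": 1}


def ancestors(task, upstream):
    """Return the set of all tasks reachable upstream of `task`."""
    seen = set()
    stack = list(upstream[task])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(upstream[current])
    return seen


def ancestor_priority(task, upstream, weights):
    """Own weight plus the weight of every distinct upstream task."""
    return weights[task] + sum(weights[t] for t in ancestors(task, upstream))


def simulate(priorities, upstream, n_runs):
    """Run one task per tick, highest priority first; return each run's finish tick."""
    done = set()  # (run, task) pairs that have completed
    ready = [(-priorities[t], run, t)
             for run in range(n_runs) for t in upstream if not upstream[t]]
    heapq.heapify(ready)
    finish, clock = {}, 0
    while ready:
        _, run, task = heapq.heappop(ready)
        clock += 1
        done.add((run, task))
        finish[run] = clock  # last write is the run's final task
        for child, parents in upstream.items():
            if task in parents and all((run, p) in done for p in parents):
                heapq.heappush(ready, (-priorities[child], run, child))
    return [finish[run] for run in range(n_runs)]


BY_ANCESTORS = {t: ancestor_priority(t, UPSTREAM, WEIGHTS) for t in UPSTREAM}
print(simulate(BY_SUBTREE, UPSTREAM, 3))    # [10, 11, 12]
print(simulate(BY_ANCESTORS, UPSTREAM, 3))  # [4, 8, 12]
```

In this toy setup the total amount of work is the same either way, but with ancestor sums each run finishes before the next one starts, so locks are held for a much shorter time.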

Maxime Beauchemin

Dec 15, 2015, 2:07:03 AM
to Airflow
Gotcha. It should be easy to add a new param to BaseOperator `priority_sum_subtree=True`, and change the way this property works based on that here.
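
One way such a flag could look, sketched on a toy class rather than on Airflow's actual `BaseOperator`. Only the parameter name `priority_sum_subtree` comes from the message above; the `ToyOperator` class, its `total_priority` property and the `build` helper are hypothetical names invented for this example.

```python
class ToyOperator:
    """Minimal stand-in for an operator; this is not Airflow's BaseOperator."""

    def __init__(self, task_id, priority_weight=1, priority_sum_subtree=True):
        self.task_id = task_id
        self.priority_weight = priority_weight
        self.priority_sum_subtree = priority_sum_subtree
        self.upstream = []
        self.downstream = []

    def set_downstream(self, other):
        self.downstream.append(other)
        other.upstream.append(self)

    def _flat_relatives(self, upstream):
        """Collect every distinct task reachable in one direction."""
        found = []
        stack = list(self.upstream if upstream else self.downstream)
        while stack:
            task = stack.pop()
            if task not in found:
                found.append(task)
                stack.extend(task.upstream if upstream else task.downstream)
        return found

    @property
    def total_priority(self):
        # True: sum over the downstream subtree; False: sum over ancestors.
        relatives = self._flat_relatives(upstream=not self.priority_sum_subtree)
        return self.priority_weight + sum(t.priority_weight for t in relatives)


def build(flag):
    """Build a lock -> two parallel tasks -> release chain with the given flag."""
    lock = ToyOperator("lock_sample", priority_sum_subtree=flag)
    work_a = ToyOperator("regenotype_a", priority_sum_subtree=flag)
    work_b = ToyOperator("regenotype_b", priority_sum_subtree=flag)
    release = ToyOperator("release_sample", priority_sum_subtree=flag)
    for work in (work_a, work_b):
        lock.set_downstream(work)
        work.set_downstream(release)
    return lock, release


lock, release = build(True)
print(lock.total_priority, release.total_priority)
lock, release = build(False)
print(lock.total_priority, release.total_priority)
```

Keeping the default at `True` would leave existing DAGs unchanged, while setting it to `False` on every task in a DAG flips the ordering so that later tasks outrank earlier ones.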

Can you submit a PR with this feature? Otherwise open an issue on GitHub, though I'm not sure when I'll get to it.

Max