When exactly are dag definitions parsed and how to dynamically change dag structure?


Sergei Iakhnin

Jan 30, 2016, 5:21:03 AM
to Airflow
I have a DAG that works on a large input file by breaking it up into pieces and running one task per piece. So far I have hard-coded the number of pieces to break the file into, and I simply create the tasks in a for loop with a predetermined number of iterations, so the structure of the DAG is static. But suppose I wanted to control the granularity on a per-run basis, i.e. the number of pieces is only known at the time a dag-run is scheduled. Is this doable? What would be the recommended way of getting this done?
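As a concrete illustration of the splitting step (the function name and chunking scheme are my own, not from this post), the per-piece boundaries for a fixed number of pieces might be computed like this:

```python
def chunk_bounds(total_size, num_pieces):
    """Split a length into num_pieces contiguous (start, end) ranges,
    distributing any remainder across the first few pieces."""
    base, extra = divmod(total_size, num_pieces)
    bounds = []
    start = 0
    for i in range(num_pieces):
        # the first `extra` pieces absorb one leftover unit each
        end = start + base + (1 if i < extra else 0)
        bounds.append((start, end))
        start = end
    return bounds

# 100 units over 8 pieces: the first 4 pieces get 13 units, the rest 12
print(chunk_bounds(100, 8))
```

The DAG file would then loop over these bounds and create one task per range, which is exactly the static structure described above.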

Is it necessary/possible to have a task that determines the split size based on a context parameter and then programmatically creates the necessary number of downstream tasks? Or can this be accomplished directly at the dag-definition level? It appears that dag definitions are parsed at runtime: I've made changes to dags after triggering them and seen the changes get picked up. But is this behaviour well defined?

Thanks in advance,

Sergei.

Maxime Beauchemin

Jan 30, 2016, 11:35:04 PM
to Airflow
If the DAG changes shape at every execution (or close to it), it's easier to represent it in Airflow as a different dag_id for each run, and to run each DAG only once (a singleton DAG).

You can have a pipeline that creates many DAGs dynamically with `schedule_interval='@once'`. The key is to use something like the lines below to stamp each dag as a module-level global, so that Airflow's DAG discovery picks it up:
```
from airflow import DAG

for i in range(5):
    globals()['dag_' + str(i)] = DAG('dag_id_' + str(i), schedule_interval='@once')
```

Max

Sergei Iakhnin

Jan 31, 2016, 3:00:20 PM
to Airflow
The DAG doesn't necessarily change shape that much; probably 95% of the time it will be the same shape, i.e. processing a large file that is chunked into a set number of pieces, but once in a while I'd like to process only some of the chunks by setting a parameter. Right now my setup stores dag-run configurations in json format in a separate db table. My runs are never scheduled but always triggered via the TriggerDagRunOperator. So if I want to dynamically change the shape of the DAG based on a parameter, I guess I need to instantiate the DAG within my own module rather than just putting my dag definition in the DAGs folder as I've been doing so far. But what is the proper way to register this DAG with Airflow? Is just instantiating it enough to have it persist to the DB, show up in the UI, and get propagated to the workers?
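One way to sketch the parameter-driven part of this setup (the json keys `num_pieces` and `only_chunks` and the helper name are illustrative assumptions, not anything from this thread): the DAG file can read the stored json configuration at parse time and decide which chunk tasks to build.

```python
import json

def build_chunk_task_ids(conf_json, default_pieces=8):
    """Decide which chunk tasks to create from a json run configuration.

    If the config names specific chunks, build only those; otherwise
    build one task per piece, falling back to a default piece count.
    """
    conf = json.loads(conf_json) if conf_json else {}
    chunks = conf.get('only_chunks') or range(conf.get('num_pieces', default_pieces))
    return ['process_chunk_{}'.format(i) for i in chunks]

print(build_chunk_task_ids('{"num_pieces": 4}'))
```

The DAG module would then loop over the returned ids, create one operator per id, and assign both the DAG object and its tasks at module level so that the scheduler's periodic re-parse of the DAGs folder picks the new shape up.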

Thanks for your response, btw.

Sergei.