Dynamically create DAGs with multiple tasks

evang...@chope.co

Sep 24, 2018, 12:43:15 PM
to cloud-composer-discuss
Hi,

Is it possible to create DAGs with multiple tasks dynamically? That is, many DAGs, each with its own tasks, generated from a single Python file.

For example:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# default_args, schedule, and catchup are defined elsewhere in the file.
for i in range(5):
    DAG_NAME = "process_type_{}".format(i)

    with DAG(DAG_NAME,
             default_args=default_args,
             schedule_interval=schedule,
             catchup=catchup) as dag:
        start = DummyOperator(task_id="start", dag=dag)
        end = DummyOperator(task_id="end", dag=dag)

        start >> end

    # Register each DAG at module level so the scheduler can discover it.
    globals()[DAG_NAME] = dag

Currently, the DAGs do get scheduled and can run. However, each task of the DAGs created in the loop takes much longer than it does when the DAG is defined in its own individual file. Furthermore, as the number of loop iterations increases, the time taken increases almost proportionally. I've tried various approaches, including putting the task creation into a factory method, yet the problem remains the same.

Is there an issue with how my DAGs are defined?

Feng Lu

Oct 4, 2018, 12:50:42 AM
to evang...@chope.co, cloud-composer-discuss
Have you considered profiling your DAG runtime performance? e.g., python -m cProfile -s cumtime airflow list_dags
Other Airflow CLI commands will also work (e.g., airflow run), as long as the execution path includes parsing the DAG file.
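In case it helps, here is a minimal sketch of timing DAG-file parsing directly from Python (this assumes Airflow 1.10-era APIs; the file path is just a placeholder):

import time
from airflow.models import DagBag

start = time.time()
# Point DagBag at the single file that generates all the DAGs (hypothetical path).
dagbag = DagBag(dag_folder="/home/airflow/gcs/dags/generate_dags.py")
elapsed = time.time() - start

print("Parsed {} DAGs in {:.2f}s".format(len(dagbag.dags), elapsed))
print("Import errors: {}".format(dagbag.import_errors))

Comparing that against the parse time of an equivalent single-DAG file should show how much of the per-task latency is just re-parsing the generating module.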



Wilson Lian

Oct 4, 2018, 12:48:50 PM
to evang...@chope.co, cloud-composer-discuss
Every Airflow worker needs to parse the DAG file, which requires executing the top-level loop. As you've discovered, this means that every single task instance incurs that startup overhead. There's no getting around that if you want a single Python module that defines all your DAGs.

You can try to reduce the overhead of parsing the DAG file by sharding. If the number of DAGs is fixed, you could use a script to generate 1.py, 2.py, ..., n.py, and have each one import and call a DAG factory library (a sketch of such a generator script follows the factory example below). For example:

1.py:

from dag_builder import build_dag
dag = build_dag(1)


dag_builder.py:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

def build_dag(i):
    # default_args, schedule, and catchup are defined or imported elsewhere.
    DAG_NAME = "process_type_{}".format(i)

    with DAG(DAG_NAME,
             default_args=default_args,
             schedule_interval=schedule,
             catchup=catchup) as dag:
        start = DummyOperator(task_id="start", dag=dag)
        end = DummyOperator(task_id="end", dag=dag)

        start >> end

    # Return the DAG so each wrapper module can expose it at top level.
    return dag
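
And here is a minimal sketch of the kind of generator script mentioned above; it writes one thin wrapper module per DAG into the DAGs folder (NUM_DAGS and the output directory are placeholders):

import os

NUM_DAGS = 5          # placeholder: however many DAGs you need
DAGS_FOLDER = "dags"  # placeholder: your Airflow/Composer DAGs directory

TEMPLATE = """from dag_builder import build_dag
dag = build_dag({i})
"""

for i in range(1, NUM_DAGS + 1):
    path = os.path.join(DAGS_FOLDER, "{}.py".format(i))
    with open(path, "w") as f:
        f.write(TEMPLATE.format(i=i))

Each generated file then only pays the parse cost of its own DAG, instead of re-running the whole loop on every parse.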


evang...@chope.co

Oct 4, 2018, 11:11:37 PM
to cloud-composer-discuss
Hi Wilson,

Thanks for the explanation, I'll try this approach instead.