Data/event driven DAG instantiation

Austin Barton

Jan 20, 2016, 1:49:58 PM
to Airflow
In line with the discussion in https://github.com/airbnb/airflow/issues/289, is there a reasonable way to instantiate the execution of a DAG based on an event or on data?

Towards the end of the discussion I see the suggestion of reorienting a workflow to operate over a set of inputs rather than an individual one. There are pros and cons, I understand, but I feel it is the wrong solution for me. If I don't want to reorient my workflow, I see an opportunity to use the CLI command trigger_dag with a run_id, which I could reference from code to get input context. Again, pros and cons. I feel like I'm hacking the framing to fit my house.
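
For illustration, a minimal sketch (not from the original post) of how a task could read that run_id back, assuming a 1.x-era PythonOperator with provide_context=True; the dag_id, run_id scheme, and task are hypothetical:

from datetime import datetime

from airflow import DAG
from airflow.operators import PythonOperator

def process(**context):
    # An external trigger such as `airflow trigger_dag -r workpkg_123 my_dag`
    # supplies the run_id, which can encode the input to work on.
    run_id = context['dag_run'].run_id
    print('processing input encoded in run_id: %s' % run_id)

dag = DAG('my_dag', start_date=datetime(2016, 1, 1), schedule_interval=None)

PythonOperator(task_id='process', python_callable=process,
               provide_context=True, dag=dag)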


Maxime Beauchemin

Jan 20, 2016, 6:11:08 PM
to Airflow
One solution is to have another DAG that runs often (say every 5 minutes) and triggers your DAG if a criterion is met, using the TriggerDagRunOperator:

http://pythonhosted.org/airflow/code.html#airflow.operators.TriggerDagRunOperator
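
For example, a minimal sketch of such a watcher DAG (assuming the 1.x-era API documented at that link; the dag_ids and the sentinel-file check are hypothetical):

from datetime import datetime, timedelta
import os

from airflow import DAG
from airflow.operators import TriggerDagRunOperator

def maybe_trigger(context, dag_run_obj):
    # Trigger the target DAG only when a (hypothetical) sentinel file exists;
    # returning None skips the trigger for this interval.
    if os.path.exists('/data/incoming/ready'):
        dag_run_obj.payload = {'path': '/data/incoming'}
        return dag_run_obj
    return None

dag = DAG('watcher', start_date=datetime(2016, 1, 1),
          schedule_interval=timedelta(minutes=5))

TriggerDagRunOperator(task_id='check_and_trigger',
                      trigger_dag_id='process_package',
                      python_callable=maybe_trigger,
                      dag=dag)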

Austin Barton

Jan 21, 2016, 10:15:08 AM
to Airflow
I do not see how this facilitates a one-to-many relationship unless each interval of the schedule is the iterator. Is that what you are proposing?

E.g. a work package is periodically dumped in a directory; for each work package, spawn a DAG.

Maxime Beauchemin

Jan 21, 2016, 1:44:39 PM
to Airflow
Yes, every five minutes you have a routine that looks for new files and triggers a DAG. It needs to memoize what it has triggered before so that it doesn't trigger the same file twice.
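
A minimal sketch of that memoization step (not from the thread; the paths are placeholders) could record already-triggered files in a sidecar state file:

import json
import os

STATE_PATH = '/data/incoming/.triggered.json'  # hypothetical state file

def new_work_packages(incoming_dir='/data/incoming'):
    # Load the set of files triggered by earlier runs.
    seen = set()
    if os.path.exists(STATE_PATH):
        with open(STATE_PATH) as f:
            seen = set(json.load(f))
    current = set(os.listdir(incoming_dir)) - {os.path.basename(STATE_PATH)}
    fresh = current - seen
    # Persist the union so a later run never double-triggers the same file.
    with open(STATE_PATH, 'w') as f:
        json.dump(sorted(seen | fresh), f)
    return sorted(fresh)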

Isn't that what you want? What do you mean by one-to-many?

Austin Barton

Jan 21, 2016, 2:20:00 PM
to Airflow
As an iterator I can see this working as a solution.

By one-to-many I allude to the "watching" DAG spawning anywhere from one to many DAGs on each of its scheduled runs. My interpretation of your solution is that I would spawn a DAG for just the next item of interest, and on each scheduled run of the "watching" DAG I would continue this until all items of interest have had DAGs spawned. Throughput on generating the new DAGs would be directly tied to the frequency of the parent DAG. It'd work, but it suggests I might be trying to use the wrong solution for my problem.

Austin Barton

Jan 21, 2016, 3:23:47 PM
to Airflow
It looks like I could provide my own Operator implementation based on TriggerDagRunOperator that facilitates spawning more than a single DAG.

Maxime Beauchemin

Jan 21, 2016, 4:19:39 PM
to Airflow
Yeah, there's really not much to the TriggerDagRunOperator. You can also use a generic PythonOperator for that purpose.
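
As a rough sketch (not from the thread), such a callable could trigger one run per work item by inserting DagRun rows directly, much as the 1.x-era TriggerDagRunOperator did internally; the dag_id and work list here are made up:

from datetime import datetime

from airflow import settings
from airflow.models import DagRun

def trigger_each(work_items):
    # One externally triggered DagRun per work item; the scheduler
    # picks these rows up and runs them.
    session = settings.Session()
    for item in work_items:
        session.add(DagRun(
            dag_id='process_package',
            run_id='trig__%s__%s' % (datetime.now().isoformat(), item),
            execution_date=datetime.now(),  # microsecond precision keeps rows unique
            state='running',  # same string value as State.RUNNING
            conf={'path': item},
            external_trigger=True))
    session.commit()
    session.close()

Wrapped in a PythonOperator inside the five-minute watcher, this would lift the one-item-per-interval throughput limit discussed above.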

Max

Ludovic Claude

Apr 18, 2016, 2:13:42 PM
to Airflow
Hello,

I'm trying to do something similar here: I need to scan a directory containing sub-directories to process.

I created a Python function which scans the directory and then uses TriggerDagRunOperator to start processing on each folder.
It works, but launching the execution of TriggerDagRunOperator this way looks hackish. Is there a better way to do that, or should I attempt to write a TriggerMultipleDagRunOperator?

Here is my code:
https://gist.github.com/ludovicc/bb753eb8d2a11e6b042d44a90965d6f5

Another issue: I can see errors in the log like "Duplicate entry 'pre_process_dicom-2016-04-18 17:43:46' for key 'dag_id'". Do you have any idea what is going on here?

Thanks,

Ludovic