Conditional BranchOperator?


Chris Peters

Sep 30, 2015, 7:47:20 AM
to Airflow

How do I branch in a Dag?

I want to test if a file is present.
If it's present, the DAG should follow one path; otherwise it should follow another path.
I know there's a BranchOperator.
In the example below I'm using a PythonOperator, since I haven't found a conditional BranchOperator example.
Should I use the BranchOperator in this example? If so, how do I use it?
Here's the code example, which needs to be corrected.

The function that tests for the file is called flag. It returns True or False to the PythonOperator.


import airflow
import glob
from datetime import datetime, timedelta

def flag(**kwargs):
    # True if one.jar is present in the current working directory
    f1 = 'one.jar'
    tf = False
    if glob.glob(f1):
        tf = True
    return tf

args = {'owner': 'airflow', 'start_date': datetime(2015, 10, 1, 6, 40, 0), 'depends_on_past': False}
dag = airflow.DAG(dag_id='ws_dag_18', schedule_interval=timedelta(hours=1), default_args=args)

start = airflow.operators.PythonOperator(task_id='python_task', provide_context=True, python_callable=flag, dag=dag)

# Problem: this Jinja string is never rendered here. When the DAG file is parsed
# it is just a non-empty string literal, so the `if` is always true and
# 'file_missing' is never created. (The pull would also have to target
# task_ids='python_task', the task_id of `start`.)
if '{{ ti.xcom_pull("start") }}':
    t1 = airflow.operators.DummyOperator(task_id='file_exists', dag=dag)
    t1.set_upstream(start)
else:
    f1 = airflow.operators.DummyOperator(task_id='file_missing', dag=dag)
    f1.set_upstream(start)
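For comparison, here is a minimal sketch of how the flag callable could be reshaped for a BranchPythonOperator, assuming the two downstream task_ids 'file_exists' and 'file_missing' from the example above: instead of True or False, the callable returns the task_id of the path to follow. The name choose_path and its pattern/directory parameters are hypothetical, added only so the logic can be tested without Airflow installed.

```python
import glob
import os


def choose_path(pattern="one.jar", directory="."):
    """Return the task_id of the branch to follow (hypothetical helper).

    A branch callable returns a task_id string rather than a boolean.
    'file_exists' and 'file_missing' are the task_ids of the two
    DummyOperators in the question.
    """
    # glob.glob returns a (possibly empty) list of matching paths
    if glob.glob(os.path.join(directory, pattern)):
        return "file_exists"
    return "file_missing"
```

Registered as the python_callable of a BranchPythonOperator, with both DummyOperators set downstream of the branching task, the returned string selects which path runs, the same way pc selects 't' in the follow-up message.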

 

Chris Peters

Sep 30, 2015, 8:50:20 AM
to Airflow
I might have found a solution.
The following appears to work correctly, since the DAG follows path t and not path f:

from datetime import datetime, timedelta
import airflow

def pc(**kwargs):
    # Return the task_id of the branch to follow ('t' here, so 'f' is skipped)
    f = 'f'
    t = 't'
    return t

args = {'owner': 'branch', 'start_date': datetime(2015, 10, 1, 5, 40, 0), 'depends_on_past': False}

dag = airflow.DAG(dag_id='branch_001', schedule_interval=timedelta(hours=1), default_args=args)

run_this_first = airflow.operators.DummyOperator(task_id='run_this_first', dag=dag)

branching = airflow.operators.BranchPythonOperator(task_id='branching', python_callable=pc, dag=dag)
branching.set_upstream(run_this_first)

t = airflow.operators.DummyOperator(task_id='t', dag=dag)
t.set_upstream(branching)

f = airflow.operators.DummyOperator(task_id='f', dag=dag)
f.set_upstream(branching)
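To make the observed behaviour concrete, here is a toy model in plain Python (not Airflow's actual scheduler code) of the branching rule this DAG relies on: each task directly downstream of the branching task either runs, if its task_id is the one the callable returned, or is skipped. The function resolve_branch and the "run"/"skipped" labels are illustrative assumptions.

```python
def resolve_branch(downstream_ids, chosen_id):
    """Toy model: map each direct downstream task_id to a state.

    Assumption for illustration only: the chosen task runs and every
    other direct downstream task is skipped.
    """
    return {
        task_id: ("run" if task_id == chosen_id else "skipped")
        for task_id in downstream_ids
    }


def pc(**kwargs):
    # Same callable as in the post: always picks the 't' branch
    return "t"


print(resolve_branch(["t", "f"], pc()))
# prints {'t': 'run', 'f': 'skipped'}
```

Changing pc to return 'f' flips the two states, which is the lever a file-existence check would pull.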


Maxime Beauchemin

Oct 1, 2015, 1:42:57 AM
to Airflow
It seems like this should work. Keep in mind that branching can be part of the solution, but pretty much anything can be solved with a branchless, static-ish workflow and tables. There's something nice about setting the data structures (tables) and workflow in stone and having the data be the thing that fluctuates.
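One possible reading of this advice, sketched with Python's built-in sqlite3 module: every run executes the same tasks in the same order, the check task records whether the file was present as a row in a table, and downstream work is driven by that data rather than by the shape of the DAG. The table file_checks and the functions record_check and runs_with_file are hypothetical names chosen for illustration.

```python
import glob
import sqlite3


def record_check(conn, run_ts, pattern):
    """Always-run task: store whether the file was present for this run."""
    present = 1 if glob.glob(pattern) else 0
    conn.execute(
        "INSERT INTO file_checks (run_ts, pattern, present) VALUES (?, ?, ?)",
        (run_ts, pattern, present),
    )
    conn.commit()
    return present


def runs_with_file(conn):
    """Always-run downstream task: select only runs where the file existed."""
    rows = conn.execute(
        "SELECT run_ts FROM file_checks WHERE present = 1 ORDER BY run_ts"
    )
    return [row[0] for row in rows]


# In-memory table standing in for a real, persistent one
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE file_checks (run_ts TEXT, pattern TEXT, present INTEGER)"
)
```

The DAG itself never changes shape; on runs where the file is missing, the downstream step simply finds nothing new to process.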

Max