How do I branch in a DAG?
I want to test if a file is present.
If it's present, the DAG should follow one path; otherwise it should follow another.
I know there's a BranchOperator.
In the example below I'm using a PythonOperator, since I haven't found a conditional BranchOperator example.
Should I use the BranchOperator in this example? If so, how do I use it?
Here's the code example, which needs to be corrected.
The function that tests for the file is called flag. It returns True or False to the PythonOperator.
import airflow
import glob
from datetime import datetime, timedelta

def flag(**kwargs):
    f1 = 'one.jar'
    tf = False
    if glob.glob(f1):
        tf = True
    return tf

args = {'owner': 'airflow', 'start_date': datetime(2015, 10, 1, 6, 40, 0), 'depends_on_past': False}
dag = airflow.DAG(dag_id='ws_dag_18', schedule_interval=timedelta(hours=1), default_args=args)
start = airflow.operators.PythonOperator(task_id='python_task', provide_context=True, python_callable=flag, dag=dag)

if '{{ ti.xcom_pull("start") }}':
    t1 = airflow.operators.DummyOperator(task_id='file_exists', dag=dag)
    t1.set_upstream(start)
else:
    f1 = airflow.operators.DummyOperator(task_id='file_missing', dag=dag)
    f1.set_upstream(start)
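
For comparison, here is a rough sketch of what I think the branching version might look like. It assumes the operator I'm after is BranchPythonOperator, and that the Airflow 1.x import paths (airflow.operators.python_operator and airflow.operators.dummy_operator) apply; newer releases may place these elsewhere. The names choose_branch and build_dag are made up for illustration. The idea is that the callable returns the task_id of the path to follow instead of True or False, and both downstream tasks are always defined.

```python
import glob


def choose_branch(pattern='one.jar', **kwargs):
    """Return the task_id of the branch to follow, based on whether the file exists."""
    # glob.glob returns a non-empty list when at least one path matches.
    return 'file_exists' if glob.glob(pattern) else 'file_missing'


def build_dag():
    """Wire the branch into a DAG (import paths are an assumption: Airflow 1.x)."""
    # Imported here so choose_branch can be tried without Airflow installed.
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import BranchPythonOperator

    args = {'owner': 'airflow', 'start_date': datetime(2015, 10, 1, 6, 40, 0), 'depends_on_past': False}
    dag = DAG(dag_id='ws_dag_18', schedule_interval=timedelta(hours=1), default_args=args)

    # The operator calls choose_branch when the task runs and skips every
    # direct downstream task whose task_id was not returned.
    start = BranchPythonOperator(task_id='python_task', python_callable=choose_branch, dag=dag)

    # Both branches are always defined; the branch task picks one per run.
    for task_id in ('file_exists', 'file_missing'):
        DummyOperator(task_id=task_id, dag=dag).set_upstream(start)
    return dag
```

In an actual DAG file the result would presumably need to be assigned at module level (dag = build_dag()) so the scheduler can discover it.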