Using xcom in BashOperators?

6,734 views
Skip to first unread message

Chris Peters

unread,
Sep 24, 2015, 8:22:19 AM9/24/15
to Airflow

How do I use value returned in PythonOperator in a BashOperator?

1. I'm returning LAST_CALL in the PythonOperator t1
2. I would like to use the returned code in the BashOperator t2
3. What should unknown_code in this line of code look like?     

t2 = BashOperator(task_id='t2', bash_command='echo ' + unknown_code , dag=dag)


Complete code example:

from airflow.operators import BashOperator, DummyOperator, PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
import time

def pc(ds, **kwargs):
HOUR = ((int(time.time())/3600) * 3600) - 3600
LAST_CALL = str(HOUR - 900)
print(LAST_CALL)
return LAST_CALL


args = {'owner': 'cgr', 'start_date': datetime(2015, 9, 24, 9, 40, 0), 'depends_on_past': False}
dag = DAG(dag_id='ws_xcom_001', schedule_interval=timedelta(hours=1), default_args=args)

start = DummyOperator(task_id='webstats_start', dag=dag)

t1 = PythonOperator(task_id='t1', provide_context=True, python_callable=pc, dag=dag)
t1.set_upstream(start)

t2 = BashOperator(task_id='t2', bash_command='echo ' +
unknown_code, dag=dag)
t2.set_upstream(t1)

stop = DummyOperator(task_id='webstats_complete', dag=dag)
stop.set_upstream(t2)





Chris Peters

unread,
Sep 24, 2015, 8:28:13 AM9/24/15
to Airflow
The unknown_code must equal LAST_CALL

Maxime Beauchemin

unread,
Sep 24, 2015, 12:58:52 PM9/24/15
to Airflow
I haven't tested but it should be as simple as this:

t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.pull_xom("t1") }}"', dag=dag)

If you feel like a warrior please add this example to the docs!

Max

Chris Peters

unread,
Sep 25, 2015, 7:31:45 AM9/25/15
to Airflow

Thanks.
I was able to use the code after changing pull_xom to xcom_pull.

Here's the complete code:

from airflow.operators import BashOperator, DummyOperator, PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
import time

def pc(ds, **kwargs):
HOUR = ((int(time.time())/3600) * 3600) - 3600
LAST_CALL = str(HOUR - 900)
print(LAST_CALL)
return LAST_CALL


args = {'owner': 'airflow', 'start_date': datetime(2015, 9, 25, 10, 40, 0), 'depends_on_past': False}

dag = DAG(dag_id='ws_xcom_001', schedule_interval=timedelta(hours=1), default_args=args)

start = DummyOperator(task_id='webstats_start', dag=dag)

t1 = PythonOperator(task_id='t1', provide_context=True, python_callable=pc, dag=dag)
t1.set_upstream(start)

t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("t1") }}"', dag=dag)
Reply all
Reply to author
Forward
0 new messages