Returning and accessing multiple values to PythonOperator with xcom?


Chris Peters

Sep 28, 2015, 5:35:24 AM
to Airflow

How do I return multiple values with XCom?

1. In the code example below, the push function returns h1 and h2 to the PythonOperator (t1) that calls it.
2. Can I return multiple values this way?
3. If so, how do I refer to h1 and h2 in downstream tasks? I'm currently using

'echo {{ ti.xcom_pull("t1.h1") }} '

in the t2 BashOperator and

'echo {{ ti.xcom_pull("t1.h2") }} '

in the t3 BashOperator.

What is the correct way to do this?





import airflow
import time
from datetime import datetime, timedelta


def push(**kwargs):
    # Start of the previous full hour, as a Unix timestamp.
    # Floor division (//) keeps the result an integer on Python 2 and 3.
    h = ((int(time.time()) // 3600) * 3600) - 3600
    h1 = h - 3600
    h2 = h + 3600
    # Returning two values returns a single tuple (h1, h2).
    return h1, h2

args = {'owner': 'airflow', 'start_date': datetime(2015, 9, 25, 6, 40, 0), 'depends_on_past': False}
dag = airflow.DAG(dag_id='example_xcom_2', schedule_interval=timedelta(hours=1), default_args=args)

start = airflow.operators.DummyOperator(task_id='webstats_start', dag=dag)

# The return value of push is stored as the XCom value for task t1.
t1 = airflow.operators.PythonOperator(task_id='t1', provide_context=True, python_callable=push, dag=dag)
t1.set_upstream(start)

t2 = airflow.operators.BashOperator(task_id='t2', bash_command='echo {{ ti.xcom_pull("t1.h1") }} ', dag=dag)
t2.set_upstream(t1)

t3 = airflow.operators.BashOperator(task_id='t3', bash_command='echo {{ ti.xcom_pull("t1.h2") }} ', dag=dag)
t3.set_upstream(t2)

stop = airflow.operators.DummyOperator(task_id='webstats_complete', dag=dag)
stop.set_upstream(t3)


Maxime Beauchemin

Sep 28, 2015, 2:11:58 PM
to Airflow
It looks like your function is returning a tuple, so xcom_pull will return that tuple. You can index into it to get the value you want. You could also use `collections.namedtuple` for more Pythonic code :)

'echo {{ ti.xcom_pull("t1")[0] }} '
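For anyone finding this later, here is a rough sketch of the namedtuple idea in plain Python, without Airflow, so it can be run anywhere. The `HourWindow` name and the `now` parameter are made up for illustration and are not in the original DAG. Index access is what was suggested above; attribute access such as `.h1` inside a template assumes the XCom backend hands back the namedtuple unchanged, which may depend on how your Airflow version serializes XCom values, so fall back to the index if it does not.

```python
import time
from collections import namedtuple

# Hypothetical container for the two values (name chosen for this sketch only).
HourWindow = namedtuple("HourWindow", ["h1", "h2"])


def push(now=None, **kwargs):
    """Return both hour boundaries as one value, as a PythonOperator callable would.

    `now` is an illustrative parameter so the example is deterministic;
    when it is omitted the current time is used, as in the original function.
    """
    now = int(time.time()) if now is None else int(now)
    h = (now // 3600) * 3600 - 3600  # start of the previous full hour
    return HourWindow(h1=h - 3600, h2=h + 3600)


# Simulate what a downstream task would get back from xcom_pull("t1").
window = push(now=10 * 3600 + 5)

first_by_index = window[0]   # same idea as {{ ti.xcom_pull("t1")[0] }}
second_by_index = window[1]  # same idea as {{ ti.xcom_pull("t1")[1] }}
first_by_name = window.h1    # only if the namedtuple type is preserved
second_by_name = window.h2

print(first_by_index, second_by_index)
```

In the DAG itself, t2 and t3 would then use `'echo {{ ti.xcom_pull("t1")[0] }} '` and `'echo {{ ti.xcom_pull("t1")[1] }} '` respectively.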


Max

Chris Peters

Sep 30, 2015, 7:39:13 AM
to Airflow
Thanks, it worked.