Operators return value

573 views
Skip to first unread message

Andrey Kartashov

unread,
Sep 15, 2015, 3:35:29 PM9/15/15
to Airflow
The more I read the code the more I understand :)

So when task is instantiated the TaskInstance is created and method run is run if all requirements passed we reach next code

                    result = None                                                                                                                                                                       
                    if task_copy.execution_timeout:                                                                                                                                                     
                        with utils.timeout(int(                                                                                                                                                         
                                task_copy.execution_timeout.total_seconds())):                                                                                                                          
                            result = task_copy.execute(context=context)                                                                                                                                 
                                                                                                                                                                                                        
                    else:                                                                                                                                                                               
                        result = task_copy.execute(context=context)                                                                                                                                     
                                                                                                                                                                                                        
                    # If the task returns a result, push an XCom containing it                                                                                                                          
                    if result is not None:                                                                                                                                                              
                        self.xcom_push(key=XCOM_RETURN_KEY, value=result)      

I more interested if execute return a value and it pushed into XComs why we do not have them pulled into context downstream?

Maxime Beauchemin

unread,
Sep 15, 2015, 5:33:37 PM9/15/15
to Airflow
You can easily pull them in the context downstream.

from memory, you'll have to lookup the value of XCOM_RETURN_KEY:
task_instance.xcom_pull(task_id='other_task_id', key=XCOM_RETURN_KEY)

Andrey Kartashov

unread,
Sep 15, 2015, 5:41:32 PM9/15/15
to Airflow
I can if it's a PythonOperator if not...
I think I have to make a new operator and them probably a can use pre_execute to populate this values into context. I think I can transfer info, like temp directory, within current run of DAG between tasks.

Maxime Beauchemin

unread,
Sep 15, 2015, 8:50:05 PM9/15/15
to Airflow
You can also read from templates in templated fields
sql = """
SELECT * FROM {{ task_instance.xcom_pull() }}
"""
Reply all
Reply to author
Forward
0 new messages