programmatic invocation of an operator from within another task instance


Daniel Williams

Jan 10, 2016, 1:12:15 PM
to Airflow
Hi,

I've created an operator that is intended to be reused across many workflows. In one case, however, I have a PythonOperator that loops, and I want to invoke the reusable operator from inside that loop, with the task ultimately either completing or throwing an AirflowException. When I execute it this way, I get the following error:

TypeError: execute() got an unexpected keyword argument 'dag_run'

If this isn't possible, what is the idiomatic Airflow way of looping inside a workflow?

Thanks.

dan

Maxime Beauchemin

Jan 11, 2016, 12:09:07 AM
to Airflow
In this case it depends on what you want your "unit of work" to be. You could generate multiple tasks in a for loop, resulting in at least as many tasks in your DAG as there are iterations in your loop. Within this for loop, you could set these tasks to be chained, parallelized, or arranged in any other dependency pattern that fits your logic. Airflow encourages generating tasks dynamically and makes it easy.

In general, operators are created to hand off a significant amount of work to an external system, as in "run this long Hive script" or "run this Spark job". If in your case you have small operations that take a few seconds each, it doesn't really make sense to consider that workload a "batch job" and integrate it as such in a workflow. Then it's simply a program, and PythonOperator can be used to run little programs.

Operators are typically self-contained and don't call other operators; doing so would get really confusing in the metadata. The reusable components are typically hook methods. Generally speaking, most of the logic related to an operator lives in its hook, and the operator just puts things together. If you have a fair amount of logic in your operator, I'd recommend refactoring it out into a hook where possible. Hook methods can then be used by operators, or easily from within a PythonOperator.
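
A rough sketch of what that refactoring might look like, written as plain Python so it runs without Airflow installed. Everything named here is hypothetical: `ExampleServiceHook`, its `process` method, the `process_all` callable, and `TaskFailed`, which only stands in for AirflowException. The point is just that the loop calls a hook method directly instead of calling another operator's execute().

```python
class ExampleServiceHook:
    """Hypothetical hook: holds the reusable logic and knows nothing about tasks."""

    def __init__(self, conn_id="example_default"):
        # conn_id is an assumed parameter, mirroring how hooks are usually configured.
        self.conn_id = conn_id

    def process(self, item):
        # Stand-in for the real call to an external system.
        if item is None:
            raise ValueError("nothing to process")
        return "processed:{}".format(item)


class TaskFailed(Exception):
    """Stand-in for AirflowException so this sketch has no Airflow dependency."""


def process_all(items, hook=None):
    """Callable that loops and reuses the hook; intended for a PythonOperator."""
    hook = hook or ExampleServiceHook()
    results = []
    for item in items:
        try:
            results.append(hook.process(item))
        except ValueError as exc:
            # Fail the whole task on the first bad item.
            raise TaskFailed("failed on {!r}: {}".format(item, exc))
    return results
```

In a DAG file, `process_all` could be passed as the `python_callable` of a PythonOperator, with the items supplied through `op_kwargs`. The reusable operator's own execute() would then call the same hook method, so both code paths share one implementation.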