passing parameters to an externally triggered dag


Yuri Titov Bendana

Mar 8, 2016, 3:04:28 PM
to Airflow
How can I pass parameter values to an externally triggered dag?  I'd like to execute a workflow like this simplified version, but the environment variable I set is not inherited by the dag environment.

[bash]$ export FOO=bar; airflow trigger_dag mydag

mydag.py:
from datetime import datetime

from airflow import DAG
from airflow.operators import PythonOperator
from mymodule import mytask
from os import environ

def run_mytask(*args, **kwargs):
    mytask(foo=environ['FOO'])  # FOO does not exist in the task's environment

default_args = {
    'owner': 'me',
    'start_date': datetime(2016, 1, 1),
}
dag = DAG('mydag', schedule_interval=None, default_args=default_args)
t1 = PythonOperator(python_callable=run_mytask, provide_context=True, task_id='mytask', dag=dag)


Maxime Beauchemin

Mar 9, 2016, 8:13:25 AM
to Airflow
The current master branch ships with example DAGs that should clarify this.

Yuri Titov Bendana

Mar 9, 2016, 1:34:35 PM
to Airflow
Which example are you referring to? I've looked at the ones in example_dags, and example_trigger_controller_dag.py is related, but it's not the scenario I'm writing about. I don't need to have a dag trigger another dag; I want to trigger a dag directly from the command line and pass parameters to it. Or is this not possible?

In example_trigger_controller_dag the params dict is static, and it doesn't read the values in from anywhere outside the dag. In order for the params to vary from run to run, does the dag have to read the parameters from a file? Or do people create a templated trigger dag for every run?

Sergei Iakhnin

Mar 9, 2016, 2:44:53 PM
to Airflow
You can't do it from the command line at this time, but you can do it from Python. This should help:
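A minimal sketch of what that looks like, inserting a DagRun through the metadata session much like the trigger_dag CLI does (module paths are as of Airflow ~1.7 and may differ in your version; the dag id and payload are placeholders):

from datetime import datetime
from airflow import settings
from airflow.models import DagRun
from airflow.utils.state import State

session = settings.Session()
now = datetime.now()
dag_run = DagRun(
    dag_id='mydag',
    run_id='manual__' + now.isoformat(),
    execution_date=now,
    state=State.RUNNING,
    conf={'foo': 'bar'},  # readable in the target dag as dag_run.conf
    external_trigger=True)
session.add(dag_run)
session.commit()
session.close()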


Regards,

Sergei.

Maxime Beauchemin

Mar 9, 2016, 10:15:52 PM
to Airflow

Yuri Titov Bendana

Mar 10, 2016, 8:49:57 PM
to Airflow
Thanks Sergei and Max, this works great. What I was missing was that you can instantiate a TriggerDagRunOperator without setting a dag, and you can fire it off with TriggerDagRunOperator.execute(context). Also, you need the latest dagrun_operator.py from master.

For anyone else who wants to do the same, here's my updated example:

triggerdag.py:
from airflow.operators import TriggerDagRunOperator

def set_up_dag_run(context, dag_run_obj):
    # The payload will be available in the target dag's context as kwargs['dag_run'].conf
    dag_run_obj.payload = context
    return dag_run_obj

my_dag_run = TriggerDagRunOperator(trigger_dag_id="mydag", python_callable=set_up_dag_run,
                                   task_id='trigger_mydag', owner='airflow')
my_dag_run.execute({'foo': 'bar'})


mydag.py:
from datetime import datetime

from airflow import DAG
from airflow.operators import PythonOperator
from mymodule import mytask

def run_mytask(*args, **kwargs):
    mytask(foo=kwargs['dag_run'].conf['foo'])

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2016, 1, 1),
}
dag = DAG('mydag', schedule_interval=None, default_args=default_args)
t1 = PythonOperator(python_callable=run_mytask, provide_context=True, task_id='mytask', dag=dag)

Frank Wang

Mar 17, 2016, 1:02:20 AM
to Airflow
As Maxime pointed out, you can do trigger_dag with --conf parameters now, but I found that for now you have to get this code by pip installing from GitHub, not via pip install airflow.
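For example, once you have that version (the JSON payload is just a placeholder):

[bash]$ airflow trigger_dag mydag --conf '{"foo": "bar"}'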

Serega Sheypak

Apr 18, 2016, 5:12:55 AM
to Airflow
What is the right way to access the value of the "--conf" parameter in a DAG?

Maxime Beauchemin

Apr 18, 2016, 11:54:55 AM
to Airflow
The `dag_run` object is available in the "context" as listed here:

That means that `{{ dag_run.conf['foo'] }}` is available in templates, and that it is made available in PythonOperator and other operators' execute methods.
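A quick sketch of both access paths, assuming an existing dag object (the task ids and the 'foo' key are placeholders):

from airflow.operators import BashOperator, PythonOperator

# In a templated field:
t1 = BashOperator(
    task_id='templated_access',
    bash_command="echo {{ dag_run.conf['foo'] }}",
    dag=dag)

# In a PythonOperator with provide_context=True:
def read_conf(**kwargs):
    return kwargs['dag_run'].conf['foo']

t2 = PythonOperator(task_id='python_access', python_callable=read_conf,
                    provide_context=True, dag=dag)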

Max

Serega Sheypak

Apr 18, 2016, 3:27:51 PM
to Airflow
Oh, thank you so much for your reply. I didn't try this case. Basically, I didn't understand what dag_run means.
Is it a kind of implicit object which you can access inside your DAG_something.py?
What is the right way to pass parameters for backfill jobs? --conf is specified for trigger_dag only; the BackfillJob class doesn't even have a place for something like --conf.
There are several things that I need to configure in my DAG:
1. Artifact version. I have a ScaldingOperator and it is associated with exactly com.mycompany:super-scalding-job:${version}, where ${version} is the version of the Scalding job to run. I want to pass ${version} in from outside.
2. Various settings. I have a custom HiveOperator and it depends on some configuration settings and on com.mycompany:super-hive-query:${version}.

I need to pass settings and artifact versions as parameters.
Will it work?
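For what it's worth, here is one way the artifact-version case could look, assuming the operator exposes the artifact coordinate as a templated field (the ScaldingOperator internals and the run_scalding_job helper are hypothetical; template_fields is the standard Airflow templating hook, import paths as of ~1.7):

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class ScaldingOperator(BaseOperator):
    template_fields = ('artifact',)  # Jinja-render this field at runtime

    @apply_defaults
    def __init__(self, artifact, *args, **kwargs):
        super(ScaldingOperator, self).__init__(*args, **kwargs)
        self.artifact = artifact

    def execute(self, context):
        run_scalding_job(self.artifact)  # hypothetical; artifact is rendered by now

t1 = ScaldingOperator(
    task_id='scalding_job',
    artifact="com.mycompany:super-scalding-job:{{ dag_run.conf['version'] }}",
    dag=dag)

Then something like airflow trigger_dag mydag --conf '{"version": "1.2.3"}' would supply the version per run.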

Serega Sheypak

Apr 21, 2016, 10:05:08 AM
to Airflow
It didn't work for me:

jinja2.exceptions.UndefinedError: 'None' has no attribute 'conf'


My expression is: input_path="{{ dag_run.conf['input_path'] }}",


r0ger

Apr 22, 2016, 1:06:16 PM
to Airflow
This:

    What is the right way to pass parameters for backfill jobs? --conf is specified for trigger_dag only; the BackfillJob class doesn't even have a place for something like --conf.

I am facing a similar challenge.

Our workflow creates dags dynamically by iterating over a list it reads from a configuration file.

But during backfill, I don't want this list to be populated by reading the configuration file. Instead, I want to pass some parameters through the command line.

I am not sure if/how we can do that.
Some help please?


example:

# my_list = (obtain through params on the command line) OR (read from our configuration file)
my_list = {1, 2, 3}

for i in my_list:
    dagname = 'test' + '_' + str(i)
    dag = DAG(dagname,
              default_args=default_args,
              schedule_interval=default_schedule_interval)
    globals()[dagname] = dag

    # Test task
    t1 = BashOperator(
        task_id='test_task',
        bash_command="echo Hello, World!",
        retries=3,
        dag=dag)
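One workaround sketch, assuming Airflow Variables are available in your version (the Variable name 'my_list' and the read_configuration_file helper are hypothetical): set a Variable before running the backfill and let the DAG file prefer it over the configuration file.

import json
from airflow.models import Variable

try:
    # set beforehand, e.g. through the UI, to a JSON list like "[1, 2, 3]"
    my_list = json.loads(Variable.get('my_list'))
except Exception:
    my_list = read_configuration_file()  # hypothetical fallback to the existing config-file path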

r0ger

Apr 22, 2016, 1:31:48 PM
to Airflow
On second thought, to do the backfill, at least in my case,

I can get the dag_id from the UI and then do a backfill.
Will that work? I tried this approach, but I want to know: is there something I should be aware of?


Yuri Titov Bendana

Apr 25, 2016, 9:20:52 PM
to Airflow
You can get the dag_run from the kwargs:

def run_mytask(*args, **kwargs):
    dag_run = kwargs['dag_run']
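Note that for scheduled (non-triggered) runs dag_run can be None, which is what produces the UndefinedError above. A minimal guard, with a placeholder key and fallback:

def run_mytask(*args, **kwargs):
    dag_run = kwargs['dag_run']
    # dag_run is only populated for externally triggered runs
    conf = (dag_run.conf or {}) if dag_run is not None else {}
    foo = conf.get('foo', 'default-foo')  # placeholder key and fallback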