Assigning parameters to Python variable ???

6,232 views
Skip to first unread message

Chris Peters

unread,
Sep 9, 2015, 9:13:20 AM9/9/15
to Airflow


I would like to assign a parameter to a python variable in the tutorial program. How do I do it?
The command I'm using to run the dag is: airflow.sh backfill -e 2015-08-03 tut001 . The parameter I'm  interested in is -e  which is 2015-08-03

How do I assign it to a regular Python variable (var1) ?
Below is some sample code from the tutorial, where I have included var1, so that it can be seen in context.
I understand that it's somehow related to {{ ds }}.
I would appreciate any help, or suggestions.

from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta

args = {'owner': 'airflow','depends_on_past': False,'start_date': datetime(2015, 6, 1),}
dag = DAG('tutorial', default_args=args)
var1 = ???

# t1, t2 and t3 are examples of tasks created by instatiating operators
t1 = BashOperator(task_id='print_date',bash_command='date', dag=dag)
t2 = BashOperator(task_id='sleep',bash_command='sleep 5',dag=dag)

templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(task_id='templated',bash_command=templated_command,params={'my_param': 'Parameter I passed in'},dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)

Marc Limotte

unread,
Sep 10, 2015, 10:39:21 AM9/10/15
to Airflow
Chris,

If I understand the question right, I think you just want to use a PythonOperator with the `provide_context=True` argument.  Similar to the print_the_context example in example_python_operator.py.  The python_callable function can then specify any named parameters to capture them from kwargs.


marc



Chris Peters

unread,
Sep 14, 2015, 2:22:05 AM9/14/15
to Airflow
Thanks for the reply. I will try it.

I'm hoping there's also a more direct way.
For example I can get the value for start_date like so:

start_date = args.get('start_date')
or
start_date = dag.default_args.get('start_date')

Simply because 'start_date' is a Python dictionary entry.
How, do I get the current 'execution_date'?
I have tried the following:

 execution_date = args.get('execution_date')
 execution_date = TaskInstance.execution_date

I tried the second option, since I found execution_date in the TaskInstance class in the source code.
However, both of the above options resulted in errors.

Any suggestions could be useful?

Marc Limotte

unread,
Sep 14, 2015, 4:36:44 AM9/14/15
to Airflow
execution_date should be available in the kwargs.

execution_date = kwargs.get('execution_date')

I think a more direct way to get it is:

def do_something(ds, execution_date, ti, **kwargs):
    ...

Where do_something is the python callable, and provide_context=True.  In this example, I also capture ds (the execution date as a String), and ti (the task instance object).  The latter is what you would use for your last attempt:
ti.execution_date

Hope that helps,
marc
 

Chris Peters

unread,
Sep 15, 2015, 7:55:44 AM9/15/15
to Airflow
Hi Marc,

Thanks. Great help. It works.
In the code below I'm returning ed = str(kwargs.get('execution_date'))
How do I include the returned ed in a BashOperator command?
Let's say I want to echo ed

Preliminary code

from airflow.operators import BashOperator, DummyOperator, PythonOperator
from airflow.models import DAG

from datetime import datetime, timedelta
from airflow.macros import ds_add

args = {
'owner': 'cgr',
'start_date': datetime(2015, 9, 1),
'depends_on_past': True
}

dag = DAG(dag_id='tst_stats_002', schedule_interval=timedelta(days=1), default_args=args)


def pc(ds, **kwargs):
ed = str(kwargs.get('execution_date'))
print(ds)
return ed

t1 = PythonOperator(task_id='python_task', provide_context=True, python_callable=pc, dag=dag)

# Code that I'm unsure about ***
# I would like to echo ed using a BashOperator ***

c1 = "echo " + ed ? or
c1 = "echo " + ....


# *************************************************


t2 = BashOperator(task_id='tst_stats', bash_command=c1, dag=dag)
t2.set_upstream(t1)

Andrey Kartashov

unread,
Sep 15, 2015, 9:25:49 AM9/15/15
to Airflow
The only way to transfer data between tasks is Coms http://pythonhosted.org/airflow/concepts.html#xcoms

Craig Kimerer

unread,
Sep 15, 2015, 3:36:17 PM9/15/15
to Airflow
Without testing, and only looking at code, I would assume something like --

c1 = "echo {{ execution_date }}" 

Would give you the output you wanted.

Craig

PS - Sorry Andrey for the double post, just realized the list doesn't allow posting through reply in email.

Maxime Beauchemin

unread,
Sep 16, 2015, 12:41:07 PM9/16/15
to Airflow
I just added an entry in the docs to clarify some things that may be relevant to this thread:

Craig Kimerer

unread,
Sep 16, 2015, 1:31:59 PM9/16/15
to Airflow
Max,

I think the bigger problem with the misunderstanding is that this document [1] is that it's not clear what kwargs can be templated and what variables are available within the templating itself.  I think which kwargs that are templated can be defined in [2], but I don't know where a good place for the documentation of the variables accessible for templating are.

Maxime Beauchemin

unread,
Sep 16, 2015, 2:00:06 PM9/16/15
to Airflow
Here's the list of objects available in templates, in the operator's execute method, and in the PythonOperator when using `provide_context`

Reply all
Reply to author
Forward
0 new messages