Using execution_date in Python code to assign a variable


r0ger

Apr 19, 2016, 6:35:16 PM
to Airflow
I am new to this forum, but I have been experimenting with Airflow for some time at our company. Sorry if this question sounds really dumb.

I am writing a pipeline using a bunch of BashOperators.
Basically, for each task, I simply want to call a REST API using 'curl'.

This is what my pipeline looks like (very simplified version):

 
from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime

datetime_obj = datetime.datetime
                                  
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
    'email': ['xx...@xxx.xxx'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
}


current_datetime = datetime_obj.now(tz=tz.tzlocal())

dag = DAG(
    'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))

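# hostname is assumed to be defined elsewhere in the full pipeline.
# isoformat() turns the datetime into a string so it can be concatenated.
curl_cmd = 'curl -XPOST "' + hostname + ':8000/run?st=' + current_datetime.isoformat() + '"'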


t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=curl_cmd,
    dag=dag)


As you can see, I am setting current_datetime = datetime_obj.now(tz=tz.tzlocal()).
What I want here instead is 'execution_date'.

How do I use 'execution_date' directly and assign it to a variable in my Python file?

I am having this general issue with accessing args.
Any help will be genuinely appreciated.


Thanks
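
One way to look at the question: execution_date is not known when the DAG file is parsed, only when a task instance runs, so it cannot be assigned to a module-level variable. A rough sketch of reading it at run time from a Python callable follows. It assumes the callable is wired to a PythonOperator that passes the task context as keyword arguments (provide_context=True in Airflow releases of this period); build_curl_cmd, render_rest_call, and the 'localhost' host are hypothetical names used only for illustration.

```python
import datetime


def build_curl_cmd(hostname, execution_date):
    # Format the run's execution date into the REST call.
    return 'curl -XPOST "%s:8000/run?st=%s"' % (hostname, execution_date.isoformat())


def render_rest_call(**context):
    # Airflow supplies execution_date in the context when the task runs.
    # 'localhost' is a placeholder host, not taken from the original post.
    return build_curl_cmd('localhost', context['execution_date'])


# Stand-in for what Airflow would pass at run time.
example = render_rest_call(execution_date=datetime.datetime(2016, 4, 19, 18, 0))
print(example)
```

Calling the function by hand, as in the last lines, only imitates the context; in a real DAG the value would differ for every scheduled run.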

Serega Sheypak

Apr 20, 2016, 5:38:24 AM
to Airflow
Have you seen the macros documentation? http://pythonhosted.org/airflow/code.html#macros

Did you try:

curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+{{ execution_date }} +'" '

t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=curl_cmd,
    dag=dag)

r0ger

Apr 20, 2016, 11:46:34 AM
to Airflow
I did try that as well. I get an error: NameError: name 'execution_date' is not defined


from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime

datetime_obj = datetime.datetime
                                  
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
    'email': ['xx...@xxx.xxx'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
}


curl_cmd='curl -XPOST "'+':8000/run?st='+{{ execution_date }} +'" '
print curl_cmd

dag = DAG(
    'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))


t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=curl_cmd,
    dag=dag)


Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 209, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/Users/harishs/airflow/dags/test_pipeline.py", line 56, in <module>
    curl_cmd='curl -XPOST "'+':8000/run?st='+{{ execution_date }} +'" '
NameError: name 'execution_date' is not defined
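
The NameError comes from Python, not from Airflow: written outside the quotes, {{ execution_date }} is parsed as a set literal containing the name execution_date, which does not exist when the DAG file is loaded. A minimal sketch of the usual fix, assuming the macro is meant to be rendered by Airflow at run time, keeps the template inside the string. The hostname value is a placeholder, since the thread never shows how it is defined.

```python
# Placeholder host: the thread never shows how `hostname` is defined.
hostname = 'localhost'

# Outside the quotes, {{ execution_date }} is Python syntax: a set literal
# that looks up the name execution_date, which fails when the file is loaded.
try:
    eval("'curl ...st=' + {{ execution_date }}")
    error_name = None
except NameError as exc:
    error_name = type(exc).__name__

# Inside the quotes it is plain text as far as Python is concerned.
curl_cmd = 'curl -XPOST "' + hostname + ':8000/run?st={{ execution_date }}"'

print(error_name)
print(curl_cmd)
```

Passing this curl_cmd as bash_command to the BashOperator should let Airflow fill in the execution date for each run, since bash_command is a templated field.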
