from datetime import datetime, timedelta
from airflow import DAG
from ftp import FtpDownload
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 2, 8),
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
'end_date': datetime(2016, 2, 9),
}
# DAG container; every task below attaches to it via dag=dag.
# NOTE(review): no schedule_interval is given, so Airflow's default
# (presumably daily) applies — confirm the intended cadence.
dag = DAG(dag_id='et_import', default_args=default_args)
# Remote CSV filename templates; the {mm}/{dd}/{yy} placeholders are
# rendered with month/day/year when the download tasks are built below.
_FILENAME_TEMPLATES = (
    'file1_{mm:02d}{dd:02d}{yy:04d}.csv',
    'file2_{mm:02d}{dd:02d}{yy:04d}.csv',
    'file3_{mm:02d}{dd:02d}{yy:04d}.csv',
)
remote_files = list(_FILENAME_TEMPLATES)
# Date used to render the remote filenames. Currently pinned to the DAG's
# start_date, so only that one day's files are ever requested.
current = default_args['start_date']

# Create one FTP download task per remote file pattern.
# Bug fix: the original reused task_id='et_get_files' for every file, but
# task ids must be unique within a DAG — later operators would clobber
# earlier ones (or raise DuplicateTaskIdFound on newer Airflow versions).
# The loop variable is also renamed from `file`, which shadows a builtin.
for seq, pattern in enumerate(remote_files, start=1):
    FtpDownload(
        dag=dag,
        owner='airflow',
        # Unique per file: et_get_files_1, et_get_files_2, ...
        task_id='et_get_files_{}'.format(seq),
        dest_path='/tmp/',
        source_conn_id='ftp_default',
        source_path='/Export/',
        source_file=pattern.format(mm=current.month,
                                   dd=current.day,
                                   yy=current.year))