airflow.utils.AirflowException: dag_id could not be found


Brenda Bell

Feb 17, 2016, 1:30:22 PM
to Airflow
I'm currently trying to learn Airflow in an attempt to get off Oozie. I've started my first DAG, but can't seem to get past the error in the subject line.

My code looks like this:

from datetime import datetime, timedelta
from airflow import DAG
from ftp import FtpDownload


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 2, 8),
    'email': ['m...@example.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    'end_date': datetime(2016, 2, 9),
}

dag = DAG(dag_id='et_import', default_args=default_args)

remote_files = [
    'file1_{mm:02d}{dd:02d}{yy:04d}.csv',
    'file2_{mm:02d}{dd:02d}{yy:04d}.csv',
    'file3_{mm:02d}{dd:02d}{yy:04d}.csv'
]

current = default_args['start_date']

# create one download task per remote file template
for remote_file in remote_files:
    et_get_files = FtpDownload(
        dag=dag,
        owner='airflow',
        task_id='et_get_files',
        dest_path='/tmp/',
        source_conn_id='ftp_default',
        source_path='/Export/',
        source_file=remote_file.format(mm=current.month, dd=current.day,
                                       yy=current.year))
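One thing worth flagging in the loop above: Airflow requires every task_id to be unique within a DAG, and each iteration here reuses 'et_get_files', so the tasks will collide. A plain-Python sketch of deriving one id per file (the 'et_get_' prefix is just an illustration, not anything Airflow-specific):

```python
remote_files = [
    'file1_{mm:02d}{dd:02d}{yy:04d}.csv',
    'file2_{mm:02d}{dd:02d}{yy:04d}.csv',
    'file3_{mm:02d}{dd:02d}{yy:04d}.csv',
]

# Derive a unique task_id from each template's leading "fileN" token;
# these would then be passed as task_id=... to FtpDownload in the loop.
task_ids = ['et_get_%s' % f.split('_')[0] for f in remote_files]
print(task_ids)  # ['et_get_file1', 'et_get_file2', 'et_get_file3']
```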



My other issue is that I can't figure out how to get the current execution (instance) date from my DAG. Maybe I should move that logic to my FtpDownload operator and get it from kwargs?
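On the execution-date question: Airflow passes a context dict into an operator's execute(self, context) method, and context['execution_date'] holds the date of the current run, so the filename formatting could indeed move into the FtpDownload operator. A minimal sketch of that formatting as plain Python (build_source_file is a hypothetical helper, not part of Airflow):

```python
from datetime import datetime

def build_source_file(template, execution_date):
    """Fill the mm/dd/yy placeholders from a run's execution date.

    Inside an Airflow operator this would be called from execute(self, context),
    using context['execution_date'] instead of a hard-coded date.
    """
    return template.format(mm=execution_date.month,
                           dd=execution_date.day,
                           yy=execution_date.year)

print(build_source_file('file1_{mm:02d}{dd:02d}{yy:04d}.csv',
                        datetime(2015, 2, 8)))
# file1_02082015.csv
```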

I'm running my DAG with the following command, if it matters:

airflow test et_import et_get_files 2015-02-08

Brenda


Brenda Bell

Feb 17, 2016, 2:16:30 PM
to Airflow
Disregard. I found the issue.

There were actually two stack traces -- one for an error in my code, followed by the elusive dag_id error. The latter went away after I addressed the first: since the file failed to import cleanly, Airflow apparently never registered the dag_id at all.