from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
import logging

logger = logging.getLogger()

def run_my_test(**kwargs):
    logger.info(kwargs["run_id"])
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 1, 1),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG("test-dag", default_args=default_args,
          schedule_interval=None, concurrency=10000, max_active_runs=2000)

test_task = PythonOperator(
    task_id="run_my_test",
    python_callable=run_my_test,
    op_kwargs={"test_param": "test"},
    provide_context=True,
    priority_weight=10,
    dag=dag)
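Since schedule_interval=None, the DAG only runs when triggered externally. For what it's worth, a sketch of how the callable could also pick up a trigger payload, assuming (as the DagRun insert in the execute() source quoted further down suggests) that the payload travels as the conf of the dag_run object in the task context:

def run_my_test(**kwargs):
    logger.info(kwargs["run_id"])
    # An externally triggered run carries its payload as dag_run.conf.
    dag_run = kwargs.get("dag_run")
    if dag_run is not None and dag_run.conf:
        logger.info("trigger payload: %s", dag_run.conf)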
I then have a separate test script for kicking off the DAG:
from airflow.operators import TriggerDagRunOperator

def set_up_dag_run(context, dag_run_obj):
    dag_run_obj.payload = {"my_payload": "blabla"}

my_dag_run = TriggerDagRunOperator(dag_id="test-dag", python_callable=set_up_dag_run)
When run, this dies with the following error:
python trigger_dag.py
[2016-01-26 14:28:34,076] {__init__.py:36} INFO - Using executor SequentialExecutor
Traceback (most recent call last):
File "trigger_dag.py", line 7, in <module>
my_dag_run = TriggerDagRunOperator(dag_id="test-dag", python_callable=set_up_dag_run)
File "/usr/local/lib/python2.7/site-packages/airflow/utils.py", line 461, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/operators/dagrun_operator.py", line 41, in __init__
super(TriggerDagRunOperator, self).__init__(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/utils.py", line 457, in wrapper
raise AirflowException(msg)
airflow.utils.AirflowException: Argument ['owner', 'task_id'] is required
The issue seems to be that some default_args are missing, but the failure happens very early, when the BaseOperator __init__ method is invoked, before any DAG-specific default_args have been read in.
Am I wrong in using the TriggerDagRunOperator this way? Can it be used in this standalone fashion, or must it be used in the context of another active DagRun, hence the complaints about missing default args?
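If the exception is only complaining about BaseOperator's required arguments, one thing to try is supplying them directly. A minimal sketch, assuming the constructor passes extra keyword arguments through to BaseOperator (the task_id value is mine, and note from the execute() source quoted below that the callable must return the dag_run_obj or no run is created):

from airflow.operators import TriggerDagRunOperator

def set_up_dag_run(context, dag_run_obj):
    dag_run_obj.payload = {"my_payload": "blabla"}
    return dag_run_obj  # execute() skips the run if the callable returns None

# "trigger_test_dag" is a hypothetical task_id; owner satisfies the other
# missing BaseOperator argument named in the exception.
my_dag_run = TriggerDagRunOperator(
    dag_id="test-dag",
    python_callable=set_up_dag_run,
    task_id="trigger_test_dag",
    owner="airflow")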
Looking at the execute() method in dagrun_operator.py:

def execute(self, context):
    dro = DagRunOrder(run_id='trig__' + datetime.now().isoformat())
    dro = self.python_callable(context, dro)
    if dro:
        session = settings.Session()
        dr = DagRun(
            dag_id=self.dag_id,
            run_id=dro.run_id,
            conf=dro.payload,
            external_trigger=True)
        logging.info("Creating DagRun {}".format(dr))
        session.add(dr)
        session.commit()
        session.close()
    else:
        logging.info("Criteria not met, moving on")
dro = DagRunOrder(run_id='trig__' + datetime.now().isoformat())
dro = self.python_callable(context, dro)
The dro variable is set to a newly created object and then immediately to the dag_run_obj that gets returned from the user-implemented callable. What's the point of creating that first object?
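Judging from the execute() source above, the pre-built object seems to be a convenience: the callable receives a DagRunOrder with a default run_id ('trig__' plus a timestamp) already filled in, and can either return the (possibly mutated) object to proceed or return None to veto the run, which hits the "Criteria not met, moving on" branch. A sketch of both paths under that reading:

def set_up_dag_run(context, dag_run_obj):
    # Veto: returning None means execute() creates no DagRun.
    if context.get("some_condition") is None:  # hypothetical gating condition
        return None
    # Proceed: keep the pre-filled default run_id and attach a payload.
    dag_run_obj.payload = {"my_payload": "blabla"}
    return dag_run_obj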
Triggering the DAG more than once this way then fails with a duplicate-key error:
sqlalchemy.exc.IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "dag_run_dag_id_execution_date_key"
DETAIL: Key (dag_id, execution_date)=(test-dag, 2016-01-27 15:52:41.210462) already exists.
[SQL: 'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%(dag_id)s, %(execution_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(external_trigger)s, %(conf)s) RETURNING dag_run.id'] [parameters: {'end_date': None, 'run_id': 'e7712eb5-32c1-44a5-9439-dd6d37739f71', 'execution_date': datetime.datetime(2016, 1, 27, 15, 52, 41, 210462), 'external_trigger': True, 'state': u'running', 'conf': <psycopg2.extensions.Binary object at 0x11351fa30>, 'start_date': datetime.datetime(2016, 1, 27, 15, 52, 41, 210589), 'dag_id': 'test-dag'}]
Basically, the execution_date for both runs appears to be identical even though they are created at different times (I even put a 10 second sleep timer between loop iterations). When I step through the DagRun creation, it looks like execution_date is not set for a DagRun until session.commit() is called on line 57 of dagrun_operator.py. At that point it is set to what I assume is the default value per line 2720 of models.py:
execution_date = Column(DateTime, default=datetime.now())
but for both instances of DagRun that are created this variable gets set to the exact same value, even though they are created minutes apart! Is this some SQLAlchemy treachery? I tried on both SQLite and Postgres backends.
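This looks less like SQLAlchemy treachery than the classic column-default pitfall: default=datetime.now() calls now() once, when models.py is imported, and that single datetime becomes the default for every subsequent INSERT, whereas passing the callable itself (default=datetime.now) evaluates it per row. A self-contained sketch demonstrating the difference (table and class names are mine; imports assume SQLAlchemy 1.4+):

import time
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class StampedOnce(Base):
    __tablename__ = "stamped_once"
    id = Column(Integer, primary_key=True)
    # now() runs here, at class-definition time: every row shares this value.
    created = Column(DateTime, default=datetime.now())

class StampedPerRow(Base):
    __tablename__ = "stamped_per_row"
    id = Column(Integer, primary_key=True)
    # A callable default is invoked once per INSERT.
    created = Column(DateTime, default=datetime.now)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([StampedOnce(), StampedPerRow()])
    session.commit()
    time.sleep(1)
    session.add_all([StampedOnce(), StampedPerRow()])
    session.commit()
    print([r.created for r in session.query(StampedOnce)])   # identical
    print([r.created for r in session.query(StampedPerRow)]) # ~1s apart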
Explicitly passing execution_date when the DagRun is created avoids the collision:

dr = DagRun(
    dag_id=self.dag_id,
    run_id=dro.run_id,
    conf=dro.payload,
    external_trigger=True,
    execution_date=datetime.now())
Should I PR?
For triggering with a payload from the command line, there is also:

airflow trigger_dag your_dag_name --conf '{"input_path":"..........." }'
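As far as I can tell, the JSON passed via --conf ends up in the same place as dag_run_obj.payload does on the operator path: it becomes the conf of the created DagRun, so the receiving task can read it from the dag_run object in its context, as sketched after the DAG definition above.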