Is it possible to send parameters to a workflow?


Sergei Iakhnin

Nov 9, 2015, 3:05:17 PM
to Airflow
Hi,

I hope you don't feel like I'm spamming your board, but there's a bit of a learning curve with this tool and information is somewhat hard to come by.

So, if I want to kick off a bunch of workflow instances, each with a slightly different set of parameters, how do I do it?

Thanks in advance,

Sergei.

Sergei Iakhnin

Jan 25, 2016, 10:54:53 AM
to Airflow
Hi,

I never did get a response to this. Is this possible? I want to set up some configuration values that can vary from run to run and then kick off a dag run, and have a task instance be able to read those configuration values.

Thanks in advance,

Sergei.

Maxime Beauchemin

Jan 25, 2016, 12:35:34 PM
to Airflow
The DagRun object has room for a `conf` parameter that gets exposed in the "context" (templates, operators, ...). That is the place where you would associate parameters with a specific run. For now this is only possible for an externally triggered DAG run. The way the `TriggerDagRunOperator` works, you can fill in the conf param during the execution of the callable that you pass to the operator.
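For instance, from a PythonOperator with provide_context=True, something along these lines should work (untested sketch; "my_param" is just a placeholder):

def read_conf(**kwargs):
    dag_run = kwargs["dag_run"]   # the DagRun for this execution
    conf = dag_run.conf or {}     # the dict passed at trigger time
    print(conf.get("my_param"))

In a templated field the same value is available as {{ dag_run.conf["my_param"] }}.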

If you are looking to change the shape of your DAG through parameters, we recommend doing that using "singleton" DAGs (using a "@once" `schedule_interval`), meaning that you would write a Python program that generates multiple dag_ids, one for each run, probably based on metadata stored in a config file or elsewhere.

The idea is that if you use parameters to alter the shape of your DAG, you break some of the assumptions around continuity of the schedule. Things like visualizing the tree view or performing a backfill become unclear and mushy. So if the shape of your DAG changes radically based on parameters, we consider those to be different DAGs, and you generate each one in your pipeline file.
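Roughly this kind of pattern (sketch; the configs dict and the dag_id naming are made up for illustration):

from airflow import DAG
from datetime import datetime

configs = {
    'run_a': {'input_path': '/data/a'},
    'run_b': {'input_path': '/data/b'},
}

for name, params in configs.items():
    dag_id = 'my_pipeline_%s' % name
    dag = DAG(dag_id, schedule_interval='@once',
              start_date=datetime(2016, 1, 1))
    # ... add tasks that close over `params` here ...
    globals()[dag_id] = dag  # module level, so the DagBag picks it up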

Max

Sergei Iakhnin

Jan 26, 2016, 8:48:14 AM
to Airflow
Thanks for this tip, Maxime. I'm having some issues getting a DagRun triggered this way. I have a very simple test DAG:

from airflow import DAG
from airflow.operators import PythonOperator
from datetime import timedelta
import datetime
import logging

logger = logging.getLogger()


def run_my_test(**kwargs):
    # provide_context=True passes run_id (among other things) in kwargs
    logger.info(kwargs["run_id"])


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime(2020, 1, 1),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# schedule_interval=None: this DAG should only run when triggered externally
dag = DAG("test-dag", default_args=default_args,
          schedule_interval=None, concurrency=10000, max_active_runs=2000)

test_task = PythonOperator(
    task_id="run_my_test",
    python_callable=run_my_test,
    op_kwargs={"test_param": "test"},
    provide_context=True,
    priority_weight=10,
    dag=dag)


I then have a separate test script for kicking off the DAG:


from airflow.operators import TriggerDagRunOperator


def set_up_dag_run(context, dag_run_obj):
    dag_run_obj.payload = {"my_payload": "blabla"}


my_dag_run = TriggerDagRunOperator(dag_id="test-dag", python_callable=set_up_dag_run)



When run, this dies with the following error:


python trigger_dag.py
[2016-01-26 14:28:34,076] {__init__.py:36} INFO - Using executor SequentialExecutor
Traceback (most recent call last):
  File "trigger_dag.py", line 7, in <module>
    my_dag_run = TriggerDagRunOperator(dag_id="test-dag", python_callable=set_up_dag_run)
  File "/usr/local/lib/python2.7/site-packages/airflow/utils.py", line 461, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/operators/dagrun_operator.py", line 41, in __init__
    super(TriggerDagRunOperator, self).__init__(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/utils.py", line 457, in wrapper
    raise AirflowException(msg)
airflow.utils.AirflowException: Argument ['owner', 'task_id'] is required


The issue seems to be that some default_args are missing, but this happens very early in execution, when the BaseOperator __init__ method is invoked, before any DAG-specific default_args have been read in.


Am I wrong to use the TriggerDagRunOperator in this way? Can it be used in this standalone fashion, or must it be used in the context of another active DagRun, hence the complaints about missing default args?

Sergei Iakhnin

Jan 26, 2016, 10:20:19 AM
to Airflow
Well, I was able to figure out how to make it work.

The callable needs to actually return the dag_run_obj, because of how the execute() method in dagrun_operator.py works:

    def execute(self, context):
        dro = DagRunOrder(run_id='trig__' + datetime.now().isoformat())
        dro = self.python_callable(context, dro)
        if dro:
            session = settings.Session()
            dr = DagRun(
                dag_id=self.dag_id,
                run_id=dro.run_id,
                conf=dro.payload,
                external_trigger=True)
            logging.info("Creating DagRun {}".format(dr))
            session.add(dr)
            session.commit()
            session.close()
        else:
            logging.info("Criteria not met, moving on")


These lines, by the way, are kind of weird to me:

dro = DagRunOrder(run_id='trig__' + datetime.now().isoformat())
dro = self.python_callable(context, dro)

The dro variable is set to a newly created object and then immediately to the dag_run_obj returned from the user-implemented callable. What's the point of creating that first object? (I suppose it supplies a default run_id, and returning something falsy from the callable is how you veto the run, per the else branch above.)
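For reference, this is roughly what my working trigger script looks like now (untested as pasted; the task_id is arbitrary, and task_id/owner are the arguments the AirflowException above was complaining about):

from airflow.operators import TriggerDagRunOperator


def set_up_dag_run(context, dag_run_obj):
    dag_run_obj.payload = {"my_payload": "blabla"}
    return dag_run_obj  # must be returned, or no DagRun is created


my_dag_run = TriggerDagRunOperator(
    task_id="trigger_test_dag",
    dag_id="test-dag",
    owner="airflow",
    python_callable=set_up_dag_run)

my_dag_run.execute(context={})  # invoked standalone, outside a scheduled DAG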

Maxime Beauchemin

Jan 26, 2016, 11:14:04 AM
to Airflow
Related, a bug that was fixed recently in TriggerDagRunOperator:

Sergei Iakhnin

Jan 27, 2016, 10:07:50 AM
to Airflow
Ok, so when I try to launch these in a loop, there is a strange error when the second DagRun is about to be created:

sqlalchemy.exc.IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "dag_run_dag_id_execution_date_key"
DETAIL:  Key (dag_id, execution_date)=(test-dag, 2016-01-27 15:52:41.210462) already exists.
[SQL: 'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%(dag_id)s, %(execution_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(external_trigger)s, %(conf)s) RETURNING dag_run.id'] [parameters: {'end_date': None, 'run_id': 'e7712eb5-32c1-44a5-9439-dd6d37739f71', 'execution_date': datetime.datetime(2016, 1, 27, 15, 52, 41, 210462), 'external_trigger': True, 'state': u'running', 'conf': <psycopg2.extensions.Binary object at 0x11351fa30>, 'start_date': datetime.datetime(2016, 1, 27, 15, 52, 41, 210589), 'dag_id': 'test-dag'}]


Basically the execution_date for both runs appears to be identical even though they are created at different times (I even put a 10-second sleep between loop iterations). When I step through the DagRun creation, it looks like execution_date is not set for a DagRun until session.commit() is called on line 57 of dagrun_operator.py. At that point it is set to what I assume is the default value per line 2720 of models.py:


execution_date = Column(DateTime, default=datetime.now())


but for both DagRun instances that are created, this variable gets set to the exact same value, even though they are created minutes apart! Is this some SQLAlchemy treachery? I tried both the SQLite and Postgres backends.
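My guess, if I'm reading SQLAlchemy right: default=datetime.now() calls now() once, when models.py is imported, and stores the resulting constant as the column default, whereas passing the callable itself (no parentheses) makes SQLAlchemy invoke it per INSERT. A minimal sketch of the difference:

from datetime import datetime
from sqlalchemy import Column, DateTime

# Evaluated once at import time; every row then shares this value:
shared_constant = Column(DateTime, default=datetime.now())

# The callable itself; SQLAlchemy calls it for each INSERT:
per_insert = Column(DateTime, default=datetime.now)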




Sergei Iakhnin

Jan 27, 2016, 10:18:49 AM
to Airflow
I was able to avoid this error by explicitly setting execution_date during DagRun creation in dagrun_operator.execute(), like so:

            dr = DagRun(
                dag_id=self.dag_id,
                run_id=dro.run_id,
                conf=dro.payload,
                external_trigger=True,
                execution_date=datetime.now())



Should I PR?

Maxime Beauchemin

Jan 27, 2016, 12:22:32 PM
to Airflow
I think there's already a fix for this in master:

Please confirm that it fixes your issue!

Sergei Iakhnin

Jan 28, 2016, 6:54:57 AM
to Airflow
Yep, that works.

Serega Sheypak

Apr 15, 2016, 11:12:04 AM
to Airflow
Hi Sergei!
How do you pass --conf "{'my':'value'}" to your dag from the command line?

Frank Wang

Apr 21, 2016, 6:34:07 PM
to Airflow

airflow trigger_dag your_dag_name --conf '{"input_path":"..........." }'
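The triggered DAG can then pick the payload up from the template context, e.g. something like this (sketch; assumes a dag object like the test DAG earlier in the thread):

from airflow.operators import BashOperator

read_input = BashOperator(
    task_id="read_input",
    bash_command='echo {{ dag_run.conf["input_path"] }}',
    dag=dag)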

Serega Sheypak

Apr 22, 2016, 4:48:53 AM
to Airflow
Thanks, but it doesn't work if you are not in UTC