DAGs' Scheduling granularity, TimeDeltaSensor and schedule_interval

Charles-Antoine Dupont

Oct 21, 2015, 10:02:43 AM
to Airflow
Hi,

First of all, thank you to all the contributors for making this tool open.

I'm not sure if I'm approaching my problem the right way:
I want to schedule many tasks in one DAG, but have them start at different intervals, say every minute, every 2 minutes, and every 5 minutes.
I want to group them in one DAG simply because they all relate to the same type of task.

I've been trying to do it a bit like this:

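from datetime import datetime, timedelta

# Assumed imports (not in the original post); paths as used by Airflow releases of that period.
from airflow import DAG
from airflow.operators import BashOperator, TimeDeltaSensor

default_args = {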
    'owner': 'c-a',
    'depends_on_past': False,
    'start_date': datetime.now()-timedelta(minutes=2),
    'email': ['air...@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=1),
}

dagInterval = 2
sensorInterval = 1 
dag = DAG('testDag', default_args=default_args, schedule_interval=timedelta(minutes=dagInterval))
sensor = TimeDeltaSensor(delta=timedelta(minutes=sensorInterval), task_id="tdSensor", dag=dag)

t1 = BashOperator(
    task_id='testTask',
    bash_command='echo "testTask"',
    dag=dag,
)

t1.set_upstream(sensor)
 


Doc for the TimeDeltaSensor class, in the code:
Waits for a timedelta after the task's execution_date + schedule_interval.
In Airflow, the daily task stamped with ``execution_date``
2016-01-01 can only start running on 2016-01-02. The timedelta here
represents the time after the execution period has closed.

I'm not sure whether the finest granularity should be the TimeDeltaSensor's delta or the DAG's schedule_interval. I've tried both, and the sensor never succeeds: it stays in the running state and never triggers anything downstream of it.
From my understanding, t1 should have launched in one of the two cases (i.e. dagInterval=2 && sensorInterval=1 OR dagInterval=1 && sensorInterval=2).


It's probably a little something I didn't see, but any help will be greatly appreciated at this point, thank you!

Maxime Beauchemin

Oct 21, 2015, 3:44:42 PM
to Airflow
sensor = TimeDeltaSensor(delta=timedelta(minutes=sensorInterval), task_id="tdSensor", dag=dag)
should be
sensor = TimeDeltaSensor(execution_delta=timedelta(minutes=sensorInterval), task_id="tdSensor", dag=dag)

Otherwise it defaults to one day.

Charles-Antoine Dupont

Oct 21, 2015, 4:36:16 PM
to Airflow
It seems like delta is the correct name though:

In any case, when I replace delta with execution_delta, I get
    result = func(*args, **kwargs)
TypeError: __init__() takes at least 2 arguments (1 given)


I'm still testing and trying to figure out where this is coming from.

Just to be clear, my understanding is that a TimeDeltaSensor could be set to a 30-minute interval on a DAG with a daily schedule_interval.
This would create a task that runs every 30 minutes after the initial trigger, with its first run at DAG start date + DAG schedule_interval + the TimeDeltaSensor's delta, correct?
I would also expect any task downstream of the sensor to be triggered accordingly every ~30 minutes after that (e.g. if the downstream trigger rule is set to all_success), correct?
(I just want to make sure I have the correct assumptions to start with :) )

Thanks again!

Maxime Beauchemin

Oct 21, 2015, 5:23:10 PM
to Airflow
Sorry about the confusion, `delta` is the right argument.

The TimeDeltaSensor you are describing would run once a day, and would start "sensing" soon after midnight on `2015-01-02` for the `2015-01-01` execution_date. It would sense (that is, appear as running) for 30 minutes and then succeed, allowing the downstream tasks to proceed.
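
The timing described above can be sketched in plain Python, without importing Airflow. The helper name `sensor_release_time` is hypothetical; it only mirrors the rule quoted from the docstring (execution_date + schedule_interval + delta) and is not Airflow's own implementation.

```python
from datetime import datetime, timedelta


def sensor_release_time(execution_date, schedule_interval, delta):
    """Return the earliest time a TimeDeltaSensor-style wait would end.

    Hypothetical helper: follows the docstring rule
    execution_date + schedule_interval + delta.
    """
    # The execution period closes one schedule_interval after execution_date.
    period_end = execution_date + schedule_interval
    # The sensor then waits an extra `delta` past the end of the period.
    return period_end + delta


release = sensor_release_time(
    execution_date=datetime(2015, 1, 1),
    schedule_interval=timedelta(days=1),
    delta=timedelta(minutes=30),
)
print(release)  # 2015-01-02 00:30:00
```

Under the same rule, the 2-minute DAG with a 1-minute delta from the first post would release each run about 3 minutes after its execution_date.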

Charles-Antoine Dupont

Oct 21, 2015, 6:17:04 PM
to Airflow
Thanks for the clarification, that's not at all how I perceived the TimeDeltaSensor.

I believe it answers my underlying question that I had not been able to formulate, but just to make sure :) 

No task will ever run more often than its DAG's schedule_interval allows, correct?
I.e. the "launch signal" can be delayed within the DAG (e.g. by a TimeDeltaSensor) but never repeated, so no task can successfully run twice during the same interval (unless we clear tasks, but from my understanding that should not be part of a standard workflow).

Maxime Beauchemin

Oct 22, 2015, 8:07:12 PM
to Airflow
Exactly, the schedule_interval defines the atomicity as far as time goes. Tasks get instantiated once and only once per schedule interval.

Well, that's until I merge the PR that revamps how the scheduler works. We'll allow defining schedule_interval as a cron-compatible expression (as in "0 0 * * *"), and allow externally triggered "DAG Runs", for DAGs that get instantiated "on demand". I'll write a post about what is shaping up.

Max
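
As a rough illustration of the "once and only once per schedule interval" rule confirmed in this reply, here is a minimal sketch in plain Python. The generator `execution_dates` is a hypothetical helper, not Airflow's scheduler; it assumes a run stamped with an execution_date becomes eligible only once execution_date + schedule_interval has passed.

```python
from datetime import datetime, timedelta


def execution_dates(start_date, schedule_interval, now):
    """Yield each execution_date whose period has fully closed by `now`.

    Hypothetical helper, not Airflow's scheduler: a run stamped with
    execution_date is eligible only at execution_date + schedule_interval.
    """
    current = start_date
    while current + schedule_interval <= now:
        yield current
        current += schedule_interval


runs = list(
    execution_dates(
        start_date=datetime(2015, 10, 21, 10, 0),
        schedule_interval=timedelta(minutes=2),
        now=datetime(2015, 10, 21, 10, 7),
    )
)
# One task instance per execution_date: 10:00, 10:02 and 10:04.
# The 10:06 period has not closed yet at 10:07, so it is not listed.
print(runs)
```

Each execution_date appears exactly once, so a sensor inside the DAG can delay a task for that date but cannot make it run again within the same interval.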