Hi,
First of all, thank you to all the contributors for making this tool open.
I'm not sure if I'm approaching my problem the right way:
I want to schedule many tasks in one DAG, but have them start at different intervals, say every minute, every 2 minutes, and every 5 minutes.
The only reason I want to group them in one DAG is that they are all the same type of task.
I've been trying to do it a bit like this:
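    # Imports use the Airflow 1.x module paths; adjust for other versions.
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.sensors import TimeDeltaSensor

    default_args = {
        'owner': 'c-a',
        'depends_on_past': False,
        # start_date is computed relative to "now" each time the file is parsed
        'start_date': datetime.now() - timedelta(minutes=2),
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 3,
        'retry_delay': timedelta(minutes=1),
    }

    dagInterval = 2     # minutes between DAG runs
    sensorInterval = 1  # minutes the sensor waits after the period closes

    dag = DAG('testDag', default_args=default_args, schedule_interval=timedelta(minutes=dagInterval))

    sensor = TimeDeltaSensor(delta=timedelta(minutes=sensorInterval), task_id="tdSensor", dag=dag)

    t1 = BashOperator(
        task_id='testTask',
        bash_command='echo "testTask"',
        dag=dag,
    )

    # testTask should only run once the sensor has succeeded
    t1.set_upstream(sensor)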
For reference, the docstring of the TimeDeltaSensor class in the code reads:
Waits for a timedelta after the task's execution_date + schedule_interval.
In Airflow, the daily task stamped with ``execution_date``
2016-01-01 can only start running on 2016-01-02. The timedelta here
represents the time after the execution period has closed.
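To check my reading of that docstring, here is a minimal sketch of the arithmetic as I understand it. It uses only the standard library; sensor_target is a hypothetical helper I wrote for illustration, not part of Airflow, and it assumes the sensor simply waits until execution_date + schedule_interval + delta.

```python
from datetime import datetime, timedelta


def sensor_target(execution_date, schedule_interval, delta):
    """Earliest time the sensor should succeed, per my reading of the docstring.

    Hypothetical helper for illustration only; not Airflow's API.
    """
    # The execution period closes one schedule_interval after execution_date,
    # and the sensor then waits a further `delta` on top of that.
    return execution_date + schedule_interval + delta


# Daily example from the docstring: the run stamped 2016-01-01 closes on 2016-01-02.
daily = sensor_target(datetime(2016, 1, 1), timedelta(days=1), timedelta(0))

# First configuration I tried: 2-minute DAG interval, 1-minute sensor delta.
case_a = sensor_target(datetime(2016, 1, 1), timedelta(minutes=2), timedelta(minutes=1))

# Second configuration I tried: 1-minute DAG interval, 2-minute sensor delta.
case_b = sensor_target(datetime(2016, 1, 1), timedelta(minutes=1), timedelta(minutes=2))

print(daily)
print(case_a)
print(case_b)
```

If this reading is right, both of my configurations give a target three minutes after the execution_date, so I would expect the sensor to succeed shortly after that in either case.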
I'm not sure whether the finest granularity should be set on the TimeDeltaSensor or on the DAG's schedule_interval. I've tried both, and the sensor never succeeds: it stays in the running state and never triggers the task downstream of it.
From my understanding, t1 should have launched in one of the two cases (i.e. dagInterval=2 && sensorInterval=1 OR dagInterval=1 && sensorInterval=2).
It's probably something small that I missed, but any help would be greatly appreciated at this point. Thank you!