Hi,

Let's consider the following example:
from datetime import datetime
from airflow import DAG
from airflow.operators import PythonOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2015, 9, 25)
}

def f(*args, **kwargs):
    print('hello!')

# Build a dag_id that changes every 10 minutes.
td = datetime.today()
x = str(datetime(td.year, td.month, td.day, td.hour, td.minute // 10))
dag_id = 'test-dag-%s' % x

d = DAG(dag_id, default_args=default_args)
t = PythonOperator(task_id='t1',
                   dag=d,
                   python_callable=f)

# Register the DAG under its dynamic dag_id so the scheduler picks it up.
globals()[dag_id] = d
Every 10 minutes this produces a new DAG with a new dag_id.

I've run into the following problems:
1. The web interface no longer shows DAGs with old dag_ids. It is even worse when a task did not succeed and the run for an old dag_id failed: since the current dag_id is no longer the same, the web interface does not show the DAG with that dag_id anymore, and the only way to find out what went wrong is to query the metadata database directly (see the first sketch below this list).
2. There is no way within this script to know which dag_id is actually needed, nor which part of Airflow (webserver, scheduler, or worker) is parsing the file.
3. Even if I keep a metadata file with all the dag_ids I have created and recreate those DAGs on each heartbeat (see the second sketch below this list), parsing the script will take more than 30 seconds once there are on the order of 10000 DAGs, and it will be killed.
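
For point 1, this is roughly how I end up digging the state of an old dag_id out of the database. It is only a sketch; it assumes the TaskInstance model and settings.Session, and the exact imports may differ between Airflow versions:

from airflow import settings
from airflow.models import TaskInstance

# An example of one of the old, dynamically generated dag_ids.
old_dag_id = 'test-dag-2015-09-25 14:00:00'

session = settings.Session()
# Look up every task instance that ran under a dag_id the UI no longer shows.
for ti in session.query(TaskInstance).filter(TaskInstance.dag_id == old_dag_id):
    print(ti.task_id, ti.execution_date, ti.state)
session.close()

For point 3, this is the kind of recreation I mean. It is a sketch that assumes the generated dag_ids are appended to a plain-text metadata file (the path is just an example) and reuses default_args and f from the script above:

METADATA_FILE = '/tmp/generated_dag_ids.txt'  # example path for the metadata file

def make_dag(old_id):
    dag = DAG(old_id, default_args=default_args)
    PythonOperator(task_id='t1', dag=dag, python_callable=f)
    return dag

# Re-register every previously created dag_id on each parse of this file;
# with ~10000 entries this alone pushes parsing past the 30 second limit.
with open(METADATA_FILE) as meta:
    for line in meta:
        old_id = line.strip()
        if old_id:
            globals()[old_id] = make_dag(old_id)
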
Are there any solutions for this with the current version of Airflow? Would it be possible to add a parameter or global variable that tells the script which part of Airflow is loading it (webserver, scheduler, or worker), to move the hard-coded 30 seconds into the config file, or to allow the same time as the heartbeat?
Maybe I have missed something and am going in the wrong direction.
Thanks,
Andrey