Script that generates a DAG has to know how to generate the DAG with a particular dag_id


Andrey Kartashov

Oct 1, 2015, 12:30:05 PM10/1/15
to Airflow
Hi,
Let's consider the following example:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators import PythonOperator
import os

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2015, 9, 25)
}

def f(*args, **kwargs):
    print('hello!')

# td.minute // 10 only changes value once every 10 minutes, so a new
# dag_id is produced every 10 minutes.
td = datetime.today()
x = str(datetime(td.year, td.month, td.day, td.hour, td.minute // 10))
dag_id = 'test-dag-%s' % x

d = DAG(dag_id, default_args=default_args)

t = PythonOperator(task_id='t1',
                   dag=d,
                   python_callable=f)

globals()[dag_id] = d

Every 10 minutes this creates a new DAG with a new dag_id.

I've got the following problems:

1. The web interface will not visualize DAGs with old dag_ids. The situation is even worse if one of the tasks did not succeed and the execution of a DAG with that dag_id failed: since the current dag_id is no longer the same, the web interface does not show the DAG with that dag_id anymore, and the only way to check what went wrong is the database.
2. There is no way to know within this script what dag_id is required and for what function (web, scheduler, worker).
3. Even if I keep a file with metadata (the dag_ids that I created) and recreate all the DAGs (on each heartbeat) that were created at some point in time, the execution of the script will take more than 30 seconds (10,000 DAGs) and it will be killed.

Is there any solution with the current version of Airflow? Is it possible to add a parameter or global variable carrying information about which part of Airflow is loading the file (web, scheduler, or worker), to move the hardcoded 30-second limit into the config file, or to allow the same time as the heartbeat?

Maybe I missed something and am going in the wrong direction.

Thanks,
Andrey

Maxime Beauchemin

Oct 1, 2015, 12:55:26 PM10/1/15
to Airflow
If you're trying to dynamically generate one new DAG every 10 minutes, in theory it's possible, but your script has to generate the DAGs for as long as you want to keep them around. So you'd need a for loop that goes from the first time (a static timestamp?) to the last time (now?). Your script is really just a dynamic config file. If you stop generating a dag_id, it doesn't exist anymore, as if you'd taken it out of a static config file.
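Something along these lines (the anchor timestamp is just an example value; adapt the interval and dag_id format to your case):

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators import PythonOperator

default_args = {'owner': 'airflow', 'start_date': datetime(2015, 9, 25)}

def f(*args, **kwargs):
    print('hello!')

# Regenerate every DAG from a fixed starting point up to now, so that
# previously created dag_ids keep existing on every parse of this file.
first = datetime(2015, 10, 1)          # static anchor timestamp (example)
interval = timedelta(minutes=10)
current = first
while current <= datetime.now():
    dag_id = 'test-dag-%s' % current.strftime('%Y%m%d%H%M')
    d = DAG(dag_id, default_args=default_args)
    PythonOperator(task_id='t1', dag=d, python_callable=f)
    globals()[dag_id] = d
    current += interval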

Andrey Kartashov

Oct 1, 2015, 1:12:03 PM10/1/15
to Airflow
Yep,
I understood that. The problem is that I do not see a way to adapt Airflow to my pipelines without changing some logic in Airflow.
And there are some approaches that might help solve it with less pain :) Something like knowing which dag_id to generate depending on whether it is the web server or a worker asking, or, if I keep special metadata for that, how I can synchronize the script that generates a DAG with the fact that a particular DAG has successfully finished.

So the only way that I see is the first solution: one DAG whose first task monitors a dir for a file/files, processes one file at a time, and syncs stages through XComs. I can't run more than one DAG at a time, and if one of the tasks fails then processing of files stops until resolution.
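Roughly what I have in mind, with a placeholder directory and hypothetical pick_file/process_file callables:

from datetime import datetime
from airflow import DAG
from airflow.operators import PythonOperator
import glob

default_args = {'owner': 'airflow', 'start_date': datetime(2015, 9, 25)}
d = DAG('process-one-file', default_args=default_args)

def pick_file(**kwargs):
    # Grab the oldest file from the watched directory; the returned path
    # is pushed as an XCom for the next stage.
    files = sorted(glob.glob('/data/incoming/*'))
    if not files:
        raise ValueError('no file to process yet')
    return files[0]

def process_file(**kwargs):
    # Pull the path pushed by the previous task and do the real work.
    path = kwargs['ti'].xcom_pull(task_ids='pick_file')
    print('processing %s' % path)

t1 = PythonOperator(task_id='pick_file', dag=d,
                    python_callable=pick_file, provide_context=True)
t2 = PythonOperator(task_id='process_file', dag=d,
                    python_callable=process_file, provide_context=True)
t2.set_upstream(t1)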


AH

Oct 1, 2015, 4:05:19 PM10/1/15
to Airflow
Why don't you make it one DAG that has a schedule_interval of 10 minutes? Then if one is stuck the next can still run.
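Something like this, reusing the start_date and callable from your first example:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators import PythonOperator

def f(*args, **kwargs):
    print('hello!')

# One fixed dag_id; the scheduler starts a new DAG run every 10 minutes,
# so a stuck run does not block later intervals from being scheduled.
d = DAG('test-dag',
        default_args={'owner': 'airflow',
                      'start_date': datetime(2015, 9, 25)},
        schedule_interval=timedelta(minutes=10))

t = PythonOperator(task_id='t1', dag=d, python_callable=f)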

Andrey Kartashov

Oct 2, 2015, 5:28:37 PM10/2/15
to Airflow
I tried it; here is a picture of what happened:


So the first task just monitors a directory for a new file. I can do that in two ways: redefine the execute method with an infinite loop until the file appears (the situation in the picture, and almost the same idea as behind sensors), or, if there is no file, just throw an exception (then the first row will be almost all errors).
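For the first variant, roughly this (FileWaitSensor is just my own illustration of the sensor idea, not something shipped with Airflow):

import os
from airflow.operators.sensors import BaseSensorOperator

class FileWaitSensor(BaseSensorOperator):
    # poke() is retried every poke_interval seconds until it returns True,
    # instead of blocking inside execute() with an infinite loop.
    def __init__(self, filepath, *args, **kwargs):
        super(FileWaitSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath

    def poke(self, context):
        return os.path.exists(self.filepath)

# Usage inside an existing DAG `d`:
# wait = FileWaitSensor(task_id='wait_for_file',
#                       filepath='/data/incoming/ready.txt',
#                       poke_interval=60, dag=d)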

I cleared the failed tasks for one of the runs to see if it would rerun or not, and it did not.

So as I understand it, only one instance of a task with a particular task_id and dag_id can run at a time in the scheduler.

I already understood this logic, but it is really confusing.

I thought that each DAG execution (a column in the picture) had to be independent and run in parallel, and that to actually start, an execution had to be triggered, because without that we would have a bunch of light green running tasks in the bottom row at the same time.

Thanks,
A