Workflow behavior - is it possible to change?


Andrey Kartashov

Sep 14, 2015, 3:45:10 PM
to Airflow
Hi,
As I understand it, the current (default) Airflow workflow execution can be described as follows.

Example:
A DAG with 4 steps, S1, S2, S3, S4, where S2 is downstream of S1, S3 of S2, and S4 of S3, so we have a straightforward chain:
S1->S2->S3->S4
Let's say S1 is a sensor which monitors a directory for new files.
S2 takes the oldest file and moves it into a temp directory; to avoid conflicts we have to add wait_for_downstream=True to S1.
Then S3 does some work with the copied file and S4 cleans everything up.
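As a plain-Python sketch of what S2 might do (the helper name `move_oldest` and its arguments are hypothetical, not anything from Airflow; inside Airflow this would be the callable wrapped by an operator):

```python
import os
import shutil

def move_oldest(src_dir, tmp_dir):
    """Move the oldest file from src_dir into tmp_dir; return its new path.

    Hypothetical helper illustrating the S2 step: a permission problem makes
    shutil.move raise, which would fail the task and stop this run.
    """
    files = [os.path.join(src_dir, f) for f in os.listdir(src_dir)]
    files = [f for f in files if os.path.isfile(f)]
    if not files:
        return None  # nothing to do; the upstream sensor normally prevents this
    oldest = min(files, key=os.path.getmtime)
    dest = os.path.join(tmp_dir, os.path.basename(oldest))
    shutil.move(oldest, dest)  # raises on permission errors, failing the task
    return dest
```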

I've put 10 files into that directory. S1 and S2 were synchronized nicely, because S2 depends not only on S1 but also on the previous S2 status (across runs), so if something is wrong with the 3rd file (permissions, so I can't move it) I would like to stop pipeline execution. All other steps don't depend on each other across parallel runs; they depend only on their upstream status. So even if S3 is stuck or failed on the run for file 5, I would like a parallel S3 to work on file 6, etc., and I would like to use all the CPUs I have (parallelism = 32).

How can I achieve that?

Thanks,
A

Maxime Beauchemin

Sep 14, 2015, 5:33:10 PM
to Airflow
The scheduler doesn't parallelize by design; it assumes a normal forward execution that doesn't exceed the schedule window provided. This behavior prevents users from triggering way too many jobs by, for instance, setting a start_date far in the past, or when clearing a large set of task instances.

The `airflow backfill` command does parallelize and will respect your dependencies. 

One remark is that since Airflow assumes a fixed schedule (say hourly, or daily) you may want to execute units of work that reflect that. Instead of scheduling something that processes one file every 5 minutes, you may want to write tasks that process all the files in the folder and run every hour.
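A sketch of that idea, assuming a hypothetical per-file handler `process_one`: the function below would be the callable of an hourly task and processes whatever is in the folder at run time, collecting per-file failures instead of aborting the whole batch.

```python
import os

def process_folder(folder, process_one):
    """Process every file currently in `folder` in one scheduled run.

    `process_one` is a hypothetical per-file function. Failures are
    collected so one bad file doesn't stop the rest of the batch; the
    task can then decide whether a non-empty error list fails the run.
    """
    results, errors = [], []
    for name in sorted(os.listdir(folder)):
        path = os.path.join(folder, name)
        if not os.path.isfile(path):
            continue
        try:
            results.append(process_one(path))
        except Exception as exc:
            errors.append((path, exc))
    return results, errors
```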

Max

Andrey Kartashov

Sep 14, 2015, 5:56:37 PM
to Airflow
I'm sorry, it's still not quite clear; maybe my understanding of workflows is biased. I have a pipeline that has many steps and is organized into a DAG.

Let's consider a simple pipeline: cat "a file" | grep "a filter" | "do something". So we have 3 tasks, T1, T2, T3: cat, grep, do something. I've made a DAG with 3 tasks; it is really complicated to organize this simple pipeline in Airflow terms, but possible :) Now I'd like to run this DAG (pipeline) whenever new files are available, and if one run of the DAG fails on one of the files, that is not a reason to stop running the other DAG runs. For me the whole DAG is the entity, not each individual task.

So, a rough bash equivalent:

function DAG {
    cat "$1" | grep "$2" | eval "$3"
}

for i in DIR_NAME/*; do
    DAG "$i" "filter" "do something" > "ok_${i##*/}" 2> "bad_${i##*/}" &
done

But in Airflow all T1s and T2s can be done while T3 is working on the first file, and if something goes wrong with T3 and the first file, it will never move on to T3 with the second file.

The other problem is that it is quite complicated to forward the output of one task into the input of the next one.

Thanks,
A

Maxime Beauchemin

Sep 14, 2015, 6:31:47 PM
to Airflow
Airflow is a batch workflow engine. It is not meant to stream data across tasks.

Max

Maxime Beauchemin

Sep 14, 2015, 6:56:37 PM
to Airflow
I added a bit of clarification in the README and docs about what Airflow is not:

Andrey Kartashov

Sep 14, 2015, 7:07:23 PM
to Airflow
I'm sorry, I did not mean to bring streaming of data into this.
The main question is still about the actual workflow.
Let's consider DAG="echo 1 && echo 2 && bash -c 'exit $(($RANDOM%2))' && echo 4"

Half of the time this pipeline will reach the last step, "echo 4", but Airflow will stop execution at the first run where $(($RANDOM%2)) equals 1, and all other runs will only go up to step 3 until I delete that step or mark it complete.

Maxime Beauchemin

Sep 14, 2015, 7:52:15 PM
to Airflow
Is that step stuck in a running or failed state?

You may want to re-read the docs around wait_for_downstream and the Scheduler section. The behavior might be different from your expectations.

Andrey Kartashov

Sep 14, 2015, 11:13:08 PM
to Airflow
I found the answer to my question:

ti = ti_dict[task.task_id]
ti.task = task  # Hacky but worky
if ti.state == State.RUNNING:
    continue  # Only one task at a time
elif ti.state == State.UP_FOR_RETRY:
    # If task instance is up for retry, make sure
    # the retry delay is met

Andrey Kartashov

Sep 14, 2015, 11:27:16 PM
to Airflow
So, it means that if I have a large number of files to process, I can't do it in parallel within one DAG.
Say I have 3 files and 3 tasks in a DAG.
While S1 is doing something with the first file, no other S1 will be instantiated. When S1 finishes and S2 starts doing something, Airflow will run S1 with the second file.
What I would like to have is: start as many S1s in parallel as possible, one for each file in the folder, with the remaining S1s queued; when an S1 finishes, it queues its S2, and so on. That means the unique key for me is not task_id but something like DAG_ID & TASK_ID.

Thanks,
A

Maxime Beauchemin

Sep 15, 2015, 5:19:53 PM
to Airflow
This scheduler behavior is by design and documented here:

We could add an "allow_parallel" flag to BaseOperator where you'd specify how many instances of a task are allowed to run in parallel, but the current behavior prevents users from swamping the message queue when setting an early start_date or clearing a large number of tasks. If you want parallelism right now, you have to use the backfill command.

Also, what if your S1 task processed all files in the folder instead of just one? The idea is to pivot to a scheduled unit of work (process all files in the folder every hour) instead of a batch unit of work (every 5 minutes, check for files and process just one if there are any).

Max

Andrey Kartashov

Sep 15, 2015, 5:37:54 PM
to Airflow
Your idea of a workflow is different from mine. I don't want to change anything before I fully understand your logic; now I probably understand what one can do with Airflow.

My understanding is a bit different: I have a file that I have to analyze, and the analysis requires many steps. If I have a bunch of files, I would like to parallelize the analyses, but the steps of one analysis have to be connected: if one step fails, the whole analysis fails, and I would like to fix that particular step and restart the analysis from it. Some steps in my analysis are tools with threads, but some can use just one thread.

So let's say I make the first step a loop through all the files in the directory. If something fails in the middle of the loop, I have to rerun the whole analysis, which means I can lose hours; for instance, an average mapping to the human genome takes 2-3 hours for one experiment, and I can have 10 experiments in a row.

I don't think that having allow_parallel will solve the problem.

I still think my understanding of workflow systems is not deep enough to change something that already works. I'm still learning, and I really like Airflow: it is as mature as it is simple, so everyone can adapt it to their own needs.


Maxime Beauchemin

Sep 15, 2015, 8:48:46 PM
to Airflow
I think I understand your use case better now. We'll be working on adding support for what I call "externally triggered DAGs" and "singleton DAGs" in the near future.

In the meantime, you may (or may not) want to generate an individual DAG for each instance (what I call a singleton DAG: it is expected to run only one time). You can do that by reading some configuration file(s) or metadata of some kind. If the start_date and end_date of the DAG are identical, it will run only once. If you do that you may not need to pass metadata across tasks; maybe the metadata can live in the config file.

sketch:

for filename in get_filenames():
    dag_id = "prefix__" + filename
    dag = DAG(dag_id)
    ....
    globals()[dag_id] = dag

Max

Andrey Kartashov

Sep 15, 2015, 9:41:48 PM
to Airflow
I'm still thinking of XComs as a better solution. If I save an XCom after each step, either in on_success or post_execute, I can recover the data at the next step and rerun it with the correct input, whereas a config file does not sound transaction-safe.
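To illustrate the pattern being proposed (this is a toy stand-in, not Airflow's actual storage; real XComs are persisted in the metadata database and accessed through the task instance, and these helpers are simplified sketches):

```python
# A toy stand-in for Airflow's XCom table: in real Airflow, pushed values
# are persisted in the metadata database keyed by DAG, task, and run.
xcom_store = {}

def xcom_push(dag_id, task_id, key, value):
    """Record a value produced by a task (sketch of the push side)."""
    xcom_store[(dag_id, task_id, key)] = value

def xcom_pull(dag_id, task_id, key):
    """Recover a value produced by an upstream task (sketch of the pull side)."""
    return xcom_store[(dag_id, task_id, key)]

# T1 records where it wrote its output...
xcom_push("analysis", "t1_genomecov", "output_path", "rna.SRR948778.bedGraph")
# ...and T2 recovers it on its (re)run, instead of re-deriving the input,
# which is what makes restarting a failed step from the middle feasible.
t2_input = xcom_pull("analysis", "t1_genomecov", "output_path")
```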

Andrey Kartashov

Sep 15, 2015, 9:49:40 PM
to Airflow
I'm sorry, I did not say thank you for your help; I really appreciate your help and your insights. It was just hard reading your Python scripts :) But, man, thank you; without your suggestions and explanations it would be really tough to understand the whole current logic.

I thought about adding a run_id column to task_instance and making the unique tuple include it. But then I got stuck on the first step multiplying: if I have a sensor, I do not want to fill up the whole queue while it waits for a file.

Andrey Kartashov

Sep 17, 2015, 11:14:11 AM
to Airflow
I read your post about looping through files. I think I now understand what you explained to me.

I thought a DAG was a static thing, kind of like a database schema: once created, you use it and add new records. So a DAG with name A would have to include all the tasks to be done: monitor a file/dir, copy the file into a tmp dir, do the steps of the analysis, clean up.

I had not thought about a Python script in the job directory as a dynamic way to add one DAG for one particular job. The heartbeat checks the job directory and runs the Python script, which can check SQL, a directory, or whatever, and create a one-job DAG with the particular file name wired through all the tasks. I can generate a new DAG name for each file so they will run in parallel.

I think this will solve all my requirements, and I also probably do not need to pass stdout/stderr between tasks because each step in a DAG is known.

Thanks for a such great open source product,
Andrey

Maxime Beauchemin

Sep 17, 2015, 4:56:14 PM
to Airflow
I'm glad you were able to bend your mind into thinking the Airflow way, and then bend Airflow into making it do what you need... :)

Being able to easily generate tasks and DAGs dynamically is really where Airflow shines over other solutions. Though right now we still think of DAG and task structure as "slowly changing" and on a fixed schedule. As demonstrated, there are hacks around that...


Max

Andrey Kartashov

Sep 27, 2015, 12:04:41 AM
to Airflow
I played with it and couldn't get it to work; without the loop it works.

for i in range(10, 20, 1):
    dag_id = 'bam-to-bigwig-' + str(i)
    dag = DAG(dag_id, default_args=default_args)

    cwl_s1 = CWLOperator(
        cwl_command="bedtools-genomecov.cwl",
        cwl_base=CWL_BASE,
        working_dir=WORKING_DIR,
        cwl_job=yaml.load("""{
            "input": {
                "class": "File",
                "path": "rna.SRR948778.bam"
            },
            "genomeFile": {
                "class": "File",
                "path": "mm10-chrNameLength.txt"
            },
            "scale": 1,
            "dept": "-bg",
            "genomecoverageout": "rna.SRR948778.bedGraph"
        }"""),
        dag=dag)

    cwl_s2 = CWLOperator(
        cwl_command="linux-sort.cwl",
        cwl_base=CWL_BASE,
        working_dir=WORKING_DIR,
        cwl_job=yaml.load("""{
            "input": {
                "class": "File",
                "path": "rna.SRR948778.bedGraph"
            },
            "key": ["1,1", "2,2n"]
        }"""),
        dag=dag)

    cwl_s3 = CWLOperator(
        cwl_command="ucsc-bedGraphToBigWig.cwl",
        cwl_base=CWL_BASE,
        working_dir=WORKING_DIR,
        cwl_job=yaml.load("""{
            "input": {
                "class": "File",
                "path": "rna.SRR948778.bedGraph.sorted"
            },
            "genomeFile": {
                "class": "File",
                "path": "mm10-chrNameLength.txt"
            },
            "bigWig": "rna.SRR948778.{{ params.i }}.bigWig"
        }"""),
        params={'i': str(i)},
        dag=dag)

    cwl_s1.set_downstream(cwl_s2)
    cwl_s2.set_downstream(cwl_s3)

    globals()[dag_id] = dag

The error that I get is:

2015-09-26 23:49:22,046 - root - INFO - LocalWorker running airflow run bam-to-bigwig-19 bedtools-genomecov 2015-09-25T00:00:00   --local     -sd DAGS_FOLDER/CWLTest.py
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 5, in <module>
    pkg_resources.run_script('airflow==1.5.1', 'airflow')
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/pkg_resources.py", line 492, in run_script
    self.require(requires)[0].run_script(script_name, ns)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/pkg_resources.py", line 1350, in run_script
    execfile(script_filename, namespace, namespace)
  File "/Library/Python/2.7/site-packages/airflow-1.5.1-py2.7.egg/EGG-INFO/scripts/airflow", line 10, in <module>
    args.func(args)
  File "/Library/Python/2.7/site-packages/airflow-1.5.1-py2.7.egg/airflow/bin/cli.py", line 103, in run
    raise AirflowException(msg)
airflow.utils.AirflowException: DAG [bam-to-bigwig-19] could not be found



Any help appreciated


Craig Kimerer

Sep 27, 2015, 1:41:59 AM
to Airflow
I think your error might be elsewhere -- this script works for me:

from datetime import datetime

from airflow import DAG
from airflow.operators import PythonOperator

default_args = {
 'owner': 'airflow',
 'start_date': datetime(2015, 9, 25)
}

def f(*args, **kwargs):
  print 'hello!'

for x in range(10,20,1):
  dag_id = 'test-dag-%d' % x
  d = DAG(dag_id, default_args=default_args)

  t = PythonOperator(task_id='t1',
    dag=d,
    python_callable=f)

  globals()[dag_id] = d

Is there anything else of interest in the logs?

Andrey Kartashov

Sep 27, 2015, 9:39:41 AM
to Airflow
Oh, sorry, we're always on different frequencies :) Yep, this exact script works. Let's make it run once:
from datetime import datetime
from airflow import DAG
from airflow.operators import PythonOperator
import os

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2015, 9, 25)
}

def f(*args, **kwargs):
    print('hello!')

if os.path.isfile("/tmp/run_test") and os.access("/tmp/run_test", os.R_OK):
    os.remove("/tmp/run_test")

    for x in range(10, 20, 1):
        dag_id = 'test-dag-%d' % x
        d = DAG(dag_id, default_args=default_args)

        t = PythonOperator(task_id='t1',
                           dag=d,
                           python_callable=f)

        globals()[dag_id] = d

Now, when models.py processes the script, all 10 DAGs appear just once.

So, why am I doing it this way? In my case, I have a list of experiments (the list keeps growing: 10, ..., 20, ..., 1000, ...) and each experiment has to be processed. Processing is a fixed sequence of tools (a pipeline); the tools have to run sequentially, and the output of the previous step is usually the input to the next. Once an experiment is processed, it should not be processed any more.

There was a suggestion to put job files in a directory and create a DAG for each job file. In my logic, I thought that after I created a DAG I could delete the file. My DAG example simplifies that idea: it creates 10 DAGs when the file appears, then deletes the file to stop DAG generation (after that, no more DAGs with the same or a new id are created). That behavior is really close to what I would like to achieve.

The job file holds just one experiment's file names/directory/params:
"input": {
    "class": "File",
    "path": "rna.SRR948778.bam"
},
"genomeFile": {
    "class": "File",
    "path": "mm10-chrNameLength.txt"
},
"scale":1,
"dept":"-bg",
"genomecoverageout": "rna.SRR948778.bedGraph"

and all tools (tasks) have to see this information within one DAG run.

Craig Kimerer

Sep 27, 2015, 11:30:31 AM
to Airflow
Ah, yes. The reason you're seeing this issue is that on its first run the scheduler sees the file, removes it, and creates the DAGs. The worker then tries to load the same script, finds no DAGs because the file no longer exists, and so doesn't know how to run the task.

I'd use the code above with two small changes:
1) remove the line that calls os.remove(...)
2) set end_date in default_args to datetime(2015, 9, 25), or whatever your start_date is; this will ensure it only gets run once

This way the DAG stays "defined" when the workers load it, and the scheduler can pick it up on a future run if it was unable to schedule it the first time (e.g. if all the workers were busy).
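The run-once idea from change 2 can be sketched as a default_args fragment (the dates here are just the example's; the key point is that start_date equals end_date, so the schedule window contains exactly one execution date):

```python
from datetime import datetime

# With start_date == end_date the schedule window contains exactly one
# execution date, so the scheduler triggers the DAG only once: the
# run-once "singleton DAG" pattern discussed earlier in the thread.
run_date = datetime(2015, 9, 25)
default_args = {
    'owner': 'airflow',
    'start_date': run_date,
    'end_date': run_date,  # identical to start_date: a single scheduled run
}
```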

Andrey Kartashov

Sep 27, 2015, 2:40:31 PM
to Airflow
How can I keep the DAGs synchronized? At one moment I have 3 files in the folder; the next moment there can be 10. When can I delete them? Is it possible to see, within the DAG script, whether it is being run by a worker or by the scheduler?

Maxime Beauchemin

Sep 28, 2015, 2:02:35 PM
to Airflow
There are many ways to keep files synchronized across machines. At Airbnb we use the git resource in Chef. You should have a way to synchronize a code repo in your infrastructure; if not, I'd cron a `git pull` script of some sort.

Andrey Kartashov

Sep 28, 2015, 2:09:16 PM
to Airflow
Oh, sorry, that's not what I meant.

Each job file that I have contains information about the working directory and the files to analyze. I put these files in one directory under different names (let's say UUIDs). I have a pipeline: it moves the first file out of the directory and applies the tasks accordingly.
Now there is a suggestion not to move the files, so the script that populates DAGs will produce as many DAGs as I have files. OK, that's a solution, but if I have 20,000 jobs, why do I need 20,000 DAGs? That is why I would like to delete the job files.
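One possible compromise, sketched below, is to archive processed job files instead of deleting them, and have the DAG-building script scan both directories; the definitions stay loadable for the workers while the active folder stays small. `mark_done` and `all_job_files` are hypothetical helpers, not Airflow APIs:

```python
import os
import shutil

def mark_done(job_path, done_dir):
    """Move a processed job file to an archive directory instead of
    deleting it, so the DAG built from it remains defined when workers
    re-parse the script. (Hypothetical convention, not an Airflow API.)"""
    os.makedirs(done_dir, exist_ok=True)
    dest = os.path.join(done_dir, os.path.basename(job_path))
    shutil.move(job_path, dest)
    return dest

def all_job_files(job_dir, done_dir):
    """All job files to build DAGs from: pending plus archived."""
    paths = []
    for d in (job_dir, done_dir):
        if os.path.isdir(d):
            for name in sorted(os.listdir(d)):
                p = os.path.join(d, name)
                if os.path.isfile(p):
                    paths.append(p)
    return paths
```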