scheduling DAG to run more than once during the day


Mike Lyons

Sep 11, 2015, 5:37:00 PM
to Airflow
I have a simple pipeline with a custom sensor that looks for a file and passes that file name to a bash operator that does some processing. The file sensor runs instantly, and the bash operator takes a few hours. Ideally, I'd like the file sensor to keep running (I thought the poke_interval was for this, but maybe not) and to keep starting/queueing the bash operator. However, and this is hopefully my naivety with the software showing, the file sensor won't run again until the bash operator completes. This results in a mostly serial pipeline.

Any ideas or best practice advice?
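A point worth making explicit here: poke_interval only controls how often a sensor re-checks *within a single task run*; it does not re-launch the sensor while downstream tasks are running. A simplified sketch of the sensor loop (illustrative only, not Airflow's actual implementation; run_sensor and its parameters are made-up names):

```python
import time

def run_sensor(poke, poke_interval, timeout):
    # Call poke() repeatedly, sleeping poke_interval seconds between
    # tries, until it returns True or the timeout elapses. The task
    # holds its executor slot for this entire loop, so with only one
    # slot available (e.g. the SequentialExecutor) nothing else can
    # run in the meantime.
    waited = 0
    while not poke():
        if waited >= timeout:
            raise RuntimeError('sensor timed out')
        time.sleep(poke_interval)
        waited += poke_interval
    return True
```

Once poke() succeeds, the sensor task finishes and its downstream tasks become eligible to run; the sensor itself only runs again as part of the next DAG run.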

Andrey Kartashov

Sep 13, 2015, 10:09:53 AM
to Airflow
Hi,
I have almost the same question. I have a script to run, and its parameter is a job file. Before Airflow it was just a cron job: check a directory for a job file, then run the script. Now I would like to make it more complicated, but the start is the same: check a directory, and if a file is there, copy/remove the file and start the workflow.
Thanks,
Andrey
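Andrey's cron-era check can be sketched as a plain function (the names here are illustrative, not an Airflow API); the same logic could live inside a custom sensor's poke method:

```python
import glob
import os
import shutil

def claim_job_file(watch_dir, work_dir):
    # Glob the watched directory for job files. If one is waiting,
    # move it into a working directory (so the next check doesn't
    # pick it up again) and return its new path; otherwise None.
    candidates = sorted(glob.glob(os.path.join(watch_dir, '*.job')))
    if not candidates:
        return None
    claimed = os.path.join(work_dir, os.path.basename(candidates[0]))
    shutil.move(candidates[0], claimed)
    return claimed
```

The move doubles as a claim, which is what makes the cron version safe to run repeatedly.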

Mike Lyons

Sep 13, 2015, 2:32:24 PM
to Airflow
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

from mapping import BWA_MEM_COMMAND
from airflow.operators import FastqSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['air...@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

THE_HUMAN_GENOME = "/Users/mlyons/genomics/reference/human_g1k_v37.fasta"
BAM_DIR = "/Users/mlyons/genomics/1kg/bam"
BIN_DIR = "/Users/mlyons/genomics/bin"

simple_mapping_pipeline = DAG(dag_id="simple_mapping_pipeline",
                              default_args=default_args,
                              schedule_interval=timedelta(minutes=2))

# FastqSensor just does a glob of the directory
fastq_sensor = FastqSensor(directory="/Users/mlyons/genomics/1kg/unprocessed_fastq",
                           dag=simple_mapping_pipeline,
                           task_id='fastq_sensor',
                           poke_interval=60)

bwa_mem = BashOperator(bash_command=BWA_MEM_COMMAND,
                       params={'path_to_reference_file': THE_HUMAN_GENOME,
                               'path_to_output': BAM_DIR,
                               'bin': BIN_DIR},
                       dag=simple_mapping_pipeline,
                       task_id='bwa_mem')

bwa_mem.set_upstream(fastq_sensor)

Here's the pipeline, just in case the question isn't clear.

Mike Lyons

Sep 13, 2015, 3:55:56 PM
to Airflow
So, Andrey pointed out that I was probably using the SequentialExecutor. I changed to the LocalExecutor, and now the FastqSensor task re-runs as expected, but no other BashOperator tasks start. Is that because there isn't another 'worker' available? Would that be solved by using Celery and adding workers?
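For reference, the executor choice mentioned here is set in airflow.cfg; a sketch of the relevant fragment (section and option names as found in Airflow 1.x-era configs):

```ini
[core]
# SequentialExecutor runs one task instance at a time (the default
# with a SQLite backend); LocalExecutor runs tasks in parallel local
# subprocesses; CeleryExecutor distributes tasks to remote workers.
executor = LocalExecutor
```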

Tim Hirzel

Jan 21, 2016, 5:43:24 PM
to Airflow
Hi Mike,

This is an old thread now, but I was searching around with a similar question and found your thread, and also a recent thread that proposes a good solution to your problem.

Also, regarding your BashOperators not starting: depending on the relative timings of your tasks, it might be related to these 3 items in your airflow.cfg:


# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16

Tim
