from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from mapping import BWA_MEM_COMMAND
from airflow.operators import FastqSensor
# Defaults applied to every task in this DAG; any task may override them
# individually.  Email alerting is declared but switched off.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2015, 6, 1),
    email=['air...@airflow.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
    # Optional knobs, deliberately left disabled:
    # queue='bash_queue',
    # pool='backfill',
    # priority_weight=10,
    # end_date=datetime(2016, 1, 1),
)
# Filesystem locations rendered into the BWA command template below.
THE_HUMAN_GENOME = "/Users/mlyons/genomics/reference/human_g1k_v37.fasta"  # reference FASTA (passed as 'path_to_reference_file')
BAM_DIR = "/Users/mlyons/genomics/1kg/bam"  # output directory for aligned BAMs (passed as 'path_to_output')
BIN_DIR = "/Users/mlyons/genomics/bin"  # tool directory (passed as 'bin') -- presumably holds the bwa binary; confirm
simple_mapping_pipeline = DAG(dag_id="simple_mapping_pipeline", default_args=default_args, schedule_interval=timedelta(minutes=2))
# Sensor task: FastqSensor just does a glob of the directory, polling once
# a minute for new fastq files to process.
fastq_sensor = FastqSensor(
    task_id='fastq_sensor',
    dag=simple_mapping_pipeline,
    directory="/Users/mlyons/genomics/1kg/unprocessed_fastq",
    poke_interval=60,
)
# Alignment task: runs the templated BWA_MEM_COMMAND, with the reference,
# output, and tool paths supplied via `params` for template rendering.
bwa_mem = BashOperator(
    task_id='bwa_mem',
    dag=simple_mapping_pipeline,
    bash_command=BWA_MEM_COMMAND,
    params={
        'path_to_reference_file': THE_HUMAN_GENOME,
        'path_to_output': BAM_DIR,
        'bin': BIN_DIR,
    },
)
# Alignment runs only after the sensor has detected new fastq files
# (equivalent to bwa_mem.set_upstream(fastq_sensor)).
fastq_sensor.set_downstream(bwa_mem)
# Relevant airflow.cfg settings (for reference; these are NOT set in this file):
#
#   # The amount of parallelism as a setting to the executor. This defines
#   # the max number of task instances that should run simultaneously
#   # on this airflow installation.
#   parallelism = 32
#
#   # The number of task instances allowed to run concurrently by the scheduler.
#   dag_concurrency = 16
#
#   # The maximum number of active DAG runs per DAG.
#   max_active_runs_per_dag = 16