Pinning a group of tasks in a dag to run on the same celery worker


armando a.

Oct 23, 2015, 2:51:13 PM10/23/15
to Airflow
I currently have a setup where one machine runs airflow webserver and airflow scheduler.  Then I have a set of three machines running airflow worker, all on the same default queue.  Each of the workers has Docker installed and can run Docker locally.  The problem I have is that building a Docker image works, but that image is only accessible to the worker that built it.  So when another worker tries to run the Docker image, it can't find it because it's on another machine.  Is there a way to say: for all tasks in this DAG, run on the same celery worker?


The dag
import datetime as dt
import logging
import os

import airflow
import dock  # the poster's own Docker helper module

logger = logging.getLogger(__name__)

# default_args is defined elsewhere in the poster's setup
dag = airflow.DAG('local_docker', default_args=default_args, schedule_interval=dt.timedelta(minutes=15))

def build_docker_image(**kwargs):
    docker = dock.DockerObject()
    image = docker.build_image(docker_dir=os.path.dirname(os.path.abspath(__file__)))
    logger.info("build image = %s", image)

    return image

def run_docker_image(**kwargs):
    ti = kwargs['ti']
    image = ti.xcom_pull(task_ids="build_docker_image")
    logger.info("run image = %s", image)

    docker = dock.DockerObject()
    docker.run_image(image)


t1 = airflow.operators.PythonOperator(
    task_id='build_docker_image',
    dag=dag,
    provide_context=True,
    python_callable=build_docker_image)

t2 = airflow.operators.PythonOperator(
    task_id='run_docker_image',
    dag=dag,
    provide_context=True,
    python_callable=run_docker_image)

t2.set_upstream(t1)

armando

Maxime Beauchemin

Oct 26, 2015, 1:07:03 PM10/26/15
to Airflow
You could do that by specifying a queue: assign the same queue to each task (or to the whole DAG), and pass that queue name when starting one of the workers.  But that isn't ideal, as you lose the advantages of running a distributed system.
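A minimal sketch of the queue-pinning idea, assuming the Airflow 1.x operator API.  The queue name `docker_host_1` is illustrative; the operator arguments are shown as plain kwargs dicts so the snippet runs without Airflow installed — in the real DAG you would pass them to `PythonOperator` as in the code above.

```python
# Hypothetical queue name -- pick one per machine you want to pin to.
PINNED_QUEUE = "docker_host_1"

# Every operator in the DAG gets the same queue= argument, so both tasks
# are routed to whichever worker is listening on that queue:
build_kwargs = dict(task_id="build_docker_image", queue=PINNED_QUEUE)
run_kwargs = dict(task_id="run_docker_image", queue=PINNED_QUEUE)

# Sanity check: both tasks target the same queue, hence the same worker.
assert build_kwargs["queue"] == run_kwargs["queue"]

# Then, on exactly one of the worker machines, start the worker bound to
# that queue (workers started without -q keep serving the default queue):
#
#   airflow worker -q docker_host_1
```

Since both the build task and the run task land on the one worker subscribed to `docker_host_1`, the locally built image is visible when the run task executes.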