"""
- Experiment with executing remote (Python 3) Celery tasks -
"""
from airflow import DAG
from airflow.contrib.hooks.redis_hook import RedisHook
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from celery import Celery
import logging
########################################################################
# runs a celery task on a named queue in Redis
def runMyTask(queueName=None, taskName=None, taskArgsDict=None):
    # avoid a mutable default argument
    if taskArgsDict is None:
        taskArgsDict = {}
    # pull Redis connection details from the Airflow connection store
    redis = RedisHook(redis_conn_id='test_environment_redis')
    redisURL = 'redis://h:{pw}@{host}:{port}/{db}'.format(pw=redis.password,
                                                          host=redis.host,
                                                          port=redis.port,
                                                          db=0)
    # point a throwaway Celery app at the same Redis instance as both
    # broker and result backend, routing taskName to the requested queue
    workers = Celery('test_namespace')
    workers.conf.update(
        timezone='UTC',
        broker_connection_timeout=5,
        broker_pool_limit=None,
        broker_url=redisURL,
        result_backend=redisURL,
        redis_db=0,
        redis_host=redis.host,
        redis_password=redis.password,
        redis_port=redis.port,
        task_routes={taskName: {'queue': queueName}},
        task_soft_time_limit=240
    )
    # enqueue the task on the remote worker's queue
    try:
        work = workers.send_task(taskName, kwargs=taskArgsDict)
    except Exception as e:
        logging.error("send_task failed: %s", e)
        # re-raise as-is to preserve the original traceback
        raise
    # block until the remote task finishes (or the timeout expires)
    try:
        result = work.get(timeout=240, interval=1, disable_sync_subtasks=False)
    except Exception as e:
        logging.error("work.get failed: %s", e)
        raise
    if result is not None:
        # the remote worker runs Python 3 while this DAG runs under
        # Python 2, so treat the result as a plain string for this
        # experiment
        return result
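########################################################################
# For reference, a minimal sketch of what the remote (Python 3) worker
# side could look like. The module/task names match the 'tasks.mytask'
# and 'some_q' values used below; the task body and broker URL are
# illustrative assumptions, not part of this experiment:
#
#   # tasks.py, started with:  celery -A tasks worker -Q some_q
#   from celery import Celery
#
#   app = Celery('test_namespace',
#                broker='redis://h:password@host:6379/0',
#                backend='redis://h:password@host:6379/0')
#
#   @app.task(name='tasks.mytask')
#   def mytask(some_param=None):
#       # echo the kwarg back; the DAG treats the result as a string
#       return 'got: {}'.format(some_param)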
########################################################################
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 9, 19),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0
}
# max_active_runs is a DAG-level argument, not a task-level default
dag = DAG('celery_task_experiment',
          default_args=default_args,
          schedule_interval=None,
          max_active_runs=1)
tryTask = PythonOperator(
    task_id='my_remote_task',
    python_callable=runMyTask,
    op_kwargs={
        'queueName': 'some_q',
        'taskName': 'tasks.mytask',
        # 'taskName': 'tasks.test_nonexistent_task_handling',
        'taskArgsDict': {'some_param': 'some_value'}
    },
    dag=dag
)
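########################################################################
# schedule_interval=None means this DAG only runs when triggered by
# hand, e.g. with the Airflow 1.x CLI:
#   airflow trigger_dag celery_task_experiment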