using celery to connect somewhere else


Rick Otten

Sep 14, 2018, 5:28:42 PM
to cloud-composer-discuss
I have a celery worker running somewhere else that is completely independent of the Composer instance.  All of its tasks are Python 3.  Since Composer is Python 2, it isn't easy to import the Task.

What I'd like to do is configure a Redis connector, maybe establish a Redis hook, and then use that to request a task to be run by the worker which is somewhere else without importing the original Python 3 full task definition into Airflow.

Is that possible?  I've always just executed Celery tasks by importing the whole task definition.

My fallback plan, much less elegant, is to ssh to another server where Python 3 is installed and call a standalone mini Python 3 app that invokes the task.
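
For reference, that fallback would look roughly like this with the SSHOperator (the connection id and script path here are just placeholders):

from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator

dag = DAG('ssh_fallback_experiment', schedule_interval=None,
          start_date=datetime(2018, 9, 14))

# run a small Python 3 wrapper script on a remote box that can import
# the Celery task; 'my_ssh_server' and the script path are hypothetical
invoke_task = SSHOperator(
                          task_id='invoke_remote_celery_task',
                          ssh_conn_id='my_ssh_server',
                          command='python3 /opt/tools/run_my_task.py --some-param some_value',
                          dag=dag
                         )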

Another option would be to update the Django app where the task is currently implemented to expose an endpoint that just triggers it, and then use an HTTP operator to hit that API endpoint.  This is also less elegant than a direct task request, and while it might be faster than building a standalone mini-app, it could be hard to secure properly.
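
Something like this with the SimpleHttpOperator, assuming the Django app grew a trigger endpoint (the connection id, endpoint, and payload here are made up):

from airflow.operators.http_operator import SimpleHttpOperator

# 'my_django_app' would be an Airflow HTTP connection pointing at the
# Django host; securing this endpoint is the part I'm unsure about
trigger_task = SimpleHttpOperator(
                                  task_id='trigger_task_via_api',
                                  http_conn_id='my_django_app',
                                  endpoint='api/tasks/mytask/trigger',
                                  method='POST',
                                  data='{"some_param": "some_value"}',
                                  headers={'Content-Type': 'application/json'},
                                  dag=dag   # same DAG boilerplate as in the ssh sketch above
                                 )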

There is no pre-written Redis Operator in the community that I've been able to find.  In Airflow I've never tried to use a Hook or a Connector without an Operator to wrap it, and I'm not sure I want to invest a lot of time building a new Operator (nor whether I really need one).  Rumor has it that within a few months we'll be able to use Python 3 in Composer, at which point I could just use a Python callable that imports the task.  I'm hoping for a simpler approach that just triggers a Celery task on the remote worker without a lot of overhead or fuss.

thoughts?


Trevor Edwards

Sep 17, 2018, 1:55:08 PM
to cloud-composer-discuss
Python 3 Composer is in the works, and will hopefully be out within about a month.

I'm not too familiar with the details of your use case, but you may also want to consider using the KubernetesPodOperator (https://cloud.google.com/composer/docs/how-to/using/using-kubernetes-pod-operator). Is the task you're describing an Airflow task?
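
As a rough sketch (the image, namespace, and command here are placeholders), the idea would be to run your Python 3 task code in its own container, which sidesteps the Python 2 restriction in the Airflow workers entirely:

from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

dag = DAG('k8s_pod_experiment', schedule_interval=None,
          start_date=datetime(2018, 9, 17))

# the container image bundles your Python 3 codebase, so the Celery task
# can be imported and called directly inside the pod
run_task = KubernetesPodOperator(
                                 task_id='run_python3_task',
                                 name='run-python3-task',
                                 namespace='default',
                                 image='gcr.io/my-project/my-celery-tasks:latest',
                                 cmds=['python3', '-c'],
                                 arguments=['from tasks import mytask; mytask(some_param="some_value")'],
                                 dag=dag
                                )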

Rick Otten

Sep 18, 2018, 4:35:51 PM
to cloud-composer-discuss


On Monday, September 17, 2018 at 1:55:08 PM UTC-4, Trevor Edwards wrote:
Is the task you're describing an Airflow task?


The Celery workers I'm interested in running tasks on are listening on a queue in a Redis instance in AWS.  Many other pieces of our infrastructure also run tasks there.  (I'll hopefully, slowly, move those pieces, and the queue, into Google Cloud, but it will take a long time.)  Meanwhile I found this nifty blog entry that tells me what I need to know to trigger tasks from Python 2 in Composer that run as Python 3 on a remote worker:  https://www.distributedpython.com/2018/06/19/call-celery-task-outside-codebase/

It will take me a few days before I'll have something working that I can share.  Ideally we will be able to use a Redis connector so we don't have to have the keys in the DAG.  I'm not at all sure how that will work yet, so there is still some research and testing to do...


Rick Otten

Sep 19, 2018, 5:39:19 PM
to cloud-composer-discuss
To follow up, I have it working now.  Thanks for your patience.  Here is what it looks like:

"""
 - Experiment with executing remote (Python 3) Celery tasks -
"""
from airflow import DAG
from airflow.contrib.hooks.redis_hook import RedisHook
from airflow.operators.python_operator import PythonOperator


from datetime import datetime
from celery import Celery
import logging


########################################################################
# runs a celery task on a named queue in Redis
def runMyTask(queueName=None, taskName=None, taskArgsDict=None):
    # avoid a mutable default argument for the task kwargs
    taskArgsDict = taskArgsDict or {}

    redis = RedisHook(redis_conn_id='test_environment_redis')
    redisURL = 'redis://h:{pw}@{host}:{port}/{db}'.format(pw=redis.password, 
                                                          host=redis.host, 
                                                          port=redis.port,
                                                          db=0)

    workers = Celery('test_namespace')
    
    workers.conf.update(
                        timezone = 'UTC',
                        broker_connection_timeout = 5,
                        broker_pool_limit = None,
                        broker_url = redisURL,
                        result_backend = redisURL,
                        redis_db = 0,
                        redis_host = redis.host,
                        redis_password = redis.password,
                        redis_port = redis.port,
                        task_routes = {taskName: {'queue': queueName}},
                        task_soft_time_limit = 240
                       )



    try:
        work = workers.send_task(taskName, kwargs=taskArgsDict)
    except Exception as e:
        logging.error("send_task failed:  %s", e)
        raise
    try:
        # disable_sync_subtasks=False lets this block on the remote result
        # even though it is running inside another task runner
        result = work.get(timeout=240, interval=1, disable_sync_subtasks=False)
    except Exception as e:
        logging.error("work.get failed:  %s", e)
        raise

    if result is not None:
        # Works in Python 3
        #logging.info("result:  %s", str(result))
        # but we are in Python 2
        # assume result is a string for this experiment
        logging.info("result:  %s", result.encode('utf-8'))

    return result
########################################################################

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 9, 19, 0, 0, 0),
    'email': ['my_email@my_domain.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0
}

# max_active_runs is a DAG-level argument, so it goes on the DAG itself
# rather than in default_args
dag = DAG('celery_task_experiment', default_args=default_args,
          schedule_interval=None, max_active_runs=1)

tryTask = PythonOperator(
                         task_id='my_remote_task',
                         python_callable=runMyTask,
                         op_kwargs={
                                    'queueName': 'some_q',
                                    'taskName':  'tasks.mytask',
                                    #'taskName':  'tasks.test_nonexistent_task_handling',
                                    'taskArgsDict': {'some_param': 'some_value'}
                                   },
                         dag=dag
                        )
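
Since schedule_interval is None, the DAG only runs when triggered manually, e.g. from the Airflow CLI (or the "Trigger DAG" button in the web UI):

airflow trigger_dag celery_task_experiment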

