Custom operator not found, despite adding it to a plugin in the plugins folder


Conrad Lee

Jun 19, 2018, 9:36:10 AM
to cloud-composer-discuss
I'm having a hard time getting a custom operator to work. The main issue is that the Cloud Composer plugins documentation doesn't include a working example.

I've attached the file in which I define my custom operator (a sensor) and add it to a plugin. I then upload it to the plugins folder in my Google Cloud Storage bucket using the command

gcloud beta composer environments storage plugins import --environment dw --location us-central1 --source=custom_operators.py

I import the operator in my dag using

from airflow.operators import BigQuerySensor

However, in the Airflow webserver UI I see that my DAG doesn't load, and I see an error which says:

Broken DAG: [/home/airflow/gcs/dags/ref_dash_dag.py] cannot import name BigQuerySensor 

Any idea what I'm doing wrong? Is there something special I need to do to get Airflow to pick up the plugin?
custom_operators.py

Vikram Oberoi

Jun 19, 2018, 12:56:05 PM
to con...@parsely.com, cloud-compo...@googlegroups.com
If your goal is to import a custom operator into your own DAG, here's another way to do it (which I discovered yesterday): https://groups.google.com/forum/#!topic/cloud-composer-discuss/wTI7Pbwc6ZY.

A general question for folks working on Airflow/Composer: if I want to import custom operators/sensors/hooks into my DAGs, why would I make a plugin for it vs. package the code alongside my DAGs?

Vikram

--
You received this message because you are subscribed to the Google Groups "cloud-composer-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cloud-composer-di...@googlegroups.com.
To post to this group, send email to cloud-compo...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cloud-composer-discuss/aeab287a-e469-4dc2-9fc9-34375a53b168%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Conrad Lee

Jun 22, 2018, 3:22:47 AM
to Vikram Oberoi, cloud-compo...@googlegroups.com
I've found a solution that works using the plugins folder and the import command. I defined a custom Airflow operator, MyCustomOperator, in a normal Python file called my_custom_operator.py, but did not create an AirflowPlugin class or anything like that. I then uploaded this file to the plugins folder with the command

gcloud beta composer environments storage plugins import --environment dw --location us-central1 --source=my_custom_operator.py

In my dag I can now import the operator as follows: from my_custom_operator import MyCustomOperator.
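
A plain module import like this most likely works because the plugins folder ends up on Python's import path, so any top-level .py file in it can be imported by its file name. That is an assumption about the environment, not something stated in this thread. The sketch below models the mechanism with the standard library only: a temporary directory stands in for the plugins folder, and a plain class stands in for a real operator (which would subclass Airflow's BaseOperator). The helper name load_operator_from_folder is hypothetical.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in for the contents of my_custom_operator.py.
# A real operator would subclass airflow.models.BaseOperator; a plain
# class is used here so the sketch runs without Airflow installed.
MODULE_SOURCE = '''
class MyCustomOperator(object):
    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self, context):
        return "ran " + self.task_id
'''


def load_operator_from_folder(folder):
    """Put `folder` on sys.path and import MyCustomOperator from it."""
    if folder not in sys.path:
        sys.path.insert(0, folder)
    # Make sure the import system notices the freshly written file.
    importlib.invalidate_caches()
    module = importlib.import_module("my_custom_operator")
    return module.MyCustomOperator


# Temporary directory playing the role of the plugins folder.
plugins_dir = tempfile.mkdtemp()
Path(plugins_dir, "my_custom_operator.py").write_text(MODULE_SOURCE)

MyCustomOperator = load_operator_from_folder(plugins_dir)
task = MyCustomOperator(task_id="wait_for_table")
result = task.execute(context={})
```

Note that the module name in the import statement comes from the file name, so the file uploaded to the plugins folder has to be named my_custom_operator.py for the import above to resolve.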


crisb

Jun 22, 2018, 8:58:47 AM
to cloud-composer-discuss
You should import it via the plugin name:

from airflow.operators.parsely_plugin import BigQuerySensor

and it should work.

You may also have to define the plugin like:

from airflow.plugins_manager import AirflowPlugin

# BigQuerySensor must be defined (or imported) earlier in this same file.
class ParselyPlugin(AirflowPlugin):
    # The plugin name becomes part of the import path used in the DAG:
    # airflow.operators.parsely_plugin
    name = "parsely_plugin"
    # A list of class(es) derived from BaseOperator
    operators = [BigQuerySensor]
    # A list of class(es) derived from BaseHook
    hooks = []
    # A list of class(es) derived from BaseExecutor
    executors = []
    # A list of references to inject into the macros namespace
    macros = []
    # A list of objects created from a class derived
    # from flask_admin.BaseView
    admin_views = []
    # A list of Blueprint objects created from flask.Blueprint
    flask_blueprints = []
    # A list of menu links (flask_admin.base.MenuLink)
    menu_links = []
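
To illustrate why the import path contains the plugin name, here is a simplified model of the idea, not Airflow's actual code: the plugin's operators are exposed as attributes of a module named after the plugin. Everything in it is an assumption made for illustration, including the demo_airflow.operators namespace (chosen so it cannot clash with a real Airflow install), the ParselyPluginModel class, and the register_plugin helper.

```python
import importlib
import sys
import types


class BigQuerySensor(object):
    """Placeholder for the real sensor class defined in the plugin file."""


class ParselyPluginModel(object):
    """Minimal model of a plugin: a name plus the operators it exposes."""
    name = "parsely_plugin"
    operators = [BigQuerySensor]


def register_plugin(plugin, namespace="demo_airflow.operators"):
    """Expose plugin.operators as attributes of <namespace>.<plugin.name>."""
    # Make sure every parent package of the namespace exists in sys.modules.
    parts = namespace.split(".")
    for i in range(1, len(parts) + 1):
        parent = ".".join(parts[:i])
        sys.modules.setdefault(parent, types.ModuleType(parent))

    # Build a module named after the plugin and attach each operator to it.
    module_name = namespace + "." + plugin.name
    module = types.ModuleType(module_name)
    for operator in plugin.operators:
        setattr(module, operator.__name__, operator)
    sys.modules[module_name] = module
    return module_name


module_name = register_plugin(ParselyPluginModel)
imported = importlib.import_module(module_name)
```

In this model, a class that is missing from the operators list never becomes an attribute of the generated module, so importing it by name fails even though the plugin itself was found.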

crisb

Jun 22, 2018, 9:01:49 AM
to cloud-composer-discuss
Also note that you'll still get:

Broken DAG: [/home/airflow/gcs/dags/ref_dash_dag.py] cannot import name BigQuerySensor

if there is a syntax error in your plugin or if some of its imports are missing. So Airflow may actually be finding the plugin, but importing it fails.
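
One way to rule this out before uploading is to load the plugin file locally and look at the real traceback. The sketch below uses only the standard library; the check_plugin_file helper and the temporary test files are hypothetical. It assumes the local Python environment has the same packages installed as the Composer environment, otherwise a missing-import report may be a false alarm.

```python
import importlib.util
import os
import tempfile
import traceback


def check_plugin_file(path):
    """Try to load the Python file at `path`.

    Returns None if it loads cleanly, otherwise the formatted traceback.
    """
    spec = importlib.util.spec_from_file_location("plugin_under_test", path)
    module = importlib.util.module_from_spec(spec)
    try:
        spec.loader.exec_module(module)
    except Exception:  # covers SyntaxError and ImportError among others
        return traceback.format_exc()
    return None


def _write_temp(source):
    """Write `source` to a temporary .py file and return its path."""
    handle, path = tempfile.mkstemp(suffix=".py")
    with os.fdopen(handle, "w") as f:
        f.write(source)
    return path


# Three small files: one valid, one with a missing colon, one with a
# module that is (deliberately) not installed.
good_path = _write_temp("class BigQuerySensor(object):\n    pass\n")
bad_syntax_path = _write_temp("class BigQuerySensor(object)\n    pass\n")
missing_import_path = _write_temp("import a_module_that_is_not_installed_xyz\n")

good_report = check_plugin_file(good_path)
syntax_report = check_plugin_file(bad_syntax_path)
import_report = check_plugin_file(missing_import_path)
```

Running check_plugin_file("custom_operators.py") on the real file and printing the result would show the underlying error instead of the generic "cannot import name" message.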