DAG testing error


z...@useracquisition.com

Dec 19, 2018, 6:02:45 PM
to cloud-composer-discuss
I'm trying to test a DAG piece by piece. The first step was just to verify that a custom operator from a plugin I've imported works, but I'm getting the most baffling error message I've ever seen, and I don't even know how to step through and debug it. When I run the test command below, the error is:


AttributeError: 'FacebookAdsToCloudStorageOperator' object has no attribute 'o'


with a test command of:


gcloud composer environments run ENVIRONMENT_NAME --location LOCATION list_dags -- -sd /home/airflow/gcs/data/test


and finally, the operator and DAG look like the following. What the hell is going on here?


import csv
import tempfile

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

from facebook_ads_plugin.hooks.facebook_ads_hook import FacebookAdsHook


class FacebookAdsToCloudStorageOperator(BaseOperator):
    """
    Transfers Facebook Ads data to a Google Cloud Storage bucket.

    :param level: A string representing the level for which to get Facebook
        Ads data; can be campaign or ad_set level.
    :type level: string
    :param fields: The fields your report should query.
    :type fields: list
    :param object_name: The object name to set when uploading. (templated)
    :type object_name: string
    :param facebook_ads_conn_id: Reference to a specific Facebook Ads hook.
    :type facebook_ads_conn_id: string
    :param google_cloud_storage_conn_id: Reference to a specific Google Cloud
        Storage hook.
    :type google_cloud_storage_conn_id: string
    """

    template_fields = ('object_name')

    @apply_defaults
    def __init__(self,
                 level,
                 fields,
                 object_name,
                 facebook_ads_conn_id='facebook_ads_default',
                 google_cloud_storage_conn_id='google_cloud_storage_default',
                 *args,
                 **kwargs):
        super(FacebookAdsToCloudStorageOperator,
              self).__init__(*args, **kwargs)
        self.level = level
        self.fields = fields
        self.object_name = object_name
        self.facebook_ads_conn_id = facebook_ads_conn_id
        self.google_cloud_storage_conn_id = google_cloud_storage_conn_id

    def execute(self, context):
        facebook_ads_conn = FacebookAdsHook(
            facebook_ads_conn_id=self.facebook_ads_conn_id)
        gcs_conn = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id)

        if self.level == 'campaigns':
            cursor = facebook_ads_conn.get_campaigns(fields=self.fields)

        if self.level == 'ad_sets':
            cursor = facebook_ads_conn.get_ad_sets(fields=self.fields)

        with tempfile.NamedTemporaryFile() as file:
            csvwriter = csv.writer(file)
            csvwriter.writerow(self.fields)

            for item in cursor:
                csvwriter.writerow(item.get(field, '')
                                   for field in self.fields)

            # Flush buffered rows to disk before handing the file to GCS.
            file.flush()

            gcs_conn.upload(
                bucket='/home/airflow/gcs/data/test',
                object=self.object_name,
                filename=file.name,
                mime_type='text/csv'
            )



'''DAG definition for Facebook Ads ETL.'''

import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

from airflow.operators.facebook_ads import FacebookAdsToCloudStorageOperator # noqa: E501; pylint: disable=E0401

default_args = {
    'depends_on_past': False,
    'start_date': datetime.datetime(2018, 12, 1),
    'email': ['air...@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': datetime.timedelta(minutes=5)
}

with DAG('facebook_ads_etl',
         default_args=default_args,
         catchup=False) as dag:

    dag.doc_md = __doc__

    t1 = FacebookAdsToCloudStorageOperator(
        task_id='facebook_ads_to_cloud_storage_campaigns',
        level='campaigns',
        fields=['id', 'account_id', 'daily_budget', 'status', 'name'],
        object_name='facebook_ads_campaigns'
    )

    t2 = DummyOperator(task_id='dummy')

    t1 >> t2

z...@useracquisition.com

Dec 19, 2018, 6:09:29 PM
to cloud-composer-discuss
Okay, so if I comment out the template_fields declaration in the operator, it works. What was I doing wrong, and why was the error message so bad?

Wilson Lian

Dec 19, 2018, 6:52:02 PM
to z...@useracquisition.com, cloud-composer-discuss
The issue is that the value you assigned to template_fields wasn't a tuple. A singleton tuple needs a trailing comma: template_fields = ('object_name',). Without the comma, ('object_name') is just the string 'object_name' with parentheses around it.
Airflow iterates over template_fields, which is why it was looking for 'o', the first element of the iterable string 'object_name'.
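
The mechanism is easy to reproduce outside Airflow. A minimal sketch (the render_templates function here is a hypothetical stand-in for Airflow's templating loop, not its real code):

```python
class Op:
    # Missing comma: this is just the string 'object_name', not a tuple.
    template_fields = ('object_name')

    def __init__(self):
        self.object_name = 'facebook_ads_campaigns'


def render_templates(task):
    # Simplified stand-in for what Airflow does: iterate template_fields
    # and look up each named attribute on the task instance.
    for field in task.template_fields:
        getattr(task, field)


try:
    # Iterating the string yields 'o' first, so the lookup fails.
    render_templates(Op())
except AttributeError as exc:
    print(exc)  # 'Op' object has no attribute 'o'

# With the trailing comma, iteration yields 'object_name' as intended.
Op.template_fields = ('object_name',)
render_templates(Op())  # no error
```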

best,
Wilson


Zac Friedman

Dec 21, 2018, 11:13:05 AM
to Wilson Lian, cloud-composer-discuss
Ok, thanks for the clarification.