So I'm trying to test a DAG piece by piece so the first thing I put in was just to test that my custom operator in a plugin I've imported is working, but I'm getting the dumbest error message I've ever seen, and I don't even know how to step through and debug this. The error message when I run the test command below is:
import csv
import tempfile
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from facebook_ads_plugin.hooks.facebook_ads_hook import FacebookAdsHook
class FacebookAdsToCloudStorageOperator(BaseOperator):
"""
Transfers Facebook Ads data to a Google Cloud Storage bucket.
:param level: A string representing the level for which to get Facebook
Ads data, can be campaign or ad_set level.
:type level: string
:param fields: The fields your report should query.
:type fields: list
:param object_name: The object name to set when uploading. (templated)
:type object_name: string
:param facebook_ads_conn_id: Reference to a specific Facebook Ads hook.
:type facebook_ads_conn_id: string
:param google_cloud_storage_conn_id: Reference to a specific Google Cloud
Storage hook.
:type google_cloud_storage_conn_id: string
"""
template_fields = ('object_name')
@apply_defaults
def __init__(self,
level,
fields,
object_name,
facebook_ads_conn_id='facebook_ads_default',
google_cloud_storage_conn_id='google_cloud_storage_default',
*args,
**kwargs):
super(FacebookAdsToCloudStorageOperator,
self).__init__(*args, **kwargs)
self.level = level
self.fields = fields
self.object_name = object_name
self.facebook_ads_conn_id = facebook_ads_conn_id
self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
def execute(self, context):
facebook_ads_conn = FacebookAdsHook(
facebook_ads_conn_id=self.facebook_ads_conn_id)
gcs_conn = GoogleCloudStorageHook()
if self.level is 'campaigns':
cursor = facebook_ads_conn.get_campaigns(fields=self.fields)
if self.level is 'ad_sets':
cursor = facebook_ads_conn.get_ad_sets(fields=self.fields)
with tempfile.NamedTemporaryFile() as file:
csvwriter = csv.writer(file)
csvwriter.writerow(self.fields)
for item in cursor:
csvwriter.writerow(item.get(field, '')
for field in self.fields)
gcs_conn.upload(
bucket='/home/airflow/gcs/data/test',
object=self.object_name,
filename=file.name,
mime_type='text/csv'
)
'''DAG definition for Facebook Ads ETL.'''
import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.facebook_ads import FacebookAdsToCloudStorageOperator # noqa: E501; pylint: disable=E0401
default_args = {
'depends_on_past': False,
'start_date': datetime.datetime(2018, 12, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': datetime.timedelta(minutes=5)
}
with DAG('facebook_ads_etl',
default_args=default_args,
catchup=False) as dag:
dag.doc_md = __doc__
t1 = FacebookAdsToCloudStorageOperator(
task_id='facebook_ads_to_cloud_storage_campaigns',
level='campaigns',
fields=['id', 'account_id', 'daily_budget', 'status', 'name'],
object_name='facebook_ads_campaigns'
)
t2 = DummyOperator(task_id='dummy')
t1 >> t2