load data from BQ and use it as task manager


sgio...@bitbang.com

May 3, 2019, 9:06:15 AM
to cloud-composer-discuss
Hi all,
I would like to use data that I have just loaded into BQ to drive the loading of data from Mailchimp to GCS.
I've run into a number of issues doing this.

I exported the table data from BQ to GCS using BigQueryToCloudStorageOperator (from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator).
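For reference, a minimal sketch of that export step (the source table name is just a placeholder; the destination URI is the real one I am using):

from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator

# Export the BQ table as newline-delimited JSON to the temporary bucket.
# `dag` is the DAG object defined elsewhere in the file.
export_campaigns = BigQueryToCloudStorageOperator(
    task_id='export_campaigns',
    source_project_dataset_table='my_project.my_dataset.mailchimp_campagne',  # placeholder
    destination_cloud_storage_uris=['gs://facile-tmp-data/mailchimp-mailchimp_campagne.json'],
    export_format='NEWLINE_DELIMITED_JSON',
    dag=dag,
)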
After this I found some problems:
  • If I save the data in a bucket different from the Airflow bucket, I can't load the file: reading it from a Python function (called by a PythonOperator) gives the error:
FileNotFoundError: [Errno 2] No such file or directory: 'gs://facile-tmp-data/mailchimp-mailchimp_campagne.json'
even though the file exists (maybe because it doesn't exist on all the Airflow instances).

  • If I save the data under /home/airflow/gcs/dags (but to write there, you must use the corresponding gs:// path), I can read it even without an operator, but a placeholder file with the same name as the file that BigQueryToCloudStorageOperator will write MUST already exist. Then I can read the file with a simple Python open call (this time via /home/airflow/gcs/dags) and use the data to drive the MailchimpToGCSOperator; a rough sketch is below.
The question is: is my second option the only path I can follow?
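A rough sketch of that second approach, reading through the gcsfuse-mounted path at DAG-parse time (the file name is the one from the error above):

import codecs
from os import path

DAGS_LOC = '/home/airflow/gcs/dags'
driver_file = 'mailchimp-mailchimp_campagne.json'

# The mounted path can be read with plain Python I/O, no operator needed.
with codecs.open(path.join(DAGS_LOC, driver_file), 'r', encoding='utf-8', errors='ignore') as f:
    ldata = f.readlines()
# ldata can then be used to decide which MailchimpToGCSOperator tasks to create.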

npat...@gmail.com

May 3, 2019, 9:26:29 AM
to cloud-composer-discuss
Hi,
Regarding issue 1, I wonder if the service account used by Composer has access to the bucket `facile-tmp-data`?
Is the bucket in the same project as Composer?
Are you using the default Composer service account?

sgio...@bitbang.com

May 3, 2019, 10:05:46 AM
to cloud-composer-discuss
Hi npat,
yes, the two buckets are in the same project and same region:
  • europe-west1-facile-compose-XXXXXXX-bucket  Multi-Regional  European Union  Per object  None  ACLs and bucket policies
  • facile-tmp-data                             Multi-Regional  European Union  Per object  None  Bucket policy only
The issue is that I can't directly open a file on GCS outside of the mapped /home/airflow/gcs/dags.
So I can't use:
with codecs.open(path.join(TMP_DATA_LOC, driver_file), 'r', encoding='utf-8', errors='ignore') as f:
    ldata = f.readlines()
where TMP_DATA_LOC is 'gs://facile-tmp-data'

So I have to conclude that I can't load a file from GCS into the DAG's memory if the file is not under /home/airflow/gcs/dags.

npat...@gmail.com

May 3, 2019, 10:39:16 AM
to cloud-composer-discuss
Hi,
If you want to read a file from GCS directly:
1) Use the Google Cloud Storage client library to download it:
import google.cloud.storage as storage

bucket_name = 'facile-tmp-data'
file_name = 'mailchimp-mailchimp_campagne.json'

storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(file_name)

# content = blob.download_as_string()  # or keep the content in memory as bytes
blob.download_to_filename('/tmp/campaign.json')  # download to a local file
...
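Once downloaded, the export can be parsed line by line (assuming it was written as newline-delimited JSON):

import json

with open('/tmp/campaign.json', encoding='utf-8') as f:
    rows = [json.loads(line) for line in f if line.strip()]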


2) Use the built-in GCS I/O in TensorFlow.
TensorFlow is already installed in Composer:

from tensorflow.python.lib.io import file_io
import pandas as pd

def fetch_as_dataframe(path) -> pd.DataFrame:
    # file_io.FileIO understands gs:// paths directly
    with file_io.FileIO(path, 'r') as f:
        if ".json" in path:
            df = pd.read_json(f, lines=True)
        else:
            df = pd.read_csv(f)
    return df

fetch_as_dataframe('gs://facile-tmp-data/mailchimp-mailchimp_campagne.json')

sgio...@bitbang.com

May 6, 2019, 4:42:26 AM
to cloud-composer-discuss
Hi Nidhin,
if I use a library that loads from gs://, I must put the call inside an operator; otherwise the Airflow UI marks the DAG as "offline", because the top-level code of the DAG can only read from paths mapped into Airflow.
So the question is: how can I use a Python function to generate a list of tasks based on data read from a file that sits in a bucket?
Thanks
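For what it's worth, here is a rough sketch of the pattern I am after, assuming the export lands under the mounted dags/ folder; the arguments of MailchimpToGCSOperator and the 'id' field are made up:

import json
import os

from airflow import DAG
from airflow.utils.dates import days_ago
# MailchimpToGCSOperator is our custom operator; its import is project-specific.

DRIVER_FILE = '/home/airflow/gcs/dags/mailchimp-mailchimp_campagne.json'

dag = DAG('mailchimp_to_gcs', schedule_interval=None, start_date=days_ago(1))

campaigns = []
if os.path.exists(DRIVER_FILE):  # so the DAG still parses before the first export exists
    with open(DRIVER_FILE, encoding='utf-8') as f:
        campaigns = [json.loads(line) for line in f if line.strip()]

# One task per campaign read from the driver file.
for row in campaigns:
    MailchimpToGCSOperator(
        task_id='load_campaign_{}'.format(row['id']),
        campaign_id=row['id'],  # hypothetical argument
        dag=dag,
    )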