I'm looking for a "good" example for GoogleCloudStorageToBigQueryOperator


zu...@google.com

May 30, 2018, 10:11:37 AM
to cloud-composer-discuss
Hi,
I need to load a bunch of CSV files from a GCS bucket into a BQ table. The table doesn't exist yet, so it needs to be created with the schema auto-detected.

thanks
Antonio

Feng Lu

Jun 1, 2018, 3:05:52 AM
to Antonio Zurlo, Tim Swast, cloud-composer-discuss

--
You received this message because you are subscribed to the Google Groups "cloud-composer-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cloud-composer-di...@googlegroups.com.
To post to this group, send email to cloud-compo...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cloud-composer-discuss/7aa53862-6808-4db4-9f86-b2395e7cf1a8%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

im...@essential.com

Aug 3, 2018, 8:15:48 PM
to cloud-composer-discuss
+1, there doesn't seem to be a way to autodetect the schema...

jeremy.w...@everoad.com

Aug 6, 2018, 4:03:57 AM
to cloud-composer-discuss
I faced almost the same thing: I had a csv to upload and wanted to use schema autodetection.

In the end I used the BigQuery Python client directly, which means I call it from a PythonOperator rather than using one of the dedicated operators.

The generic code looks like this:

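import logging
from io import BytesIO

from google.cloud import bigquery

# response holds the csv payload fetched by an earlier step
f = BytesIO(response.content)
client = bigquery.Client()
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
# partition the destination table by day
job_config.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY)
job_config.skip_leading_rows = 1  # skip the header row
job_config.autodetect = True  # let BigQuery infer the schema
job = client.load_table_from_file(f, table_ref, job_config=job_config)
job.result()  # wait for the load job to finish
logging.info('Loaded {} rows into {}:{}.'.format(job.output_rows, dataset_id, table_id))
f.close()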


Here f is a BytesIO file. Other tasks fetch the csv files, and when I need to upload one I call this generic function.

Note that your schema must be deterministic. If you want to append to an existing table, 'null' values can be detected as STRING instead of INTEGER / TIMESTAMP and so on, so you must control your csv content carefully.

I don't know if this helps or if you were looking for a different workaround, but for now that's how I deal with csv files.
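One way to "control the csv content" before loading is a small pre-check that flags columns containing nothing but null-like values, since those give autodetection no data to infer a type from. This is only a sketch using the standard library: the helper name find_risky_columns and the set of null spellings are assumptions for illustration, not part of the BigQuery client.

```python
import csv
import io

# Assumed spellings of "missing" values; adjust to match your own files.
NULL_MARKERS = {"", "null", "NULL", "None"}


def find_risky_columns(csv_text):
    """Return the columns whose data rows contain only null-like values."""
    reader = csv.DictReader(io.StringIO(csv_text))
    field_names = reader.fieldnames or []
    # Assume every column is risky until a real value is seen in it.
    risky = {name: True for name in field_names}
    for row in reader:
        for name in field_names:
            value = row[name]
            if value is not None and value.strip() not in NULL_MARKERS:
                risky[name] = False
    return [name for name in field_names if risky[name]]


sample = "id,created_at,amount\n1,null,10\n2,null,12\n"
print(find_risky_columns(sample))
```

If the returned list is not empty, you could skip the load or fall back to an explicit schema for that file.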

im...@essential.com

Aug 7, 2018, 9:32:25 PM
to cloud-composer-discuss
I don't think there is a way to auto-detect the schema using the BigQuery operator. Anyway, here's how I did everything else.

Note that the schema is a tuple of (name, type) tuples:
FlowSchema = (
('contact_email', 'string'),
('currency', 'string'),...
)

Write response json data to tempfile like so:

import csv
import tempfile

def FlowUtilityJSONtoTempCsvFile(response_data_json, schema):
    # takes in the json rows, writes them to a temp csv file whose header
    # follows the schema, and returns the path of that file
    tmp_file_handle_csv = tempfile.NamedTemporaryFile(
        mode='wt', delete=False, suffix='.csv')
    output = csv.writer(tmp_file_handle_csv)  # create a csv.writer
    output.writerow([x[0] for x in schema])  # header row
    for row in response_data_json:
        # each row is assumed to already be a sequence in schema order
        output.writerow(row)
    tmp_file_handle_csv.close()  # flush and close before the upload
    return tmp_file_handle_csv.name
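If the JSON response is a list of dicts rather than a list of sequences, csv.DictWriter can put the values in schema order. The following is a sketch under that assumption; dict_rows_to_temp_csv and EXAMPLE_SCHEMA are hypothetical names, and the newline argument requires Python 3.

```python
import csv
import tempfile

# Hypothetical schema in the same (name, type) tuple layout as above.
EXAMPLE_SCHEMA = (("contact_email", "string"), ("currency", "string"))


def dict_rows_to_temp_csv(rows, schema):
    """Write a list of dicts to a temp csv file in schema order; return its path."""
    field_names = [name for name, _type in schema]
    with tempfile.NamedTemporaryFile(
            mode="wt", delete=False, suffix=".csv", newline="") as handle:
        writer = csv.DictWriter(
            handle,
            fieldnames=field_names,
            extrasaction="ignore",  # drop keys that are not in the schema
            restval="")             # write an empty cell for missing keys
        writer.writeheader()
        writer.writerows(rows)
        return handle.name  # the file is closed when the with block exits


path = dict_rows_to_temp_csv(
    [{"currency": "USD", "contact_email": "a@example.com", "extra": 1}],
    EXAMPLE_SCHEMA)
print(path)
```

The returned path can be passed as the filename argument of the upload call.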

Write this file to google cloud storage for archiving:

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

gcs_hook = GoogleCloudStorageHook()
gcs_hook.upload(
    bucket='<bucket>',
    object='<csvfile name>.csv',
    filename=FlowUtilityJSONtoTempCsvFile(
        response_data_json=orders, schema=schema),
    mime_type='text/csv')

Finally, load the files that were written to GCS into BigQuery using the operator.
Note the list comprehension, which reuses the same schema as above:

from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

bq_op = GoogleCloudStorageToBigQueryOperator(
    task_id='save_to_bigquery_{}'.format(page_index),
    # generate bq schema object
    schema_fields=[{
        'description': f[0],
        'name': f[0],
        'type': f[1]
    } for f in FlowSchema],
    bucket='bucket',
    source_objects=FlowOperatorArgs['cloud_storage_output_files'],
    destination_project_dataset_table='{}.essential_bi.{}'.format(
        FlowOperatorArgs['project_id'],
        FlowOperatorArgs['staging_table']),
    schema_update_options=[
        'ALLOW_FIELD_ADDITION', 'ALLOW_FIELD_RELAXATION'],
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    source_format='CSV',
    skip_leading_rows=1,
    field_delimiter=','
)
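The schema conversion can also be pulled into a small helper that normalizes the type names and fails early on a typo, before the DAG runs. This is a sketch only: to_schema_fields is a hypothetical name, the tuple layout is assumed to match FlowSchema, and KNOWN_TYPES is a deliberately partial list of BigQuery type names.

```python
# Partial, assumed list of accepted type names; extend as needed.
KNOWN_TYPES = {"STRING", "INTEGER", "FLOAT", "BOOLEAN", "TIMESTAMP", "DATE"}


def to_schema_fields(schema):
    """Turn (name, type) tuples into a list of schema field dicts."""
    fields = []
    for name, field_type in schema:
        normalized = field_type.upper()
        if normalized not in KNOWN_TYPES:
            raise ValueError(
                "Unknown type {!r} for column {!r}".format(field_type, name))
        fields.append({"name": name, "type": normalized, "mode": "NULLABLE"})
    return fields


example_schema = (("contact_email", "string"), ("currency", "string"))
print(to_schema_fields(example_schema))
```

The result could then be passed as schema_fields in place of the inline list comprehension.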



Anthony Brown

Aug 8, 2018, 3:11:14 AM
to cloud-compo...@googlegroups.com
You can also save the schema definition to GCS and use that when loading data. For example:

Save this to gs://bucket/schema.json:

[
    {"type": "INT64", "name": "first_column_name", "mode": "NULLABLE"},
    {"type": "INT64", "name": "second_column_name", "mode": "NULLABLE"}
]

Then the load operator is:

    LOAD_DATA_TO_BQ = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        task_id='load_data',
        bucket='bucket',
        source_objects=['data.csv'],
        destination_project_dataset_table='test.my_table',
        source_format='CSV',
        schema_object='schema.json',
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE',
        google_cloud_storage_conn_id='google_cloud_storage_default',
        bigquery_conn_id='bigquery_default',
        field_delimiter=','
        )

If you have a header row, you could have an operator which reads the first line to get the column names
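A minimal sketch of that idea, using only the standard library: read the header line and emit a schema list in the same JSON layout as schema.json above. The function name schema_from_header is hypothetical, and because the header carries no type information every column defaults to STRING here, which is an assumption you would likely want to override.

```python
import csv
import io
import json


def schema_from_header(csv_text, default_type="STRING"):
    """Build a schema list from the first line of a csv."""
    header = next(csv.reader(io.StringIO(csv_text)))
    return [
        {"type": default_type, "name": column.strip(), "mode": "NULLABLE"}
        for column in header
    ]


sample = "first_column_name,second_column_name\n1,2\n"
print(json.dumps(schema_from_header(sample), indent=4))
```

The printed JSON could be uploaded to the bucket and referenced through schema_object.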

--
-- 

Anthony Brown
Data Engineer BI Team - John Lewis
Tel : 0787 215 7305
