Autodetect Bigquery schema in Dataflow


Harshit Dwivedi

Oct 9, 2018, 10:34:24 AM
to Google Cloud Developers
Hi Folks,
I am using Cloud Dataflow with the Apache Beam SDK to write events from Pub/Sub to BigQuery.

The problem is that my data is dynamic in nature: the JSON schema can change between events.
I also don't know the schema while creating the Dataflow pipeline.

So I wanted to ask: is there a way to generate the schema for my BigQuery table from the event inside my Dataflow pipeline?

I am using the Java SDK, version 2.5.0, and couldn't find any resources on how to do this.

The Google Cloud blog has an article on a very similar topic, but there's no explanation of how the mutation was done (see the attached image).

Screenshot 2018-10-09 at 7.48.13 PM.png


Ali T (Cloud Platform Support)

Oct 9, 2018, 11:46:13 PM
to Google Cloud Developers

From the blog you linked, the "Validate & Mutate BQ schema" step can be done in a number of ways. Since the authors don't mention the exact approach they took, you can recreate a similar process using a variety of Apache Beam SDK classes and functions. A great avenue to explore is the DynamicDestinations class, which lets you write to different BigQuery tables depending on the input element.


Moreover, this Stack Overflow thread gives a good solution for this use case: run two pipelines concurrently. One pipeline writes the schema to a durable location, and the other uses that information to write the data to a BigQuery table of the appropriate shape.
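As a rough illustration of the schema-generation step (my own sketch, not from the blog; the class name SchemaInfer and the exact type mapping are assumptions), the core of it is mapping the Java types of a parsed event to BigQuery column type names. A real pipeline would then turn the result into a com.google.api.services.bigquery.model.TableSchema:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: infers BigQuery column types from the Java types
// of a parsed JSON event (e.g. the output of a JSON parser as a Map).
public class SchemaInfer {

    // Map one Java value to a BigQuery type name.
    public static String inferType(Object value) {
        if (value instanceof Boolean) return "BOOLEAN";
        if (value instanceof Integer || value instanceof Long) return "INTEGER";
        if (value instanceof Double || value instanceof Float) return "FLOAT";
        if (value instanceof Map) return "RECORD";
        return "STRING";  // fall back to STRING for anything else
    }

    // Infer a flat field-name -> type-name mapping for a whole event.
    public static Map<String, String> inferSchema(Map<String, Object> event) {
        Map<String, String> schema = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : event.entrySet()) {
            schema.put(e.getKey(), inferType(e.getValue()));
        }
        return schema;
    }

    public static void main(String[] args) {
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("user", "alice");
        event.put("clicks", 3L);
        event.put("score", 1.5);
        System.out.println(inferSchema(event));
    }
}
```

The pipeline writing the schema to a durable location could serialize this mapping; the second pipeline reads it back and builds the table from it.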


Harshit Dwivedi

Oct 11, 2018, 8:53:22 AM
to Google Cloud Developers
Hi Ali, the Stack Overflow answer you linked makes sense; I'll look into it further.
A follow-up question, though: what if I have a streaming pipeline? Writing to GCS and then reading from it might not work in that scenario. What do you think?

I will also need to make sure that I'm not loading duplicate data while streaming from GCS to BigQuery.

Also, I might not be able to use DynamicDestinations, since I can't generate the schema from my data. Here's what my code looks like:

.to(new DynamicDestinations<UserEvent, String>() {
    public String getDestination(ValueInSingleWindow<UserEvent> element) {
        return "destination";
    }
    public TableDestination getTable(String destination) {
        return new TableDestination("project:dataset.tableName", "BQ table");
    }
    public TableSchema getSchema(String destination) {
        // Not able to generate the schema here, as it depends on UserEvent!
        return null;
    }
})

Any ideas on what to do here? I could generate the schema if I could get the UserEvent inside the getSchema() method.

Ali T (Cloud Platform Support)

Oct 11, 2018, 11:24:44 PM
to Google Cloud Developers

Hey Harshit,


Could you please clarify what you mean by it not being possible with a streaming pipeline reading from and writing to GCS? If you are referring to blocking one pipeline until another has completed, you can use the waitUntilFinish() method to block the streaming pipeline's execution while awaiting the completion of the second one.


Next, to ensure that you are not inserting duplicates, you can supply an insertId for each row. As stated in the documentation, BigQuery remembers those IDs for at least one minute; if the same row comes through multiple times within that window, the extra copies are treated as duplicates and discarded on a best-effort basis. Additionally, if you want to take an extra step to make sure there are no duplicates, you can remove them manually by following the steps outlined in this documentation.
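As a sketch of the insertId idea (my own illustration, not from the documentation; the helper name insertIdFor is an assumption), you can derive the id deterministically from the row payload, so that retries of the same row always carry the same id and fall into BigQuery's dedup window:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Hypothetical helper: derives a deterministic insertId from the row
// payload by hashing it, so identical rows produce identical ids.
public class InsertIds {

    public static String insertIdFor(String rowJson) throws Exception {
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        byte[] hash = md.digest(rowJson.getBytes(StandardCharsets.UTF_8));
        StringBuilder sb = new StringBuilder();
        for (byte b : hash) {
            sb.append(String.format("%02x", b));  // hex-encode each byte
        }
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        // Identical payloads yield identical insert ids.
        System.out.println(insertIdFor("{\"user\":\"alice\"}"));
    }
}
```

Note this only helps against duplicates within BigQuery's short best-effort dedup window; for duplicates that slip through, the manual removal steps still apply.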


Regarding not being able to use DynamicDestinations: could you clarify your use case? In my original message, I recommended DynamicDestinations because I understood that the schema/table changes depending on the user event.
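One way to make this work when the schema depends on the event (a sketch of my own, with hypothetical helper names, not code from the Beam SDK) is to have getDestination(), which does see the full element, encode the inferred schema into the destination value itself; getSchema() then only has to decode it. In real Beam code you would more likely use a custom serializable DestinationT class with its own coder, but a string encoding shows the idea:

```java
// Hypothetical sketch: pack the table name and a serialized schema into
// one destination string, so getSchema(destination) can recover the
// schema without needing the original event.
public class DestinationCodec {

    // Encode table name plus serialized schema into one destination value.
    // Assumes the table name itself contains no '|' character.
    public static String encode(String table, String schemaJson) {
        return table + "|" + schemaJson;
    }

    public static String tableOf(String destination) {
        return destination.substring(0, destination.indexOf('|'));
    }

    public static String schemaOf(String destination) {
        return destination.substring(destination.indexOf('|') + 1);
    }

    public static void main(String[] args) {
        String dest = encode("project:dataset.events", "{\"user\":\"STRING\"}");
        System.out.println(tableOf(dest));
        System.out.println(schemaOf(dest));
    }
}
```

With this shape, getDestination() would call encode() on the event's table and inferred schema, getTable() would use tableOf(), and getSchema() would rebuild a TableSchema from schemaOf().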


Lastly, this Stack Overflow answer proposes an alternative for creating a schema based on the distributed data. Although there is no guarantee that the solution works correctly, it seems to have been well received by the community.


Dominic Parry

Jul 9, 2019, 10:13:42 AM
to Google Cloud Developers
Hi there,

For anyone else who finds this, here is an example repo where this is implemented:

dataflow-dynamic-schema


It's not mine, but I found it looking for the same thing...