
From the blog you linked, the "Validate & Mutate BQ schema" step can be implemented in a number of ways. Since the post doesn’t mention the exact approach the authors took, you can try to recreate a process that completes a similar task using a variety of Apache Beam SDK classes and functions. A great avenue to explore is the DynamicDestinations class, which lets you write to different BigQuery tables depending on the input element.
Moreover, this Stack Overflow thread gives a great solution for this use case: run two pipelines concurrently. One pipeline writes the schema to a durable location, and the other uses that information to write the data to a BigQuery table of the appropriate form.
.to(new DynamicDestinations<UserEvent, String>() {
    @Override
    public String getDestination(ValueInSingleWindow<UserEvent> element) {
        return "destination";
    }

    @Override
    public TableDestination getTable(String destination) {
        return new TableDestination("project:dataset.tableName", "BQ table");
    }

    @Override
    public TableSchema getSchema(String destination) {
        // Not able to generate the schema here, as it depends on the UserEvent!
        return null;
    }
})

Hey Harshit,
Could you please clarify what you mean by it not being possible if you have a streaming pipeline reading from and writing to GCS? If you are referring to blocking one pipeline until another has completed, you can call waitUntilFinish() on the first pipeline’s result to block execution until it finishes before launching the second one.
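As a rough sketch of that ordering (everything here is a placeholder: the transforms are elided, and you would need the Apache Beam Java SDK on the classpath), the orchestration could look like this:

```java
// Sketch only: assumes the Apache Beam Java SDK; transforms are elided.
Pipeline schemaPipeline = Pipeline.create(options);
// ... transforms that derive the schema and write it to a durable
// location (e.g. a GCS object) would go here ...

// Block the main program until the schema pipeline finishes.
schemaPipeline.run().waitUntilFinish();

Pipeline writePipeline = Pipeline.create(options);
// ... transforms that read the stored schema and stream rows into
// the matching BigQuery table would go here ...
writePipeline.run();
```

Note that waitUntilFinish() blocks the launching program, not the pipeline workers, so the second pipeline is simply not submitted until the first has completed.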
Next, to ensure that you are not inserting duplicates, you can supply an insertId for each row. As stated in the documentation, BigQuery remembers those IDs for at least one minute; if the same row comes through multiple times with the same insertId within that time frame, the extra copies are treated as duplicates and discarded on a best-effort basis. Additionally, if you want to take an extra step to make sure there are no duplicates, you can remove them manually by following the steps outlined in this documentation.
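To make the best-effort semantics concrete, here is a small self-contained toy model (not BigQuery’s actual implementation, and the class name is made up) of how insertId-based de-duplication behaves within a time window:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of best-effort streaming de-duplication: a row is dropped
// if the same insertId was already seen within the last windowMillis.
class InsertIdDeduper {
    private final long windowMillis;
    private final Map<String, Long> lastSeen = new HashMap<>();

    InsertIdDeduper(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Returns true if the row should be inserted, false if it is
    // treated as a duplicate of a recent insert with the same insertId.
    boolean shouldInsert(String insertId, long nowMillis) {
        Long previous = lastSeen.get(insertId);
        lastSeen.put(insertId, nowMillis);
        return previous == null || nowMillis - previous > windowMillis;
    }
}
```

With a one-minute window, a retry of the same insertId ten seconds later is discarded, while the same insertId arriving again after the window has elapsed is inserted as a new row — which is why the manual de-duplication step is still worth considering.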
Regarding not being able to use DynamicDestinations, could you clarify your use case? In my original message, I recommended DynamicDestinations as I understood that the schema/table changes depending on the user event.
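One common workaround for the getSchema problem in your snippet is to make the destination type itself carry everything getSchema needs — for example, by encoding the serialized schema into the destination value returned by getDestination. The sketch below assumes the Beam Java SDK and a hypothetical UserEvent type with tableName() and schemaJson() accessors:

```java
// Sketch only: UserEvent, tableName() and schemaJson() are hypothetical.
// The idea: getDestination() encodes whatever getSchema() will later need.
.to(new DynamicDestinations<UserEvent, KV<String, String>>() {
    @Override
    public KV<String, String> getDestination(ValueInSingleWindow<UserEvent> element) {
        UserEvent event = element.getValue();
        // Key: fully-qualified table name; value: serialized table schema.
        return KV.of("project:dataset." + event.tableName(), event.schemaJson());
    }

    @Override
    public TableDestination getTable(KV<String, String> destination) {
        return new TableDestination(destination.getKey(), "Table for user events");
    }

    @Override
    public TableSchema getSchema(KV<String, String> destination) {
        // The schema is now available without the original UserEvent.
        return BigQueryHelpers.fromJsonString(destination.getValue(), TableSchema.class);
    }
})
```

Since the destination type is no longer a simple String, you may also need to override getDestinationCoder() so Beam can serialize it between workers.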
Lastly, this Stack Overflow answer proposes an alternative for building a schema from the distributed data. While I can’t guarantee that their solution works correctly, it seems to have been well received by the community.