I wanted to ask whether getting random duplicates in downstream tables is expected behavior, or whether it is something that could be fixed in the future.
I run the same test notebook on the same runtime but in two different Databricks accounts (on Azure): one always produces duplicates downstream, while the other never does. Any ideas what that depends on?
Bigger picture:
We have a GDPR scenario where we need to delete data for specific users from Delta tables.
We use ADLS-based Delta tables; we also reproduced the scenario with simple data on DBFS in one Databricks account (notebooks attached).
Details of the scenario:
- There is a source delta table partitioned by day
|------|--------|------------------|------|
| date | userId | uniqueActivityId | data |
|------|--------|------------------|------|
- There is a downstream destination delta table partitioned by day
|------|--------|------------------|-----------------|
| date | userId | uniqueActivityId | transformedData |
|------|--------|------------------|-----------------|
- There is a streaming job that runs a few times a day; it reads all new entries from the source table, performs transformations, and saves the results in the downstream destination table
- There is also a purge job that runs once a day (at night, when no other job is running on that data) and deletes the data of users who want to be forgotten; their data is cleaned from both the source and destination tables
Because we delete data (via merge-delete) rather than by partition, we started using the ignoreChanges=true flag in our streaming process.
We have observed, however, that we get many duplicate entries for the other, non-forgotten users.
We wanted to reproduce this, and it reproduces both with ADLS as storage and with DBFS on one Databricks account, while on the second account it never results in duplicate entries.
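For reference, the purge and streaming pieces look roughly like the following (a minimal sketch with placeholder paths and names, not our exact jobs):

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Purge job: merge-delete the forgotten users from the source table.
// Because the merge is not aligned to partitions, Delta rewrites every
// file containing a matched row; the untouched rows in those files are
// copied into new files (this shows up as numTargetRowsCopied).
val forgottenUsers = Seq("user42").toDF("userId") // hypothetical list
DeltaTable.forPath(spark, "/mnt/tables/source").as("t")
  .merge(forgottenUsers.as("f"), "t.userId = f.userId")
  .whenMatched()
  .delete()
  .execute()

// Streaming job: ignoreChanges lets the stream continue past the
// rewritten files, but any copied rows are re-emitted downstream,
// which is where the duplicates come from.
val updates = spark.readStream
  .format("delta")
  .option("ignoreChanges", "true")
  .load("/mnt/tables/source")
```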
Details of test scenario and observations:
Databricks1 - DBR 9.0
- Adding 29 entries to the source table
- Stream data to the destination table -> 29 entries in the destination table
- Delete 4 items from the source table
- Stream data to the destination table -> 29 entries in the destination table
- Metrics of the source table (version, operation, operationMetrics columns):
version operation operationMetrics
1 MERGE {"numTargetRowsCopied":"0","numTargetRowsDeleted":"4","numTargetFilesAdded":"0","executionTimeMs":"1509","numTargetRowsInserted":"0","scanTimeMs":"949","numTargetRowsUpdated":"0","numOutputRows":"0","numTargetChangeFilesAdded":"0","numSourceRows":"4","numTargetFilesRemoved":"4","rewriteTimeMs":"544"}
0 WRITE {"numFiles":"29","numOutputBytes":"26245","numOutputRows":"29"}
Databricks2 - DBR 9.0
- Adding 29 entries to the source table
- Stream data to the destination table -> 29 entries in the destination table
- Delete 4 items from the source table
- Stream data to the destination table -> 35 entries in the destination table (duplicate entries)
- Metrics of the source table (version, operation, operationMetrics columns):
1 MERGE {"numTargetRowsCopied":"6","numTargetRowsDeleted":"4","numTargetFilesAdded":"3","executionTimeMs":"1155","numTargetRowsInserted":"0","scanTimeMs":"567","numTargetRowsUpdated":"0","numOutputRows":"6","numTargetChangeFilesAdded":"0","numSourceRows":"4","numTargetFilesRemoved":"3","rewriteTimeMs":"570"}
0 WRITE {"numFiles":"8","numOutputBytes":"7617","numOutputRows":"29"}
The pattern in our prod environment and in our Databricks2 example seems to be the same: when the source table's metrics show numTargetRowsCopied = x, we get x duplicated entries in the destination table.
We have seen a note in the Delta documentation that downstream consumers may receive duplicates when using ignoreChanges=true, but it is not very specific, and the fact that one environment always produces duplicates while the other never does is a little strange.
Has anybody else made similar observations?
If this situation is unavoidable, what is the best way to handle unwanted duplicates in a partitioned stream?
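To illustrate what I mean by handling the duplicates: the kind of approach I had in mind is replacing the plain append with an idempotent upsert in foreachBatch, keyed on uniqueActivityId (a sketch with placeholder paths; we have not validated this):

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().getOrCreate()

// Merge each micro-batch into the destination keyed on
// uniqueActivityId, so rows re-emitted by ignoreChanges are matched
// and skipped instead of being inserted again as duplicates.
def upsertBatch(batch: DataFrame, batchId: Long): Unit = {
  DeltaTable.forPath(batch.sparkSession, "/mnt/tables/destination")
    .as("dst")
    .merge(batch.as("src"), "dst.uniqueActivityId = src.uniqueActivityId")
    .whenNotMatched()
    .insertAll()
    .execute()
}

spark.readStream
  .format("delta")
  .option("ignoreChanges", "true")
  .load("/mnt/tables/source")
  .writeStream
  .foreachBatch(upsertBatch _)
  .option("checkpointLocation", "/mnt/checkpoints/destination")
  .start()
```

I would be interested to hear whether this is the recommended pattern or whether there is a cheaper way.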
Attached is my test Scala notebook using DBFS.