Streaming with ignoreChanges sometimes results in duplicates in delta tables

Marta Chojnacka

Sep 16, 2021, 5:16:35 AM9/16/21
to Delta Lake Users and Developers
I wanted to ask whether getting random duplicates in downstream tables is expected, or whether it is something that could be fixed in the future.
I run the same test notebook on the same runtime, but in different Databricks accounts (in Azure): one gives me duplicates every time, while the other never produces duplicates downstream. Any ideas what that depends on?

Bigger picture:
We have a GDPR scenario where we need to delete data for specific users in Delta tables.
We are using ADLS-based Delta tables; we also replicated the scenario with simple data on dbfs in one Databricks account (notebooks attached).

Below details of that scenario:
- There is a source delta table partitioned by day
  |------|--------|------------------|------|
  | date | userId | uniqueActivityId | data |
  |------|--------|------------------|------|
- There is a downstream destination delta table partitioned by day
  |------|--------|------------------|-----------------|
  | date | userId | uniqueActivityId | transformedData |
  |------|--------|------------------|-----------------|
- There is a streaming job running a few times a day that reads all new entries from the source table, performs transformations, and saves them to the downstream destination table
- There is also a once-a-day purge job (running at night, when no other job is touching that data) which deletes data for users who want to be forgotten; data gets cleaned for those users in both the source and the destination table.

Since we delete data (via merge-delete) not along partition boundaries, we started using the ignoreChanges=true flag in our streaming process.
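For context, a rough sketch of how the two jobs look (the paths, checkpoint location and the usersToForget source below are placeholders, not our real code):

import io.delta.tables.DeltaTable
import org.apache.spark.sql.streaming.Trigger

// Nightly purge: merge-delete the rows of users who asked to be forgotten.
val usersToForget = spark.read.format("delta").load("/tmp/users_to_forget")
DeltaTable.forPath(spark, "/tmp/source_table")
  .as("t")
  .merge(usersToForget.as("s"), "t.userId = s.userId")
  .whenMatched()
  .delete()
  .execute()

// Streaming job (runs a few times a day): read new entries from the source
// table with ignoreChanges, transform and append them to the destination.
spark.readStream
  .format("delta")
  .option("ignoreChanges", "true")
  .load("/tmp/source_table")
  // transformations go here
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/checkpoints/destination")
  .trigger(Trigger.Once())
  .start("/tmp/destination_table")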

However, we have observed that we get a lot of duplicate entries for all the other, non-forgotten users.

We wanted to replicate this, and we see that it reproduces when we use ADLS as storage or dbfs on one Databricks account, while on the second account it never results in duplicate entries.

Details of test scenario and observations:

Databricks1 - DBR 9.0
- Adding 29 entries to the source table
- Stream data to the destination table -> 29 entries in the destination table
- Delete 4 items from the source table
- Stream data to the destination table -> 29 entries in the destination table
- Metrics of the source table (version, operation, operationMetrics columns):
version operation operationMetrics
1 MERGE {"numTargetRowsCopied":"0","numTargetRowsDeleted":"4","numTargetFilesAdded":"0","executionTimeMs":"1509","numTargetRowsInserted":"0","scanTimeMs":"949","numTargetRowsUpdated":"0","numOutputRows":"0","numTargetChangeFilesAdded":"0","numSourceRows":"4","numTargetFilesRemoved":"4","rewriteTimeMs":"544"}

0 WRITE {"numFiles":"29","numOutputBytes":"26245","numOutputRows":"29"}

Databricks2 - DBR 9.0
- Adding 29 entries to the source table
- Stream data to the destination table -> 29 entries in the destination table
- Delete 4 items from the source table
- Stream data to the destination table -> 35 entries in the destination table (duplicate entries)
- Metrics of the source table (version operation operationMetrics columns):
1 MERGE {"numTargetRowsCopied":"6","numTargetRowsDeleted":"4","numTargetFilesAdded":"3","executionTimeMs":"1155","numTargetRowsInserted":"0","scanTimeMs":"567","numTargetRowsUpdated":"0","numOutputRows":"6","numTargetChangeFilesAdded":"0","numSourceRows":"4","numTargetFilesRemoved":"3","rewriteTimeMs":"570"}
0 WRITE {"numFiles":"8","numOutputBytes":"7617","numOutputRows":"29"}

The pattern in our prod environment and in our Databricks2 example seems to be the same: we get numTargetRowsCopied = x in the source table metrics and x duplicated entries in the destination table.
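For anyone who wants to check the same correlation, this is roughly how we read those metrics from the table history (the path is a placeholder):

import io.delta.tables.DeltaTable

// Inspect the latest commit on the source table; after the purge MERGE,
// numTargetRowsCopied on the problematic account matches the number of
// duplicates that later show up downstream.
DeltaTable.forPath(spark, "/tmp/source_table")
  .history(1)
  .select("version", "operation", "operationMetrics")
  .show(truncate = false)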

We have seen a note in the Delta documentation saying that downstream consumers may receive duplicates when using ignoreChanges=true, but it's not very specific, and the fact that in one environment we always get duplicates and in the other we never do is a little strange.
Has anybody had such observations also?
If that situation is unavoidable however, what is the best way to handle unwanted duplicates in partitioned stream?

Attached is my test Scala notebook using dbfs.

GdprSourceStreamingDbfsTest.scala

Shixiong(Ryan) Zhu

Sep 16, 2021, 9:18:38 PM9/16/21
to Marta Chojnacka, Delta Lake Users and Developers
Hey Marta,

For Delta operations such as UPDATE/DELETE/MERGE, the underlying implementations may need to rewrite existing files. In such cases, they copy rows from the old files and write them to new files. This means new files added to the table may contain the same data as the old files.

If your data has a primary key or unique key, you can use `Dataset.dropDuplicates` to drop them.

Best Regards,

Ryan



Marta Chojnacka

Sep 17, 2021, 5:40:40 AM9/17/21
to Delta Lake Users and Developers

Hi Ryan,

Thank you for your response!
OK, so I understand that there is no point digging into why this happens in only one of the accounts and not the other (for the very same notebook), and that there is no setting we can change to make sure we don't get duplicates.

Thank you for suggesting a solution to drop duplicates.
We really want to avoid an operation like `Dataset.dropDuplicates` as a post-processing step, because having to reprocess all the data again makes our effort to have only updates in the stream, and much of the power of the data lake, futile:
val allDeduplicatedData = spark.read.format("delta").load(destinationTableName).dropDuplicates(List("key", "value"))
allDeduplicatedData.write.format(Path.DELTA).option("overwriteSchema", "true").mode("overwrite").save(destinationTableName)

The only other solution I found that might work to deduplicate during streaming was with .foreachBatch, but that's not possible when we partition the data.
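For completeness, this is roughly the .foreachBatch variant I had in mind, merging each micro-batch into the destination on the unique key instead of blindly appending (a sketch; the paths and the key column are placeholders):

import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

val sourceTableName = "/tmp/source_table"            // placeholder paths
val destinationTableName = "/tmp/destination_table"
val checkpointPath = "/tmp/checkpoints/destination_upsert"

// Upsert each micro-batch on uniqueActivityId so rows re-emitted by
// ignoreChanges update in place instead of being appended again.
def upsertBatch(batch: DataFrame, batchId: Long): Unit = {
  DeltaTable.forPath(spark, destinationTableName)
    .as("t")
    .merge(batch.dropDuplicates("uniqueActivityId").as("s"),
           "t.uniqueActivityId = s.uniqueActivityId")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

spark.readStream
  .format("delta")
  .option("ignoreChanges", "true")
  .load(sourceTableName)
  .writeStream
  .foreachBatch(upsertBatch _)
  .option("checkpointLocation", checkpointPath)
  .trigger(Trigger.Once())
  .start()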

Best regards,
Marta

Marta Chojnacka

Sep 23, 2021, 7:53:10 AM9/23/21
to Delta Lake Users and Developers
Hi again Ryan,

I realized that maybe your comment was about using dropDuplicates to introduce state in the stream (Stateful Stream Processing in Structured Streaming).
I will be testing that approach.
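If it turns out to be viable, the query would look roughly like this (a sketch; it assumes an event-time timestamp column, here called eventTime, to bound the de-duplication state with a watermark, and placeholder paths):

import org.apache.spark.sql.streaming.Trigger

// Stateful de-duplication inside the stream: Spark keeps state keyed on
// (uniqueActivityId, eventTime) across micro-batches (persisted in the
// checkpoint), so rows re-emitted by MERGE rewrites should be dropped.
spark.readStream
  .format("delta")
  .option("ignoreChanges", "true")
  .load("/tmp/source_table")
  .withWatermark("eventTime", "7 days")
  .dropDuplicates("uniqueActivityId", "eventTime")
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/checkpoints/destination_dedup")
  .trigger(Trigger.Once())
  .start("/tmp/destination_table")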

Best regards,
Marta
