efficient way to maintain latest state of records?

bo musham

Feb 11, 2021, 4:12:19 PM
to Delta Lake Users and Developers
Hello Delta lakers,

This is my use case. Please let me know if there is a cost-efficient way to do this, or if you need further details.

My input is a Delta Lake table, let's call it "RAW", which holds raw records (its source is JSON files in S3).
The objective of this job is to maintain the latest state of each record/id from the RAW table: if the incoming record has a later timestamp than the already existing record in the "latest" Delta table, update (delete + insert) it; if there is no record for that id, insert it.
I currently have approximately 320 million records, around 100 GB of data, in the latest-state Delta table.
Each micro batch of the "latest"-state Spark app receives approximately 900 rows; some of them are updates, some are inserts (new records).
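For reference, the per-micro-batch upsert looks roughly like this (a minimal sketch only; LATEST_PATH and the active SparkSession `spark` are placeholders, and the merge/update conditions match the predicates you can see in the commit log below):

    from delta.tables import DeltaTable

    def upsert_latest(micro_batch_df, batch_id):
        # Target: the "latest" state table, keyed by `id`.
        latest = DeltaTable.forPath(spark, LATEST_PATH)
        (latest.alias("t")
            .merge(micro_batch_df.alias("s"), "s.id = t.id")
            # Only overwrite when the incoming record is newer.
            .whenMatchedUpdateAll(condition="s.updatedts > t.updatedts")
            # Brand-new ids are inserted as-is.
            .whenNotMatchedInsertAll()
            .execute())

    # Wired into the stream with:
    # raw_stream.writeStream.foreachBatch(upsert_latest).start()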

If I use 5 m5a.4xlarge machines, the upsert/merge operation completes in under 5 minutes and produces a delta log transaction like the one below. If I use 5 m5a.xlarge machines, it takes around 90 minutes. But five 4xlarge machines are very expensive.

Apart from that, as you can see below, for 710 source rows the merge removed 377 files, added 436 files, copied over 185,188,716 rows, inserted 131 rows, and updated 579 rows. (The upsert scan/join is very expensive in Spark, and you can see a huge number of rows being copied over and over for each micro batch.)


{"commitInfo":{"timestamp":1613055785246,"operation":"MERGE","operationParameters":{"predicate":"(s.`id` = t.`id`)","updatePredicate":"(s.`updatedts` > t.`updatedts`)"},"readVersion":1948,"isBlindAppend":false,"operationMetrics":{"numTargetRowsCopied":"185188716","numTargetRowsDeleted":"0","numTargetFilesAdded":"436","numTargetRowsInserted":"131","numTargetRowsUpdated":"579","numOutputRows":"185189426","numSourceRows":"710","numTargetFilesRemoved":"377"}}}

But how can I make this latest-state maintainer cost efficient? Is a Delta Lake table a good choice for this use case? (One alternative is a view/materialized view or CTAS using a group-by/ranking query on top of the RAW table, sketched below, but there are lots of users/processes that need this "latest state as of their query time", and it's very expensive to take that approach for every query. We already do that, so I am trying to avoid it and have the latest state readily available.)
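The ranking approach we already run looks roughly like this (a sketch only; RAW_PATH is a placeholder, column names as in the merge above):

    from pyspark.sql import functions as F, Window

    raw = spark.read.format("delta").load(RAW_PATH)
    # Latest record per id as of query time: rank by updatedts, keep the top row.
    w = Window.partitionBy("id").orderBy(F.col("updatedts").desc())
    latest_view = (raw
        .withColumn("rn", F.row_number().over(w))
        .where("rn = 1")
        .drop("rn"))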

If I let this run for a week, I end up with around 150,000 files, but if I generate a manifest, only approximately 1,000 files are actually referenced by the Delta table. VACUUM is an option, but there are still a lot of unused files within the 7-day retention window.
If I query this through an external table in Snowflake or Drill, it does not perform well, since they read all the files and then filter out the unused ones, even though there is a manifest (that is a separate issue).
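For context, the housekeeping steps would look roughly like this (a sketch; LATEST_PATH as above, and 168 hours is the default 7-day retention):

    from delta.tables import DeltaTable

    t = DeltaTable.forPath(spark, LATEST_PATH)
    # Delete files no longer referenced by the table and older than the retention window.
    t.vacuum(168)
    # Regenerate the symlink manifest that external readers consume.
    spark.sql(f"GENERATE symlink_format_manifest FOR TABLE delta.`{LATEST_PATH}`")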

Thanks,
Vijay