How Nessie determines if there is a conflict commit

Canope Nerda

Oct 12, 2022, 7:46:12 AM
to projectnessie
Hi folks,

I have a Flink streaming job writing new data to a partitioned Iceberg table. In the background, I'm running a Spark procedure to rewrite data files. However, I see a lot of warnings like the one below in the log of the compaction job. Once in production, I may need to compact a table with several concurrent Spark procedures, so I'm worried about the conflicts in the warning and would like to know more about what happens under the hood.

22/10/12 11:30:26 WARN Tasks: Retrying task after failure: Cannot commit: Reference hash is out of date. Update the reference 'dev' and try again
org.apache.iceberg.exceptions.CommitFailedException: Cannot commit: Reference hash is out of date. Update the reference 'dev' and try again
        at org.apache.iceberg.nessie.NessieTableOperations.doCommit(NessieTableOperations.java:196)
        at org.apache.iceberg.BaseMetastoreTableOperations.commit(BaseMetastoreTableOperations.java:133)
        at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:317)
        at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404)
        at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:214)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:198)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:190)
        at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:295)
        at org.apache.iceberg.actions.RewriteDataFilesCommitManager.commitFileGroups(RewriteDataFilesCommitManager.java:89)
        at org.apache.iceberg.actions.RewriteDataFilesCommitManager.commitOrClean(RewriteDataFilesCommitManager.java:110)
        at org.apache.iceberg.actions.RewriteDataFilesCommitManager$CommitService.lambda$start$0(RewriteDataFilesCommitManager.java:179)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.projectnessie.error.NessieReferenceConflictException: Key 'datafinger.flash_orderbooklv2' has conflicting put-operation from commit 'd414c5f96713526addb322065b4ca58c2507f0a9fa1ea3151f978bfbad707898'.
        at org.projectnessie.error.ErrorCode.lambda$asException$1(ErrorCode.java:60)
        at java.base/java.util.Optional.map(Unknown Source)
        at org.projectnessie.error.ErrorCode.asException(ErrorCode.java:60)
        at org.projectnessie.client.rest.ResponseCheckFilter.checkResponse(ResponseCheckFilter.java:56)
        at org.projectnessie.client.rest.NessieHttpResponseFilter.filter(NessieHttpResponseFilter.java:34)
        at org.projectnessie.client.http.HttpRequest.lambda$executeRequest$3(HttpRequest.java:157)
        at java.base/java.util.ArrayList.forEach(Unknown Source)
        at org.projectnessie.client.http.HttpRequest.executeRequest(HttpRequest.java:157)
        at org.projectnessie.client.http.HttpRequest.post(HttpRequest.java:196)
        at org.projectnessie.client.http.HttpTreeClient.commitMultipleOperations(HttpTreeClient.java:188)
        at jdk.internal.reflect.GeneratedMethodAccessor339.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.projectnessie.client.http.NessieHttpClient$ExceptionRewriter.invoke(NessieHttpClient.java:161)
        at com.sun.proxy.$Proxy50.commitMultipleOperations(Unknown Source)
        at org.projectnessie.client.http.v1api.HttpCommitMultipleOperations.commit(HttpCommitMultipleOperations.java:58)
        at org.apache.iceberg.nessie.NessieTableOperations.doCommit(NessieTableOperations.java:188)
        ... 13 more

Regards,
canopenerda

Ajantha Bhat

Oct 12, 2022, 9:29:50 AM
to Canope Nerda, projectnessie
Hi Canopenerda,

In Nessie, conflict detection is table-level on a given branch.
So concurrent commits to the same table on the same branch result in conflicts.
In the future, we plan to improve the conflict detection logic (by considering the operation type or knowing exactly what happened in the commits).

For now, I want to know how you are running the compaction.
I assume you have configured "max-concurrent-file-group-rewrites" to speed up compaction (instead of processing one file group at a time).
I also assume you have enabled "partial-progress.enabled" so that it commits file group by file group instead of in a single compaction commit?

Thanks,
Ajantha




Canope Nerda

Oct 12, 2022, 10:57:35 AM
to Ajantha Bhat, projectnessie
Hi Ajantha,

I'm passing the options below to the Spark procedure. Interestingly, the Flink streaming job, which commits every 2 minutes, does not seem to be impacted. In production, there will be several Flink jobs writing to the same table but to mutually exclusive partitions, along with corresponding Spark compaction jobs.

options => map('max-concurrent-file-group-rewrites', '100', 'min-input-files', '2', 'partial-progress.enabled', 'true', 'partial-progress.max-commits', '500')

Regards,
canopenerda

Ajantha Bhat

Oct 13, 2022, 12:15:16 AM
to Canope Nerda, projectnessie
a) Thanks for sharing your configuration. So it is what I assumed.

I suggest disabling "partial-progress.enabled".

With this, compaction should still be able to process file groups concurrently (as we still have the "max-concurrent-file-group-rewrites" config).
It will just commit all file groups in a single atomic commit (which won't cause a conflict in Nessie).
Yes, we will lose the advantage of "partial-progress.enabled" (it is useful only in error scenarios, to commit the file groups that succeeded).
We just have to live with it until Nessie does better conflict resolution.

b)
> The interesting part is that it seems the Flink streaming job is not impacted which commits every 2 mins. In production, there will be several Flink jobs writing against the same table but to exclusive partitions
Can you check whether it makes a single commit or multiple commits? If it makes multiple Iceberg commits concurrently, that can lead to conflicts in Nessie.

c)
> along with corresponding spark compacting jobs.
Even if you launch one Spark compaction procedure per partition (with a where filter), the commits will still be considered concurrent and can lead to conflicts.

Thanks,
Ajantha

Ajantha Bhat

Oct 13, 2022, 12:26:30 AM
to Canope Nerda, projectnessie
Also, we have documented the current conflict resolution logic.
It can be found at https://projectnessie.org/develop/spec/#conflict-resolution

Canope Nerda

Oct 13, 2022, 12:34:18 AM
to Ajantha Bhat, projectnessie
Thanks Ajantha! Please find my replies inline below.

Regards,
canopenerda


On Thu, Oct 13, 2022 at 12:15 PM Ajantha Bhat <ajant...@gmail.com> wrote:
a) Thanks for sharing your configurations. So, it is what I assumed. 

I want you to disable "partial-progress.enabled".

With this, compaction should still be able to process file groups concurrently (as we still have "max-concurrent-file-group-rewrites" config).
Just that It will commit all file groups in a single atomic commit (which won't cause conflict in Nessie).
Yeah, we will be losing the advantage of partial-progress.enabled (it was useful only for error scenarios to commit successful filegroups).
We just have to live with it until Nessie does better conflict resolution. 
[C]: Initially I ran the procedure with partial-progress disabled, but then the driver was OOMKilled. I thought partial commits might alleviate the memory pressure.

b) 
The interesting part is that it seems the Flink streaming job is not impacted which commits every 2 mins. In production, there will be several Flink jobs writing against the same table but to exclusive partitions 
Can you check whether it makes a single commit or multiple commits? If it makes multiple Iceberg commits concurrently. It can lead to conflict in Nessie. 
[C]: As of now I only have one Flink job and one Spark procedure running concurrently, both operating on the same branch of the same table, and the Flink job commits sequentially. What interests me is that the Flink job does not seem to be affected by the Spark procedure.

c) 
along with corresponding spark compacting jobs.
For each partition, if you launch one spark procedure compaction job (with where filter). It will still be considered as concurrent commits and it can lead to conflicts. 
[C]: If I create a separate branch for each Flink/Spark job pair, there would be no conflicts among the pairs, right?

Ajantha Bhat

Oct 13, 2022, 1:02:00 AM
to Canope Nerda, projectnessie
> What interested me is that it seems the Flink job is not interfered by the Spark procedure.
It depends on whether both are committing at the same time or not.

> If I create a separate branch for each Flink/Spark job pair, it would not have conflicts among the pairs, right?
Commits on separate branches will not lead to conflicts.
But when you merge all the branches into the main branch, you will get a conflict. Only one branch merge will succeed in your scenario.

Canope Nerda

Oct 13, 2022, 1:12:40 AM
to Ajantha Bhat, projectnessie
Really appreciate your help. Please check my questions inline. Thanks!

Regards,
canopenerda


On Thu, Oct 13, 2022 at 1:02 PM Ajantha Bhat <ajant...@gmail.com> wrote:
What interested me is that it seems the Flink job is not interfered by the Spark procedure. 
It depends on whether both are committing at a time or not. 
[C]: We can assume the Flink job is always on and the Spark job is periodic, so there will be overlap when both are running. My assumption is that they should interfere with each other, but from what I observed, only the Spark job throws exceptions.

If I create a separate branch for each Flink/Spark job pair, it would not have conflicts among the pairs, right?
Separate branch commits will not lead to conflicts.
But when you merge all branches into the main branch. You will get into a conflict. Only one branch merge will be a success in your scenario. 
[C]: So there will be a conflict even if I merge the branches into main one by one and the data on the branches is mutually exclusive?

Ajantha Bhat

Oct 13, 2022, 2:15:15 AM
to Canope Nerda, projectnessie
> [C]: So there will be conflict even if I merge the branches one by one to the main and data on the branches are exclusive per se?

Yes. Because all branches were created from the same state (x), and each branch has a new commit on the same table, only one branch can be merged successfully.
Merging the other branches will fail because the base state of the table has changed. (Yes, it will fail even when the branches are merged one by one.)


Thanks,
Ajantha


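Sample test case for reference. Here sql(...) is assumed to be a helper that runs the statement in a Spark session with the Nessie SQL extensions enabled.
// Create a table on main and add one row.
sql("USE REFERENCE main IN nessie");
sql("CREATE TABLE nessie.db.tbl1 (id int) USING iceberg");
sql("INSERT INTO nessie.db.tbl1 SELECT 42");

// Create two branches from the same state of main.
sql("CREATE BRANCH dev IN nessie FROM main");
sql("CREATE BRANCH test IN nessie FROM main");

// Commit to the same table on each branch.
sql("USE REFERENCE dev IN nessie");
sql("INSERT INTO nessie.db.tbl1 SELECT 43");

sql("USE REFERENCE test IN nessie");
sql("INSERT INTO nessie.db.tbl1 SELECT 44");

// Merge both branches back into main, one after the other.
sql("USE REFERENCE main IN nessie");
sql("MERGE BRANCH dev INTO main IN nessie");
sql("MERGE BRANCH test INTO main IN nessie"); // This merge will fail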

Canope Nerda

Oct 13, 2022, 2:41:28 AM
to Ajantha Bhat, projectnessie
Then the current model doesn't work well with concurrent workloads against the same table, even if the workloads are spread over branches. One workaround I can think of is to split the table into more narrowly scoped ones and create a view to provide unified access, which needs view catalog capability from the underlying catalog. Does Nessie support a view catalog?

Regards,
canopenerda

Ajantha Bhat

Oct 13, 2022, 2:54:12 AM
to Canope Nerda, projectnessie
Hey,

I would like to summarise things now. I mixed up merge conflicts and concurrent commit conflicts.

a) Concurrent commit conflict [on the same table on the same branch]:
As mentioned, conflict detection is table-level. Concurrent commits do lead to a conflict in Nessie, hence the warning message.
But Iceberg will retry the commit (it is just a metadata operation), and the retry should succeed because it rebases onto the new reference state.

That is why your concurrent Flink/Spark jobs, and even the compaction, succeed.
I just didn't consider the Iceberg retry and was focused only on the Nessie backend. I hope it is clear now.
There is no need to change your compaction settings. It is just a warning.
Nessie does handle concurrent Iceberg commits on a given branch (similar to other catalogs).
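
To make the conflict-then-retry behaviour in (a) concrete, here is a minimal Python sketch. It is a toy model and not Nessie or Iceberg code: ToyBranch, ReferenceConflict and commit_with_retry are hypothetical names, and the default of 4 retries is only an illustrative value. The branch rejects a commit when another commit has touched the same table since the hash the writer expected, and the writer then retries against the current head.

```python
class ReferenceConflict(Exception):
    """Raised when the same table was changed after the expected hash."""


class ToyBranch:
    """Toy stand-in for a branch: an ordered log of (commit hash, table key)."""

    def __init__(self):
        self.log = [("h0", None)]  # initial commit touches no table

    @property
    def head(self):
        return self.log[-1][0]

    def commit(self, expected_head, table):
        hashes = [h for h, _ in self.log]
        start = hashes.index(expected_head) + 1
        # Table-level check: any later commit on the same table is a conflict.
        for commit_hash, changed in self.log[start:]:
            if changed == table:
                raise ReferenceConflict(
                    f"Key '{table}' has conflicting put-operation "
                    f"from commit '{commit_hash}'"
                )
        new_hash = f"h{len(self.log)}"
        self.log.append((new_hash, table))
        return new_hash


def commit_with_retry(branch, table, stale_head, max_retries=4):
    """Try to commit; on conflict, refresh the expected hash and try again."""
    expected = stale_head
    for attempt in range(max_retries + 1):
        try:
            return branch.commit(expected, table), attempt
        except ReferenceConflict:
            expected = branch.head  # rebase onto the new reference state
    raise ReferenceConflict(f"gave up after {max_retries} retries")


branch = ToyBranch()
base = branch.head                      # both writers read the same head
branch.commit(base, "db.tbl1")          # first writer commits
new_hash, retries = commit_with_retry(branch, "db.tbl1", base)
print(new_hash, retries)                # prints "h2 1": one conflict, then success
```

In this model the first attempt of the second writer fails in the same way as the warning in the log, and the single retry succeeds because it uses the refreshed head.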

b) Merge conflict:
As mentioned, conflict detection is table-level.
So when one branch is merged into another, if the state of a table changed on the source branch has also changed on the target branch since they diverged, the merge fails with a conflict.
Here, there is no automatic retry with rebase. The user has to rework it manually.
The test case above and the explanation from the previous emails still hold for merges.
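
The merge rule in (b) can be illustrated with a small Python sketch. This is a simplified model under stated assumptions, not Nessie's implementation: merge and MergeConflict are hypothetical names, and each branch is reduced to a dict mapping a table key to an opaque state id.

```python
class MergeConflict(Exception):
    """Raised when a table changed on both source and target since the base."""


def merge(source, target, base):
    """Apply the tables changed on `source` to `target`, table by table.

    `base` is the state both branches started from. A table that changed on
    the source conflicts if the target no longer holds the base state for it.
    """
    changed = {k: v for k, v in source.items() if base.get(k) != v}
    for key in changed:
        if target.get(key) != base.get(key):
            raise MergeConflict(f"table '{key}' changed on both sides")
    target.update(changed)


base = {"db.tbl1": "s1"}        # state x: main after the first insert
main = dict(base)
dev = {"db.tbl1": "s2-dev"}     # dev committed to tbl1
test = {"db.tbl1": "s2-test"}   # test committed to tbl1 as well

merge(dev, main, base)          # succeeds: main still held state x
try:
    merge(test, main, base)     # fails: tbl1 on main is no longer state x
except MergeConflict as e:
    print("merge failed:", e)
```

Merging one by one does not help in this model, because the second merge still compares against the state the branches were created from.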

Thanks,
Ajantha

Canope Nerda

Oct 13, 2022, 3:06:11 AM
to Ajantha Bhat, projectnessie
Sorry for mixing up the two types of conflicts. So if the data produced by the concurrent jobs needs to end up in the same table, it should be produced on the same branch. In the real case, I would have 20+ Flink jobs and 20+ periodic Spark jobs writing to the same table, and I can imagine the chance of conflicts is huge. Do you have any suggestions? Thank you!

Regards,
canopenerda

Ajantha Bhat

Oct 13, 2022, 3:21:43 AM
to Canope Nerda, projectnessie
> So if the data produced by the concurrent jobs need to be in the same table in the end, it should be produced against the same branch
Correct. (If you split the work across branches, you will get conflicts later when merging them into a single branch, and will need manual rework as explained in the example.)

> I would have 20+ Flink jobs and 20+ periodic Spark jobs writing against the same table, and I can imagine the chance of conflicts is huge. Do you have any suggestions?
Yes. This is about concurrent commit conflicts. Iceberg will retry, and all operations should eventually succeed (because these are just append operations).
You might have to tune "commit.retry.num-retries" if you find that any operations fail permanently.
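
As a hedged illustration of that tuning, the Python helper below only builds the ALTER TABLE statement that sets Iceberg's commit retry table properties. The helper name retry_properties_sql, the table name nessie.db.tbl1 (borrowed from the test case earlier in the thread) and the value 10 are assumptions for the example, not recommendations.

```python
def retry_properties_sql(table, num_retries, min_wait_ms=None, max_wait_ms=None):
    """Build an ALTER TABLE statement for Iceberg's commit.retry.* properties."""
    props = {"commit.retry.num-retries": num_retries}
    if min_wait_ms is not None:
        props["commit.retry.min-wait-ms"] = min_wait_ms
    if max_wait_ms is not None:
        props["commit.retry.max-wait-ms"] = max_wait_ms
    assignments = ", ".join(f"'{k}'='{v}'" for k, v in props.items())
    return f"ALTER TABLE {table} SET TBLPROPERTIES ({assignments})"


# The resulting string could be passed to spark.sql(...) in a session where
# the catalog is configured; here it is only printed.
print(retry_properties_sql("nessie.db.tbl1", 10))
```

With many concurrent writers, a higher retry count gives each commit more chances to rebase before the operation is reported as failed.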

Canope Nerda

Oct 13, 2022, 4:48:37 AM
to Ajantha Bhat, projectnessie
Hi Ajantha,

Would you please elaborate on the manual rework needed to resolve conflicts when merging changes from several branches into a common branch?
Regarding the concurrent commit conflict, is there a plan in the near future to lower the conflict detection level to the partition for partitioned tables?

Regards,
canopenerda

Ajantha Bhat

Oct 13, 2022, 10:16:13 AM
to Canope Nerda, projectnessie
> Regarding the concurrent commit conflict, do we have a plan in the near future to lower the conflict detection level to partition for partitioned tables?
No. This logic is the same as Iceberg's conflict detection for other catalogs (if the table metadata has been modified, rebase and retry).
So other catalogs also hit this conflict and retry.


> Would you please kindly elaborate on the manual rework to resolve conflicts when merging changes from several branches to a common branch?
As of now, we have to create a new branch from the latest main branch (or re-assign the existing branches to main).
This loses the operations that happened on those branches, and we need to redo them all.
There is no stash() and pop() as in Git, because pop() would need to modify table metadata.

So the current implementation is not convenient once we have a conflict.
As mentioned, we plan to implement content-aware merges (which can help with finer-grained merge conflict detection and metadata modification).
It is on the roadmap (https://github.com/projectnessie/nessie/issues/2513).

Thanks,
Ajantha

