concurrent append lock will condense the lock interval


Vicent Taylor

Jun 8, 2024, 8:22:53 AM
to Druid User
I have a datasource with day segment granularity. A model continuously submits ingestion tasks that append new data to the datasource, and in the meantime a compaction task compacts the datasource.
The ingestion tasks use the append lock, while the compaction task uses the replace lock.

I expected everything to work fine, but I find that the compaction tasks fail frequently.
After some investigation, I found it is caused by my ingested data arriving out of order. For example, if an ingestion task contains both 5/1 and 5/2 data, it acquires an append lock on the interval 0501/0503. When the compaction task then acquires a replace lock on 0501/0502, that lock gets revoked.

Apparently, when the ingestion task acquires the append lock, it condenses multiple consecutive intervals into a single interval. Is there any way to disable this condensing behavior? It is definitely not what I want for the append lock.
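
To make the scenario concrete, here is a minimal, self-contained sketch (plain joda-time, not Druid's actual locking code) of the condensing I am describing. The merging logic below is only an illustration of the behavior; the real condensing happens inside Druid when the task acquires its time-chunk locks.

import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.List;

public class CondenseDemo
{
  // Merge abutting or overlapping intervals into one, analogous to the condensing
  // the append ingestion task does before acquiring its locks.
  static List<Interval> condense(List<Interval> sorted)
  {
    final List<Interval> merged = new ArrayList<>();
    Interval current = sorted.get(0);
    for (int i = 1; i < sorted.size(); i++) {
      final Interval next = sorted.get(i);
      if (current.abuts(next) || current.overlaps(next)) {
        final DateTime end = current.getEnd().isAfter(next.getEnd()) ? current.getEnd() : next.getEnd();
        current = new Interval(current.getStart(), end);
      } else {
        merged.add(current);
        current = next;
      }
    }
    merged.add(current);
    return merged;
  }

  public static void main(String[] args)
  {
    // The ingestion task holds late 5/1 data plus current 5/2 data.
    final List<Interval> appendIntervals = List.of(
        Interval.parse("2024-05-01/2024-05-02"),
        Interval.parse("2024-05-02/2024-05-03")
    );

    final Interval appendLock = condense(appendIntervals).get(0);          // 0501/0503
    final Interval replaceLock = Interval.parse("2024-05-01/2024-05-02");  // compaction target

    System.out.println("condensed append lock interval: " + appendLock);
    System.out.println("compaction replace interval:    " + replaceLock);
    // The condensed append lock now spans two days, so it is no longer confined to the
    // single day the compaction task wants to replace; this is what I observe causing
    // the replace lock on 0501/0502 to be revoked.
    System.out.println("replace interval fully covers append lock: " + replaceLock.contains(appendLock)); // false
  }
}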

John Kowtko

Jun 9, 2024, 9:22:27 AM
to Druid User
Hi Vicent,

I didn't know that the locks are "expanded"; I thought the task just acquired more locks, i.e. 0501/0502 and 0502/0503.

But regardless of how the locks are acquired or held, the ingestion task will release them when it exits. Streaming tasks "roll over", that is, the current task is replaced by a new task every taskDuration. Assuming the new task doesn't receive any data for the older day, that day will be able to be compacted.

This is just standard behavior. We call the older data "late arrival" data, and it is common for compaction to "struggle" to compact intervals when there is a lot of late-arrival data.

If you are suffering from serious fragmentation problems due to the late-arrival data, there is a new feature (https://druid.apache.org/docs/latest/ingestion/concurrent-append-replace) that allows you to run compaction over ingest-locked intervals, which you could experiment with. The only caveat is that the new needs-based scheduler has not been built yet, so you will have to manually direct compaction to work on the most appropriate time intervals; otherwise it will probably just compact the latest interval over and over again.
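
For reference, this is roughly how that feature is opted into, per the docs page above: the appending task and the replacing (compaction) task each declare a lock type in their task context. The Java map below is just an illustration of those JSON context entries, not code you drop into Druid.

import java.util.Map;

public class ConcurrentLockContexts
{
  // Context for the appending ingestion task (goes into the "context" block of the
  // ingestion or supervisor spec): ask for a shared APPEND lock.
  static final Map<String, String> APPEND_TASK_CONTEXT =
      Map.of("taskLockType", "APPEND");

  // Context for the compaction task (a replacing task): ask for a REPLACE lock,
  // which the concurrent append/replace feature lets coexist with APPEND locks.
  static final Map<String, String> COMPACTION_TASK_CONTEXT =
      Map.of("taskLockType", "REPLACE");
}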

I hope this adds some clarification.  Let us know if you have any followup questions on this.

Thanks.  John

Vicent Taylor

Jun 9, 2024, 11:30:03 AM
to druid...@googlegroups.com
Hi John,

Thank you very much for your response. Actually, I am already using concurrent append/replace locks for both the append ingestion task and the compaction task in my testing.
In my real case the late events are more serious; data can arrive as much as 40 days late.
Previously, for this case, both the append ingestion task and the compaction task used segment locks. The compaction tasks were scheduled to run twice a day, and they could run successfully even while the append ingestion task was running, so the number of segments introduced by late events was kept down by compaction.

Recently, we found some drawbacks of using segment locks for overwrites:
  1. Acquiring segment locks takes a lot of time when the interval contains many segments (1000+ in my case).
  2. Compacting with segment locks fails if the segment partition numbers are not consecutive (usually caused by an append ingestion task failure).
We are expecting concurrent locks to solve our case. It is a fantastic feature.

So my question is: is there any way to disable the condensing behavior when the append lock is used? Condensing could stay as it is for the replace lock. It would also be acceptable for us to set an option in the task context to enforce this behavior.
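
To illustrate what I have in mind, here is a tiny, purely hypothetical sketch of such an option. The context key name is made up and does not exist in Druid, and the actual interval splitting is what the patch at the end of this thread does unconditionally for APPEND locks.

import java.util.Map;

public class AppendLockCondenseOption
{
  // Hypothetical context key, invented for illustration only.
  static final String SPLIT_APPEND_LOCK_INTERVALS = "splitAppendLockIntervals";

  // Returns true when per-granularity lock intervals should be kept instead of one
  // condensed interval: only for append locks, and only when the flag is set.
  static boolean shouldSplitAppendLocks(Map<String, Object> taskContext, boolean isAppendLock)
  {
    return isAppendLock
           && Boolean.TRUE.equals(taskContext.getOrDefault(SPLIT_APPEND_LOCK_INTERVALS, Boolean.FALSE));
  }
}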

Hoping to get your opinion soon.

Thanks.
'John Kowtko' via Druid User <druid...@googlegroups.com> wrote on Sun, Jun 9, 2024 at 21:22:

John Kowtko

Jun 9, 2024, 1:19:38 PM
to Druid User
Hi, okay, you should not be using segment locks at all ... to my knowledge those were never fully implemented, were problematic, and have been replaced completely by concurrent append/replace locking.

Thanks.  John

Vicent Taylor

Jun 9, 2024, 11:35:48 PM
to druid...@googlegroups.com
Hi John,

We are upgrading Druid and will use concurrent append/replace locks.
So should we change the default append lock behavior to not condense intervals, or add a task context option to disable the condensing behavior?

Thanks.

'John Kowtko' via Druid User <druid...@googlegroups.com> wrote on Mon, Jun 10, 2024 at 01:19:

Vicent Taylor

Jun 10, 2024, 10:46:42 PM
to druid...@googlegroups.com

Hi John,

This is our simple workaround on Druid 29.0.1: when the task acquires an APPEND lock, the condensed intervals are split back into individual segment-granularity intervals before the locks are requested.

Index: indexing-service-patch/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
===================================================================
diff --git a/indexing-service-patch/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service-patch/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
--- a/indexing-service-patch/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java (revision 294469)
+++ b/indexing-service-patch/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java (revision 294483)
@@ -100,6 +100,8 @@
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import java.util.Spliterators;
 
 /**
  * Abstract class for batch tasks like {@link IndexTask}.
@@ -427,7 +429,7 @@
         // when an overwriting task finds a version for a given input row, it expects the interval
         // associated to each version to be equal or larger than the time bucket where the input row falls in.
         // See ParallelIndexSupervisorTask.findVersion().
-        final Iterator<Interval> intervalIterator;
+        Iterator<Interval> intervalIterator;
         final Granularity segmentGranularity = getSegmentGranularity();
         if (segmentGranularity == null) {
             intervalIterator = JodaUtils.condenseIntervals(intervals).iterator();
@@ -435,6 +437,15 @@
             IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);
             // the following is calling a condense that does not materialize the intervals:
             intervalIterator = JodaUtils.condensedIntervalsIterator(intervalsByGranularity.granularityIntervalsIterator());
+            TaskLockType taskLockType = determineLockType(LockGranularity.TIME_CHUNK);
+            if (TaskLockType.APPEND == taskLockType) {
+                intervalIterator = StreamSupport.stream(
+                        Spliterators.spliteratorUnknownSize(intervalIterator, 0),
+                        false
+                ).flatMap(interval ->
+                        StreamSupport.stream(Spliterators.spliteratorUnknownSize(segmentGranularity.getIterable(interval).iterator(), 0), false)
+                ).iterator();
+            }
         }
 
         // Intervals are already condensed to avoid creating too many locks.