Kafka Indexing Service error occurs: "Cannot use existing pending segment"

373 views
Skip to first unread message

daiso...@gmail.com

unread,
Aug 17, 2016, 6:46:55 AM8/17/16
to Druid User
We have run Kafka Indexing Service on Druid for nearly a month, and it ran basically well. But a few days ago all the tasks went down. 

Our ingestion specs are in attached file "ingest.json".

The overlord logs are as follows:
2016-08-17T15:14:58,564 WARN [qtp622043416-164] io.druid.metadata.IndexerSQLMetadataStorageCoordinator - Cannot use existing pending segment 
[test3_mobileDictClient.android_2016-08-16T21:00:00.000+08:00_2016-08-16T22:00:00.000+08:00_2016-08-16T21:00:00.459+08:00_65] for sequence [index_kafka_test3_mobileDictClient.android_af5c785c81ca0d4_5] 
(previous = [test3_mobileDictClient.android_2016-08-16T20:00:00.000+08:00_2016-08-16T21:00:00.000+08:00_2016-08-16T20:00:00.322+08:00_56] ) in DB, does not match requested interval 
[2016-08-16T20:00:00.000+08:00/2016-08-16T21:00:00.000+08:00]

The peon logs are as follows:
2016-08-17T07:12:05,987 INFO [task-runner-0-priority-0] io.druid.indexing.common.actions.RemoteTaskActionClient - Performing action for task[index_kafka_test3_mobileDictClient.android_ee40ef7fb2836ce_pmlheiab]: SegmentAllocateAction{dataSource='test3_mobileDictClient.android', timestamp=2016-08-15T22:08:36.000+08:00, queryGranularity=DurationGranularity{length=3600000, origin=0}, preferredSegmentGranularity=HOUR, sequenceName='index_kafka_test3_mobileDictClient.android_ee40ef7fb2836ce_1', previousSegmentId='test3_mobileDictClient.android_2016-08-15T22:00:00.000+08:00_2016-08-15T23:00:00.000+08:00_2016-08-15T23:38:55.238+08:00_160'}
2016-08-17T07:12:05,987 INFO [task-runner-0-priority-0] io.druid.indexing.common.actions.RemoteTaskActionClient - Submitting action for task[index_kafka_test3_mobileDictClient.android_ee40ef7fb2836ce_pmlheiab] to overlord[http://hd020.corp.yodao.com:8195/druid/indexer/v1/action]: SegmentAllocateAction{dataSource='test3_mobileDictClient.android', timestamp=2016-08-15T22:08:36.000+08:00, queryGranularity=DurationGranularity{length=3600000, origin=0}, preferredSegmentGranularity=HOUR, sequenceName='index_kafka_test3_mobileDictClient.android_ee40ef7fb2836ce_1', previousSegmentId='test3_mobileDictClient.android_2016-08-15T22:00:00.000+08:00_2016-08-15T23:00:00.000+08:00_2016-08-15T23:38:55.238+08:00_160'}
2016-08-17T07:12:05,997 WARN [task-runner-0-priority-0] io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver - Cannot allocate segment for timestamp[2016-08-15T22:08:36.000+08:00]
 
* Please note that the peon log and the overlord log do not from at the same run, the time stamp on the log shows they are from different runs. Though I believe they both point to the same problem.

From our debug results, we suspect that the problem is located in line 401 of IndexerSQLMetadataStorageCoordinator.java, which caused the warning log in line 413.
part of the IndexerSQLMetadataStorageCoordinator.java are as follows( line 359-line423):
public SegmentIdentifier allocatePendingSegment(
final String dataSource,
final String sequenceName,
final String previousSegmentId,
final Interval interval,
final String maxVersion
) throws IOException
{
Preconditions.checkNotNull(dataSource, "dataSource");
Preconditions.checkNotNull(sequenceName, "sequenceName");
Preconditions.checkNotNull(interval, "interval");
Preconditions.checkNotNull(maxVersion, "maxVersion");

final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;

return connector.retryTransaction(
new TransactionCallback<SegmentIdentifier>()
{
@Override
public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
{
final List<byte[]> existingBytes = handle
.createQuery(
String.format(
"SELECT payload FROM %s WHERE "
+ "dataSource = :dataSource AND "
+ "sequence_name = :sequence_name AND "
+ "sequence_prev_id = :sequence_prev_id",
dbTables.getPendingSegmentsTable()
)
).bind("dataSource", dataSource)
.bind("sequence_name", sequenceName)
.bind("sequence_prev_id", previousSegmentIdNotNull)
.map(ByteArrayMapper.FIRST)
.list();

if (!existingBytes.isEmpty()) {
final SegmentIdentifier existingIdentifier = jsonMapper.readValue(
Iterables.getOnlyElement(existingBytes),
SegmentIdentifier.class
);

if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis()
&& existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) {
log.info(
"Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB",
existingIdentifier.getIdentifierAsString(),
sequenceName,
previousSegmentIdNotNull
);

return existingIdentifier;
} else {
log.warn(
"Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, "
+ "does not match requested interval[%s]",
existingIdentifier.getIdentifierAsString(),
sequenceName,
previousSegmentIdNotNull,
interval
);

return null;
}
}

We found that the interval of  existingBytes dose not match the interval. Could it be a logical bug?

We run this query our SQL database:
SELECT * FROM druid_pendingSegments WHERE dataSource ="test3_mobileDictClient.android" AND sequence_name="index_kafka_test3_mobileDictClient.android_af5c785c81ca0d4_5"
And the SQL results are in attached file "sql_query.htm". 

*By the way, our SQL Table druid_pendingSegments  is very large,  we have 48100 row  in druid_pendingSegments  for a single data source, and it stores the pending segments form a month ago. Is it normal or a sign of misconfiguring?


sql_query.htm
ingest.json

David Lim

unread,
Aug 17, 2016, 7:29:24 PM8/17/16
to Druid User
Thank you for the detailed report! Are you seeing this failure all the time now or only sometimes? Can you post the full peon log from startup until the error occurs so we can try to reproduce the issue?

daiso...@gmail.com

unread,
Aug 17, 2016, 11:48:37 PM8/17/16
to Druid User
In our latest run of Kafka Indexing Service (on 8-17), all the tasks were failed. We are not sure if there are any failure before 8-17, though we have run this for a month, and it went seemingly well until 8-16 and 8-17. 

And here is the peon log.

在 2016年8月18日星期四 UTC+8上午7:29:24,David Lim写道:
log.txt

李斯宁

unread,
Aug 18, 2016, 9:34:55 AM8/18/16
to Druid User
I work with daiso... in the same team, and also working on this issue.

After investigating in the code, I found below code may shows the cause of the problem:

/**
* Return a segment usable for "timestamp". May return null if no segment can be allocated.
*
* @param timestamp data timestamp
* @param sequenceName sequenceName for potential segment allocation
*
* @return identifier, or null
*
* @throws IOException if an exception occurs while allocating a segment
*/
private SegmentIdentifier getSegment(final DateTime timestamp, final String sequenceName) throws IOException
{
synchronized (activeSegments) {
final SegmentIdentifier existing =
getActiveSegment(timestamp, sequenceName);
if (existing != null) {
return existing;
}
else {
// Allocate new segment.
final NavigableMap<Long, SegmentIdentifier> activeSegmentsForSequence = activeSegments.get(sequenceName);
final SegmentIdentifier newSegment = segmentAllocator.allocate(
timestamp,
sequenceName,
lastSegmentIds.get(sequenceName)
);

if (newSegment != null) {
final Long key = newSegment.getInterval().getStartMillis();

for (SegmentIdentifier identifier : appenderator.getSegments()) {
if (identifier.equals(newSegment)) {
throw new ISE(
"WTF?! Allocated segment[%s] which conflicts with existing segment[%s].",
newSegment,
identifier
);
}
}

log.info("New segment[%s] for sequenceName[%s].", newSegment, sequenceName);
if (activeSegmentsForSequence == null) {
activeSegments.put(sequenceName, Maps.<Long, SegmentIdentifier>newTreeMap());
}
activeSegments.get(sequenceName).put(key, newSegment);
lastSegmentIds.put(sequenceName, newSegment.getIdentifierAsString());
}
else {
// Well, we tried.
log.warn("Cannot allocate segment for timestamp[%s], sequenceName[%s]. ", timestamp, sequenceName);
}

return newSegment;
}
}
}

If we have very late message (for example, in the last day / hour).  Then this late message's interval may be not in activeSegments.
And the 'allocate new segment' seems only allocate intervals in latest interval. So, they won't match.

I guess this is a bug.  If not, how "very late messages" are handled?

thanks for your follow up in advance :)


在 2016年8月17日星期三 UTC+8下午6:46:55,daiso...@gmail.com写道:

David Lim

unread,
Aug 19, 2016, 3:00:52 AM8/19/16
to Druid User
Hey guys,

I've been looking into this issue for a bit but still haven't determined the root cause. To answer your question about late events: late events should definitely be handled by the Kafka indexing task. What is supposed to happen is that if a message comes in and there isn't a segment covering the time interval it belongs to (possibly because it's late and the previous segment covering that range has already been removed from the active segments), allocatePendingSegment() will generate a new segment identifier for a segment with a time interval covering the late event and this event will be put into that segment (while all the other current events will be put into a different segment for the current time interval).

What seems to be happening in your case is that when the task goes to generate a new segment identifier for the late event, it fails because another segment identifier has already been created for the same sequenceName/previousSegmentId - which is the unique key used to generate a segment identifier which is necessarily deterministic so that replica tasks create segments with the same ID - but the segment identifier that was already created was for a different time interval which would mean a loss of determinism in how segments are created.

This, for example, could happen if the same Kafka messages were being read by two tasks but they were reading them in different order, but Kafka messages should be ordered within a partition so hopefully this is not what's happening.

Question for you: were you running this task with more than one replica? I ask because the ingest.json you posted shows only a single task (no replication), but this block of code would normally be hit by replica tasks. If so, could you also post the peon log of all the other replica tasks that were running when this happens? If you're able to provide the full overlord logs that would be helpful too. If you weren't actually running replica tasks, let us know because that'll also help narrow down what's going on.

The expert at this section of code is currently on holidays so we might need to wait until he gets back if we still can't figure out what's going on after looking at the rest of the logs.

李斯宁

unread,
Aug 20, 2016, 11:49:12 AM8/20/16
to Druid User
Hi david,
Thanks very much for detailed answer for our question.

I agree that the failure may be related to concurrent running tasks (may be by some accidently killing middle manager or other reason)
I dig to more failed log, and found that we have mis-configured "druid.selectors.coordinator.serviceName" for middele manager. So it cannot find coordinator and reports in some peon's log, and may triggered our problem.

After we correctly set  "druid.selectors.coordinator.serviceName", it seems no *real error* occurs.

BTW: we may have found a bug on kafka ingest task's status: https://github.com/druid-io/druid/issues/3374

Thanks again for your kindly reply.

Thanks 

在 2016年8月19日星期五 UTC+8下午3:00:52,David Lim写道:

David Lim

unread,
Aug 22, 2016, 1:49:49 PM8/22/16
to Druid User
Thank you for the update and for filing the issue. Let us know how things go and if this behavior comes up again.

李斯宁

unread,
Aug 24, 2016, 2:43:37 AM8/24/16
to Druid User
Hi david,
If one task process multiple partitions, can kafka-index assure deterministic?
The order between different partitions are not deterministic, so may this be an issue?

在 2016年8月23日星期二 UTC+8上午1:49:49,David Lim写道:

David Lim

unread,
Aug 24, 2016, 1:26:43 PM8/24/16
to Druid User
Yes, ingestion will be deterministic even if there are multiple partitions being ingested by a task. This is handled by each task writing events from different partitions into different segments so that each segment is deterministic (since as you mentioned, Kafka events are ordered within a partition but are not ordered across partitions).

The tradeoff to this is that if you have a large number of Kafka partitions, you will get a large number of segments which may be sub-optimal for query performance and may need to be merged together to get optimally-sized segments.

李斯宁

unread,
Aug 24, 2016, 10:51:00 PM8/24/16
to Druid User
hi david, thanks for the explanation!
We'll look deeper into it if we see this problem again.

在 2016年8月25日星期四 UTC+8上午1:26:43,David Lim写道:
Reply all
Reply to author
Forward
0 new messages