We have been running the Kafka Indexing Service on Druid for nearly a month, and it has mostly worked well. But a few days ago all of the tasks went down.
Our ingestion spec is in the attached file "ingest.json".
The overlord logs are as follows:
2016-08-17T15:14:58,564 WARN [qtp622043416-164] io.druid.metadata.IndexerSQLMetadataStorageCoordinator - Cannot use existing pending segment [test3_mobileDictClient.android_2016-08-16T21:00:00.000+08:00_2016-08-16T22:00:00.000+08:00_2016-08-16T21:00:00.459+08:00_65] for sequence [index_kafka_test3_mobileDictClient.android_af5c785c81ca0d4_5] (previous = [test3_mobileDictClient.android_2016-08-16T20:00:00.000+08:00_2016-08-16T21:00:00.000+08:00_2016-08-16T20:00:00.322+08:00_56]) in DB, does not match requested interval[2016-08-16T20:00:00.000+08:00/2016-08-16T21:00:00.000+08:00]
The peon logs are as follows:
2016-08-17T07:12:05,987 INFO [task-runner-0-priority-0] io.druid.indexing.common.actions.RemoteTaskActionClient - Performing action for task[index_kafka_test3_mobileDictClient.android_ee40ef7fb2836ce_pmlheiab]: SegmentAllocateAction{dataSource='test3_mobileDictClient.android', timestamp=2016-08-15T22:08:36.000+08:00, queryGranularity=DurationGranularity{length=3600000, origin=0}, preferredSegmentGranularity=HOUR, sequenceName='index_kafka_test3_mobileDictClient.android_ee40ef7fb2836ce_1', previousSegmentId='test3_mobileDictClient.android_2016-08-15T22:00:00.000+08:00_2016-08-15T23:00:00.000+08:00_2016-08-15T23:38:55.238+08:00_160'}
2016-08-17T07:12:05,987 INFO [task-runner-0-priority-0] io.druid.indexing.common.actions.RemoteTaskActionClient - Submitting action for task[index_kafka_test3_mobileDictClient.android_ee40ef7fb2836ce_pmlheiab] to overlord[http://hd020.corp.yodao.com:8195/druid/indexer/v1/action]: SegmentAllocateAction{dataSource='test3_mobileDictClient.android', timestamp=2016-08-15T22:08:36.000+08:00, queryGranularity=DurationGranularity{length=3600000, origin=0}, preferredSegmentGranularity=HOUR, sequenceName='index_kafka_test3_mobileDictClient.android_ee40ef7fb2836ce_1', previousSegmentId='test3_mobileDictClient.android_2016-08-15T22:00:00.000+08:00_2016-08-15T23:00:00.000+08:00_2016-08-15T23:38:55.238+08:00_160'}
2016-08-17T07:12:05,997 WARN [task-runner-0-priority-0] io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver - Cannot allocate segment for timestamp[2016-08-15T22:08:36.000+08:00]
* Please note that the peon log and the overlord log are not from the same run; the timestamps show they come from different runs. However, I believe they both point to the same problem.
From our debugging, we suspect the problem lies at line 401 of IndexerSQLMetadataStorageCoordinator.java, which produces the warning logged at line 413.
Part of IndexerSQLMetadataStorageCoordinator.java is as follows (lines 359-423):
public SegmentIdentifier allocatePendingSegment(
    final String dataSource,
    final String sequenceName,
    final String previousSegmentId,
    final Interval interval,
    final String maxVersion
) throws IOException
{
  Preconditions.checkNotNull(dataSource, "dataSource");
  Preconditions.checkNotNull(sequenceName, "sequenceName");
  Preconditions.checkNotNull(interval, "interval");
  Preconditions.checkNotNull(maxVersion, "maxVersion");

  final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;

  return connector.retryTransaction(
      new TransactionCallback<SegmentIdentifier>()
      {
        @Override
        public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
        {
          final List<byte[]> existingBytes = handle
              .createQuery(
                  String.format(
                      "SELECT payload FROM %s WHERE "
                      + "dataSource = :dataSource AND "
                      + "sequence_name = :sequence_name AND "
                      + "sequence_prev_id = :sequence_prev_id",
                      dbTables.getPendingSegmentsTable()
                  )
              ).bind("dataSource", dataSource)
              .bind("sequence_name", sequenceName)
              .bind("sequence_prev_id", previousSegmentIdNotNull)
              .map(ByteArrayMapper.FIRST)
              .list();

          if (!existingBytes.isEmpty()) {
            final SegmentIdentifier existingIdentifier = jsonMapper.readValue(
                Iterables.getOnlyElement(existingBytes),
                SegmentIdentifier.class
            );

            if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis()
                && existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) {
              log.info(
                  "Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB",
                  existingIdentifier.getIdentifierAsString(),
                  sequenceName,
                  previousSegmentIdNotNull
              );

              return existingIdentifier;
            } else {
              log.warn(
                  "Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, "
                  + "does not match requested interval[%s]",
                  existingIdentifier.getIdentifierAsString(),
                  sequenceName,
                  previousSegmentIdNotNull,
                  interval
              );

              return null;
            }
          }
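          // ... (excerpt truncated here; the method continues in the original source)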
We found that the interval of the existing pending segment read from existingBytes does not match the requested interval. Could this be a logic bug?
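To verify this directly in the metadata store, a query along the following lines should show which interval the stored row actually carries for that (sequence_name, sequence_prev_id) pair. This is only a sketch: it assumes MySQL and the standard druid_pendingSegments schema, where the segment interval is kept in the start and end columns (backquoted in case they collide with reserved words):

SELECT `start`, `end`, sequence_name, sequence_prev_id
FROM druid_pendingSegments
WHERE dataSource = "test3_mobileDictClient.android"
  AND sequence_name = "index_kafka_test3_mobileDictClient.android_af5c785c81ca0d4_5"
  AND sequence_prev_id = "test3_mobileDictClient.android_2016-08-16T20:00:00.000+08:00_2016-08-16T21:00:00.000+08:00_2016-08-16T20:00:00.322+08:00_56";

If the stored interval there is 21:00-22:00 while the task keeps requesting 20:00-21:00 (as the overlord log suggests), the code above will return null every time and the task can never allocate a segment for that sequence.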
We ran this query against our SQL database:
SELECT * FROM druid_pendingSegments WHERE dataSource = "test3_mobileDictClient.android" AND sequence_name = "index_kafka_test3_mobileDictClient.android_af5c785c81ca0d4_5"
The results are in the attached file "sql_query.htm".
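Since the code above calls Iterables.getOnlyElement(existingBytes), duplicate rows for the same (sequence_name, sequence_prev_id) pair would also break allocation (getOnlyElement throws if more than one row matches). A quick duplicate check, assuming the same MySQL setup, might look like:

SELECT sequence_name, sequence_prev_id, COUNT(*) AS cnt
FROM druid_pendingSegments
WHERE dataSource = "test3_mobileDictClient.android"
GROUP BY sequence_name, sequence_prev_id
HAVING COUNT(*) > 1;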
* By the way, our druid_pendingSegments table is very large: there are 48,100 rows for a single datasource, and it still holds pending segments from a month ago. Is that normal, or a sign of misconfiguration?
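Would a manual purge along the following lines be safe, or is there a built-in cleanup we should enable? This is only a sketch: it assumes all ingestion tasks for the datasource are stopped first, and that created_date holds an ISO-8601 string (so comparing it against a date prefix lexicographically works):

DELETE FROM druid_pendingSegments
WHERE dataSource = "test3_mobileDictClient.android"
  AND created_date < "2016-08-01";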