Problem publishing segments for outdated messages. Kafka Ingestion.


epi...@powerspace.com

May 13, 2018, 8:27:00 AM
to Druid User
Hi everyone,

This is a tricky one and I hope you can help me.

Context: Druid 0.12.0, real-time Kafka ingestion (the spec is included below).
Problem: Everything was fine so far; then suddenly the ingestion tasks for one specific datasource (we have two) started failing in a loop (we run a replica, but it fails too).

This is the task log:

2018-05-13T11:05:09,724 WARN [task-runner-0-priority-0] io.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Cannot allocate segment for timestamp[2018-05-12T23:58:55.986Z], sequenceName[index_kafka_analytics_sessions_75dc73c7d7c481e_0].
[...]
2018-05-13T11:05:10,549 INFO [analytics_sessions-incremental-persist] io.druid.segment.IndexMergerV9 - Completed index.drd in 4 millis.
2018-05-13T11:05:10,562 INFO [analytics_sessions-incremental-persist] io.druid.java.util.common.io.smoosh.FileSmoosher - Created smoosh file [/opt/druid/var/druid/task/index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb/work/persist/analytics_sessions_2018-05-13T00:00:00.000Z_2018-05-14T00:00:00.000Z_2018-05-13T00:30:20.766Z_905/0/00000.smoosh] of size [924266] bytes.
2018-05-13T11:05:10,634 INFO [analytics_sessions-incremental-persist] io.druid.segment.realtime.appenderator.AppenderatorImpl - Committing metadata[AppenderatorDriverMetadata{segments={index_kafka_analytics_sessions_75dc73c7d7c481e_0=[io.druid.segment.realtime.appenderator.SegmentWithState@feb921]}, lastSegmentIds={index_kafka_analytics_sessions_75dc73c7d7c481e_0=analytics_sessions_2018-05-13T00:00:00.000Z_2018-05-14T00:00:00.000Z_2018-05-13T00:30:20.766Z_905}, callerMetadata={nextPartitions=KafkaPartitions{topic='sessions', partitionOffsetMap={1=6467232, 4=618876, 7=618838, 10=618560, 13=618535, 16=618805, 19=618830}}}}] for sinks[analytics_sessions_2018-05-13T00:00:00.000Z_2018-05-14T00:00:00.000Z_2018-05-13T00:30:20.766Z_905:1].
2018-05-13T11:05:10,651 INFO [task-runner-0-priority-0] io.druid.segment.realtime.appenderator.StreamAppenderatorDriver - Persisted pending data in 925ms.
2018-05-13T11:05:10,654 INFO [task-runner-0-priority-0] io.druid.segment.realtime.appenderator.AppenderatorImpl - Shutting down...
2018-05-13T11:05:10,687 INFO [appenderator_persist_0] io.druid.server.coordination.BatchDataSegmentAnnouncer - Unannouncing segment[analytics_sessions_2018-05-13T00:00:00.000Z_2018-05-14T00:00:00.000Z_2018-05-13T00:30:20.766Z_905] at path[/druid/segments/druid-middlemanager-01.c....internal:8101/druid-middlemanager-01.c..internal:8101_indexer-executor__default_tier_2018-05-13T11:05:08.993Z_be368fbfb2584ddca017f48849fedbd20]
2018-05-13T11:05:10,709 INFO [appenderator_persist_0] io.druid.segment.realtime.appenderator.AppenderatorImpl - Removing sink for segment[analytics_sessions_2018-05-13T00:00:00.000Z_2018-05-14T00:00:00.000Z_2018-05-13T00:30:20.766Z_905].
2018-05-13T11:05:10,716 INFO [task-runner-0-priority-0] io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider - Unregistering chat handler[index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb]
2018-05-13T11:05:10,716 WARN [publish-driver] io.druid.indexing.kafka.KafkaIndexTask - Stopping publish thread as we are interrupted, probably we are shutting down
2018-05-13T11:05:10,717 INFO [task-runner-0-priority-0] io.druid.curator.discovery.CuratorDruidNodeAnnouncer - Unannouncing [DiscoveryDruidNode{druidNode=DruidNode{serviceName='druid/middleManager', host='...', port=-1, plaintextPort=8101, enablePlaintextPort=true, tlsPort=-1, enableTlsPort=false}, nodeType='peon', services={dataNodeService=DataNodeService{tier='_default_tier', maxSize=0, type=indexer-executor, priority=0}, lookupNodeService=LookupNodeService{lookupTier='__default'}}}].
2018-05-13T11:05:10,717 INFO [task-runner-0-priority-0] io.druid.curator.announcement.Announcer - unannouncing [/druid/internal-discovery/peon/druid-middlemanager-01.c....5.internal:8101]
2018-05-13T11:05:10,726 INFO [task-runner-0-priority-0] io.druid.curator.discovery.CuratorDruidNodeAnnouncer - Unannounced [DiscoveryDruidNode{druidNode=DruidNode{serviceName='druid/middleManager', host='...host', port=-1, plaintextPort=8101, enablePlaintextPort=true, tlsPort=-1, enableTlsPort=false}, nodeType='peon', services={dataNodeService=DataNodeService{tier='_default_tier', maxSize=0, type=indexer-executor, priority=0}, lookupNodeService=LookupNodeService{lookupTier='__default'}}}].
2018-05-13T11:05:10,726 INFO [task-runner-0-priority-0] io.druid.server.coordination.CuratorDataSegmentServerAnnouncer - Unannouncing self[DruidServerMetadata{name='...', hostAndPort='...', hostAndTlsPort='null', maxSize=0, tier='_default_tier', type=indexer-executor, priority=0}] at [/druid/announcements/druid-middlemanager-01.c.....internal:8101]
2018-05-13T11:05:10,726 INFO [task-runner-0-priority-0] io.druid.curator.announcement.Announcer - unannouncing [...]
2018-05-13T11:05:10,736 ERROR [task-runner-0-priority-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[KafkaIndexTask{id=index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb, type=index_kafka, dataSource=analytics_sessions}]
io.druid.java.util.common.ISE: Could not allocate segment for row with timestamp[2018-05-12T23:58:55.986Z]
    at io.druid.indexing.kafka.KafkaIndexTask.run(KafkaIndexTask.java:731) ~[?:?]
    at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:444) [druid-indexing-service-0.12.0.jar:0.12.0]
    at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:416) [druid-indexing-service-0.12.0.jar:0.12.0]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_152]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_152]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_152]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_152]
2018-05-13T11:05:10,749 INFO [task-runner-0-priority-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb] status changed to [FAILED].
2018-05-13T11:05:10,754 INFO [task-runner-0-priority-0] io.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
  "id" : "index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb",
  "status" : "FAILED",
  "duration" : 3249
}

This is the overlord/coordinator log:

2018-05-13T11:05:09,700 WARN [qtp1154895182-140] io.druid.metadata.IndexerSQLMetadataStorageCoordinator - Cannot allocate new segment for dataSource[analytics_sessions], interval[2018-05-12T00:00:00.000Z/2018-05-13T00:00:00.000Z], maxVersion[2018-05-13T11:05:09.649Z]: ShardSpec class[class io.druid.timeline.partition.NoneShardSpec] used by [analytics_sessions_2018-05-12T00:00:00.000Z_2018-05-13T00:00:00.000Z_2018-05-13T03:29:12.939Z].
2018-05-13T11:05:09,702 ERROR [qtp1154895182-140] io.druid.indexing.common.actions.SegmentAllocateAction - Could not allocate pending segment for rowInterval[2018-05-12T23:00:00.000Z/2018-05-13T00:00:00.000Z], segmentInterval[2018-05-12T00:00:00.000Z/2018-05-13T00:00:00.000Z].
2018-05-13T11:05:10,720 INFO [NodeTypeWatcher[peon]] io.druid.curator.discovery.CuratorDruidNodeDiscoveryProvider$NodeTypeWatcher - Node[peon:DiscoveryDruidNode{druidNode=DruidNode{serviceName='druid/middleManager', host='...', port=-1, plaintextPort=8101, enablePlaintextPort=true, tlsPort=-1, enableTlsPort=false}, nodeType='peon', services={dataNodeService=DataNodeService{tier='_default_tier', maxSize=0, type=indexer-executor, priority=0}, lookupNodeService=LookupNodeService{lookupTier='__default'}}}] disappeared.
2018-05-13T11:05:10,730 INFO [ServerInventoryView-0] io.druid.client.BatchServerInventoryView - Server Disappeared[DruidServerMetadata{name='...', hostAndPort='...', hostAndTlsPort='null', maxSize=0, tier='_default_tier', type=indexer-executor, priority=0}]
2018-05-13T11:05:11,302 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.RemoteTaskRunner - Worker[druid-middlemanager-01.c.....internal:8091] wrote FAILED status for task [index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb] on [TaskLocation{host='...', port=8101, tlsPort=-1}]
2018-05-13T11:05:11,302 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.RemoteTaskRunner - Worker[druid-middlemanager-01.c....nternal:8091] completed task[index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb] with status[FAILED]
2018-05-13T11:05:11,302 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.TaskQueue - Received FAILED status for task: index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb
2018-05-13T11:05:11,302 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.RemoteTaskRunner - Cleaning up task[index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb] on worker[druid-middlemanager-01.c....:8091]
2018-05-13T11:05:11,306 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.TaskLockbox - Removing task[index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb] from activeTasks
2018-05-13T11:05:11,306 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.TaskLockbox - Removing task[index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb] from TaskLock[index_kafka_analytics_sessions]
2018-05-13T11:05:11,306 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.TaskLockbox - TaskLock is now empty: TaskLock{type=EXCLUSIVE, groupId=index_kafka_analytics_sessions, dataSource=analytics_sessions, interval=2018-05-12T00:00:00.000Z/2018-05-13T00:00:00.000Z, version=2018-05-13T11:05:09.649Z, priority=75, revoked=false}
2018-05-13T11:05:11,308 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.MetadataTaskStorage - Deleting TaskLock with id[37876]: TaskLock{type=EXCLUSIVE, groupId=index_kafka_analytics_sessions, dataSource=analytics_sessions, interval=2018-05-12T00:00:00.000Z/2018-05-13T00:00:00.000Z, version=2018-05-13T11:05:09.649Z, priority=75, revoked=false}
2018-05-13T11:05:11,312 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.TaskLockbox - Removing task[index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb] from TaskLock[index_kafka_analytics_sessions]
2018-05-13T11:05:11,314 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.MetadataTaskStorage - Deleting TaskLock with id[37875]: TaskLock{type=EXCLUSIVE, groupId=index_kafka_analytics_sessions, dataSource=analytics_sessions, interval=2018-05-13T00:00:00.000Z/2018-05-14T00:00:00.000Z, version=2018-05-13T09:28:13.796Z, priority=75, revoked=false}
2018-05-13T11:05:11,319 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.MetadataTaskStorage - Updating task index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb to status: TaskStatus{id=index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb, status=FAILED, duration=12803
2018-05-13T11:05:11,323 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.TaskQueue - Task done: KafkaIndexTask{id=index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb, type=index_kafka, dataSource=analytics_sessions}
2018-05-13T11:05:11,324 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.TaskQueue - Task FAILED: KafkaIndexTask{id=index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb, type=index_kafka, dataSource=analytics_sessions} (12803 run duration)
2018-05-13T11:05:11,324 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb] status changed to [FAILED].
2018-05-13T11:05:11,324 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.RemoteTaskRunner - Task[index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb] went bye bye.
[...]
2018-05-13T11:05:14,264 ERROR [KafkaIndexTaskClient-analytics_sessions-3] io.druid.indexing.kafka.supervisor.KafkaSupervisor - Problem while getting checkpoints for task [index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb], killing the task
2018-05-13T11:05:14,269 ERROR [KafkaSupervisor-analytics_sessions] io.druid.indexing.kafka.supervisor.KafkaSupervisor - KafkaSupervisor[analytics_sessions] failed to handle notice: {class=io.druid.indexing.kafka.supervisor.KafkaSupervisor, exceptionType=class java.lang.RuntimeException, exceptionMessage=java.util.concurrent.ExecutionException: io.druid.indexing.kafka.KafkaIndexTaskClient$TaskNotRunnableException: Aborting request because task [index_kafka_analytics_sessions_75dc73c7d7c481e_ofbkggeb] is not runnable, noticeClass=RunNotice}


This log line in particular is important:

2018-05-13T11:05:09,700 WARN [...] io.druid.metadata.IndexerSQLMetadataStorageCoordinator - Cannot allocate new segment for dataSource[analytics_sessions], interval[2018-05-12T00:00:00.000Z/2018-05-13T00:00:00.000Z], maxVersion[2018-05-13T11:05:09.649Z]: ShardSpec class[class io.druid.timeline.partition.NoneShardSpec] used by [analytics_sessions_2018-05-12T00:00:00.000Z_2018-05-13T00:00:00.000Z_2018-05-13T03:29:12.939Z].

It suggests that the problem is the handling of outdated (late) messages: in other words, the ingestion tasks fail while trying to publish a segment for messages whose timestamps fall outside the current ingestion period. Correct me if I'm wrong, but Druid should handle this gracefully by creating a new segment for the late messages - and anyway, it has done so just fine until now!

Temporary solution: to keep the ingestion going and avoid the failures, I've set lateMessageRejectionPeriod to 6 hours so that messages older than 6 hours get ignored (I first set it to 24h, but eventually a younger/outdated message broke everything again, so I ended up with 6 hours). This is not a valid solution for many reasons, including data loss and, of course, the fact that Druid should handle this correctly and create a new segment.
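Concretely, the workaround is just this one setting in the supervisor's ioConfig (you can see it again in the full spec below):

  "lateMessageRejectionPeriod": "PT6H"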

Related problem (?): every night we run a compaction task to compact the hundreds of segments Druid creates into a single shard. About 80% of the time the tasks succeed, while occasionally they fail as follows:

2018-05-10T03:05:24,085 INFO [appenderator_merge_0] io.druid.storage.google.GoogleDataSegmentPusher - Inserting [/tmp/descriptor5241563509765505967.json] to [segments/analytics_sessions/2018-05-09T00:00:00.000Z_2018-05-10T00:00:00.000Z/2018-05-10T03:00:08.229Z/0/descriptor.json]
2018-05-10T03:05:24,256 INFO [appenderator_merge_0] io.druid.storage.google.GoogleDataSegmentPusher - Deleting file [/tmp/index8845782745579078455.zip]
2018-05-10T03:05:24,285 INFO [appenderator_merge_0] io.druid.storage.google.GoogleDataSegmentPusher - Deleting file [/tmp/descriptor5241563509765505967.json]
2018-05-10T03:05:24,304 INFO [appenderator_merge_0] io.druid.segment.realtime.appenderator.AppenderatorImpl - Pushed merged index for segment[analytics_sessions_2018-05-09T00:00:00.000Z_2018-05-10T00:00:00.000Z_2018-05-10T03:00:08.229Z], descriptor is: DataSegment{size=207310228, shardSpec=NoneShardSpec, metrics=[count, pageViewCount, totalTimeSpent, totalVisibleMillis], dimensions=[sessionId, pstuid, psuid, campaignId, entryUrl, entryQs, referrerUrl, deviceName, deviceType, utmSource, utmCampaign, utmMedium, utmContent, utmTerm, advertiserId, adgroupId, adcopyId, publisherId, templateId, positionId, websiteId, clickId], version='2018-05-10T03:00:08.229Z', loadSpec={type=>google, bucket=>pws-druid-prod, path=>segments/analytics_sessions/2018-05-09T00:00:00.000Z_2018-05-10T00:00:00.000Z/2018-05-10T03:00:08.229Z/0/index.zip}, interval=2018-05-09T00:00:00.000Z/2018-05-10T00:00:00.000Z, dataSource='analytics_sessions', binaryVersion='9'}
2018-05-10T03:05:24,339 INFO [publish-0] io.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Dropping segments[[DataSegment{size=207310228, shardSpec=NoneShardSpec, metrics=[count, pageViewCount, totalTimeSpent, totalVisibleMillis], dimensions=[sessionId, pstuid, psuid, campaignId, entryUrl, entryQs, referrerUrl, deviceName, deviceType, utmSource, utmCampaign, utmMedium, utmContent, utmTerm, advertiserId, adgroupId, adcopyId, publisherId, templateId, positionId, websiteId, clickId], version='2018-05-10T03:00:08.229Z', loadSpec={type=>google, bucket=>pws-druid-prod, path=>segments/analytics_sessions/2018-05-09T00:00:00.000Z_2018-05-10T00:00:00.000Z/2018-05-10T03:00:08.229Z/0/index.zip}, interval=2018-05-09T00:00:00.000Z/2018-05-10T00:00:00.000Z, dataSource='analytics_sessions', binaryVersion='9'}]]
2018-05-10T03:05:24,355 INFO [appenderator_persist_0] io.druid.segment.realtime.appenderator.AppenderatorImpl - Removing commit metadata for segment[analytics_sessions_2018-05-09T00:00:00.000Z_2018-05-10T00:00:00.000Z_2018-05-10T03:00:08.229Z].
2018-05-10T03:05:24,356 INFO [appenderator_persist_0] io.druid.segment.realtime.appenderator.AppenderatorImpl - Removing sink for segment[analytics_sessions_2018-05-09T00:00:00.000Z_2018-05-10T00:00:00.000Z_2018-05-10T03:00:08.229Z].
2018-05-10T03:05:24,377 INFO [appenderator_persist_0] io.druid.segment.realtime.appenderator.AppenderatorImpl - Deleting Index File[var/druid/task/druid_daily_compaction_analytics_sessions_2018-05-09/work/persist/analytics_sessions_2018-05-09T00:00:00.000Z_2018-05-10T00:00:00.000Z_2018-05-10T03:00:08.229Z]
2018-05-10T03:05:24,578 INFO [task-runner-0-priority-0] io.druid.indexing.common.task.IndexTask - Pushed segments[[DataSegment{size=207310228, shardSpec=NoneShardSpec, metrics=[count, pageViewCount, totalTimeSpent, totalVisibleMillis], dimensions=[sessionId, pstuid, psuid, campaignId, entryUrl, entryQs, referrerUrl, deviceName, deviceType, utmSource, utmCampaign, utmMedium, utmContent, utmTerm, advertiserId, adgroupId, adcopyId, publisherId, templateId, positionId, websiteId, clickId], version='2018-05-10T03:00:08.229Z', loadSpec={type=>google, bucket=>pws-druid-prod, path=>segments/analytics_sessions/2018-05-09T00:00:00.000Z_2018-05-10T00:00:00.000Z/2018-05-10T03:00:08.229Z/0/index.zip}, interval=2018-05-09T00:00:00.000Z/2018-05-10T00:00:00.000Z, dataSource='analytics_sessions', binaryVersion='9'}]]
2018-05-10T03:05:24,581 INFO [publish-0] io.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Publishing segments with commitMetadata[null]: [DataSegment{size=207310228, shardSpec=NoneShardSpec, metrics=[count, pageViewCount, totalTimeSpent, totalVisibleMillis], dimensions=[sessionId, pstuid, psuid, campaignId, entryUrl, entryQs, referrerUrl, deviceName, deviceType, utmSource, utmCampaign, utmMedium, utmContent, utmTerm, advertiserId, adgroupId, adcopyId, publisherId, templateId, positionId, websiteId, clickId], version='2018-05-10T03:00:08.229Z', loadSpec={type=>google, bucket=>pws-druid-prod, path=>segments/analytics_sessions/2018-05-09T00:00:00.000Z_2018-05-10T00:00:00.000Z/2018-05-10T03:00:08.229Z/0/index.zip}, interval=2018-05-09T00:00:00.000Z/2018-05-10T00:00:00.000Z, dataSource='analytics_sessions', binaryVersion='9'}]
2018-05-10T03:05:24,583 INFO [publish-0] io.druid.indexing.common.actions.RemoteTaskActionClient - Performing action for task[druid_daily_compaction_analytics_sessions_2018-05-09]: SegmentInsertAction{segments=[DataSegment{size=207310228, shardSpec=NoneShardSpec, metrics=[count, pageViewCount, totalTimeSpent, totalVisibleMillis], dimensions=[sessionId, pstuid, psuid, campaignId, entryUrl, entryQs, referrerUrl, deviceName, deviceType, utmSource, utmCampaign, utmMedium, utmContent, utmTerm, advertiserId, adgroupId, adcopyId, publisherId, templateId, positionId, websiteId, clickId], version='2018-05-10T03:00:08.229Z', loadSpec={type=>google, bucket=>pws-druid-prod, path=>segments/analytics_sessions/2018-05-09T00:00:00.000Z_2018-05-10T00:00:00.000Z/2018-05-10T03:00:08.229Z/0/index.zip}, interval=2018-05-09T00:00:00.000Z/2018-05-10T00:00:00.000Z, dataSource='analytics_sessions', binaryVersion='9'}], startMetadata=null, endMetadata=null}
2018-05-10T03:05:24,589 INFO [publish-0] io.druid.indexing.common.actions.RemoteTaskActionClient - Submitting action for task[druid_daily_compaction_analytics_sessions_2018-05-09] to overlord: [SegmentInsertAction{segments=[DataSegment{size=207310228, shardSpec=NoneShardSpec, metrics=[count, pageViewCount, totalTimeSpent, totalVisibleMillis], dimensions=[sessionId, pstuid, psuid, campaignId, entryUrl, entryQs, referrerUrl, deviceName, deviceType, utmSource, utmCampaign, utmMedium, utmContent, utmTerm, advertiserId, adgroupId, adcopyId, publisherId, templateId, positionId, websiteId, clickId], version='2018-05-10T03:00:08.229Z', loadSpec={type=>google, bucket=>pws-druid-prod, path=>segments/analytics_sessions/2018-05-09T00:00:00.000Z_2018-05-10T00:00:00.000Z/2018-05-10T03:00:08.229Z/0/index.zip}, interval=2018-05-09T00:00:00.000Z/2018-05-10T00:00:00.000Z, dataSource='analytics_sessions', binaryVersion='9'}], startMetadata=null, endMetadata=null}].
2018-05-10T03:05:24,604 INFO [publish-0] io.druid.java.util.http.client.pool.ChannelResourceFactory - Generating: [...]
2018-05-10T03:05:24,635 INFO [publish-0] io.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Transaction failure while publishing segments, checking if someone else beat us to it.
2018-05-10T03:05:24,642 INFO [publish-0] io.druid.indexing.common.actions.RemoteTaskActionClient - Performing action for task[druid_daily_compaction_analytics_sessions_2018-05-09]: SegmentListUsedAction{dataSource='analytics_sessions', intervals=[2018-05-09T00:00:00.000Z/2018-05-10T00:00:00.000Z]}
2018-05-10T03:05:24,647 INFO [publish-0] io.druid.indexing.common.actions.RemoteTaskActionClient - Submitting action for task[druid_daily_compaction_analytics_sessions_2018-05-09] to overlord: [SegmentListUsedAction{dataSource='analytics_sessions', intervals=[2018-05-09T00:00:00.000Z/2018-05-10T00:00:00.000Z]}].
2018-05-10T03:05:24,649 INFO [publish-0] io.druid.java.util.http.client.pool.ChannelResourceFactory - Generating: [...]
2018-05-10T03:05:24,798 INFO [task-runner-0-priority-0] io.druid.segment.realtime.appenderator.AppenderatorImpl - Shutting down...
2018-05-10T03:05:24,801 ERROR [task-runner-0-priority-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[CompactionTask{id=druid_daily_compaction_analytics_sessions_2018-05-09, type=compact, dataSource=analytics_sessions}]
java.lang.RuntimeException: java.util.concurrent.ExecutionException: io.druid.java.util.common.ISE: Failed to publish segments[[DataSegment{size=207310228, shardSpec=NoneShardSpec, metrics=[count, pageViewCount, totalTimeSpent, totalVisibleMillis], dimensions=[sessionId, pstuid, psuid, campaignId, entryUrl, entryQs, referrerUrl, deviceName, deviceType, utmSource, utmCampaign, utmMedium, utmContent, utmTerm, advertiserId, adgroupId, adcopyId, publisherId, templateId, positionId, websiteId, clickId], version='2018-05-10T03:00:08.229Z', loadSpec={type=>google, bucket=>pws-druid-prod, path=>segments/analytics_sessions/2018-05-09T00:00:00.000Z_2018-05-10T00:00:00.000Z/2018-05-10T03:00:08.229Z/0/index.zip}, interval=2018-05-09T00:00:00.000Z/2018-05-10T00:00:00.000Z, dataSource='analytics_sessions', binaryVersion='9'}]]
    at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]
    at io.druid.indexing.common.task.IndexTask.generateAndPublishSegments(IndexTask.java:739) ~[druid-indexing-service-0.12.0.jar:0.12.0]
    at io.druid.indexing.common.task.IndexTask.run(IndexTask.java:264) ~[druid-indexing-service-0.12.0.jar:0.12.0]
    at io.druid.indexing.common.task.CompactionTask.run(CompactionTask.java:209) ~[druid-indexing-service-0.12.0.jar:0.12.0]
    at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:444) [druid-indexing-service-0.12.0.jar:0.12.0]
    at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:416) [druid-indexing-service-0.12.0.jar:0.12.0]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_152]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_152]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_152]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_152]
Caused by: java.util.concurrent.ExecutionException: io.druid.java.util.common.ISE: Failed to publish segments[[DataSegment{size=207310228, shardSpec=NoneShardSpec, metrics=[count, pageViewCount, totalTimeSpent, totalVisibleMillis], dimensions=[sessionId, pstuid, psuid, campaignId, entryUrl, entryQs, referrerUrl, deviceName, deviceType, utmSource, utmCampaign, utmMedium, utmContent, utmTerm, advertiserId, adgroupId, adcopyId, publisherId, templateId, positionId, websiteId, clickId], version='2018-05-10T03:00:08.229Z', loadSpec={type=>google, bucket=>pws-druid-prod, path=>segments/analytics_sessions/2018-05-09T00:00:00.000Z_2018-05-10T00:00:00.000Z/2018-05-10T03:00:08.229Z/0/index.zip}, interval=2018-05-09T00:00:00.000Z/2018-05-10T00:00:00.000Z, dataSource='analytics_sessions', binaryVersion='9'}]]
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_152]
    at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_152]
    at io.druid.indexing.common.task.IndexTask.awaitPublish(IndexTask.java:763) ~[druid-indexing-service-0.12.0.jar:0.12.0]
    at io.druid.indexing.common.task.IndexTask.generateAndPublishSegments(IndexTask.java:718) ~[druid-indexing-service-0.12.0.jar:0.12.0]
    ... 8 more
Caused by: io.druid.java.util.common.ISE: Failed to publish segments[[DataSegment{size=207310228, shardSpec=NoneShardSpec, metrics=[count, pageViewCount, totalTimeSpent, totalVisibleMillis], dimensions=[sessionId, pstuid, psuid, campaignId, entryUrl, entryQs, referrerUrl, deviceName, deviceType, utmSource, utmCampaign, utmMedium, utmContent, utmTerm, advertiserId, adgroupId, adcopyId, publisherId, templateId, positionId, websiteId, clickId], version='2018-05-10T03:00:08.229Z', loadSpec={type=>google, bucket=>pws-druid-prod, path=>segments/analytics_sessions/2018-05-09T00:00:00.000Z_2018-05-10T00:00:00.000Z/2018-05-10T03:00:08.229Z/0/index.zip}, interval=2018-05-09T00:00:00.000Z/2018-05-10T00:00:00.000Z, dataSource='analytics_sessions', binaryVersion='9'}]]
    at io.druid.segment.realtime.appenderator.BaseAppenderatorDriver.lambda$publishInBackground$5(BaseAppenderatorDriver.java:441) ~[druid-server-0.12.0.jar:0.12.0]
    ... 4 more
2018-05-10T03:05:24,824 INFO [task-runner-0-priority-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [druid_daily_compaction_analytics_sessions_2018-05-09] status changed to [FAILED].
2018-05-10T03:05:24,830 INFO [task-runner-0-priority-0] io.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
  "id" : "druid_daily_compaction_analytics_sessions_2018-05-09",
  "status" : "FAILED",
  "duration" : 292232
}

...and coordinator/overlord:

2018-05-10T03:05:24,652 INFO [qtp1154895182-117] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[druid_daily_compaction_analytics_sessions_2018-05-09]: SegmentListUsedAction{dataSource='analytics_sessions', intervals=[2018-05-09T00:00:00.000Z/2018-05-10T00:00:00.000Z]}
2018-05-10T03:05:26,262 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.RemoteTaskRunner - Worker[druid-middlemanager-01.c.pws-....internal:8091] wrote FAILED status for task [druid_daily_compaction_analytics_sessions_2018-05-09] on [TaskLocation{host='...', port=8107, tlsPort=-1}]
2018-05-10T03:05:26,262 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.RemoteTaskRunner - Worker[druid-middlemanager-01.c.....internal:8091] completed task[druid_daily_compaction_analytics_sessions_2018-05-09] with status[FAILED]
2018-05-10T03:05:26,262 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.TaskQueue - Received FAILED status for task: druid_daily_compaction_analytics_sessions_2018-05-09
2018-05-10T03:05:26,262 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.RemoteTaskRunner - Cleaning up task[druid_daily_compaction_analytics_sessions_2018-05-09] on worker[druid-middlemanager-01.c.....internal:8091]
2018-05-10T03:05:26,264 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.TaskLockbox - Removing task[druid_daily_compaction_analytics_sessions_2018-05-09] from activeTasks
2018-05-10T03:05:26,264 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.TaskLockbox - Removing task[druid_daily_compaction_analytics_sessions_2018-05-09] from TaskLock[druid_daily_compaction_analytics_sessions_2018-05-09]
2018-05-10T03:05:26,264 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.TaskLockbox - TaskLock is now empty: TaskLock{type=EXCLUSIVE, groupId=druid_daily_compaction_analytics_sessions_2018-05-09, dataSource=analytics_sessions, interval=2018-05-09T00:00:00.000Z/2018-05-10T00:00:00.000Z, version=2018-05-10T03:00:08.229Z, priority=25, revoked=true}
2018-05-10T03:05:26,267 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.MetadataTaskStorage - Deleting TaskLock with id[20461]: TaskLock{type=EXCLUSIVE, groupId=druid_daily_compaction_analytics_sessions_2018-05-09, dataSource=analytics_sessions, interval=2018-05-09T00:00:00.000Z/2018-05-10T00:00:00.000Z, version=2018-05-10T03:00:08.229Z, priority=25, revoked=true}
2018-05-10T03:05:26,272 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.MetadataTaskStorage - Updating task druid_daily_compaction_analytics_sessions_2018-05-09 to status: TaskStatus{id=druid_daily_compaction_analytics_sessions_2018-05-09, status=FAILED, duration=318013}
2018-05-10T03:05:26,276 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.TaskQueue - Task done: CompactionTask{id=druid_daily_compaction_analytics_sessions_2018-05-09, type=compact, dataSource=analytics_sessions}
2018-05-10T03:05:26,277 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.TaskQueue - Task FAILED: CompactionTask{id=druid_daily_compaction_analytics_sessions_2018-05-09, type=compact, dataSource=analytics_sessions} (318013 run duration)
2018-05-10T03:05:26,277 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.TaskRunnerUtils - Task [druid_daily_compaction_analytics_sessions_2018-05-09] status changed to [FAILED].
2018-05-10T03:05:26,278 INFO [Curator-PathChildrenCache-1] io.druid.indexing.overlord.RemoteTaskRunner - Task[druid_daily_compaction_analytics_sessions_2018-05-09] went bye bye.


The new segment gets created but Druid doesn't consider it. We also have a clean-up (kill) task that runs an hour later and deletes unused segments (in particular the ones left behind by the compaction), but it has no effect on the intervals for which we re-tried the compaction (meaning that neither the original segments nor the brand-new segment get deleted).
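For reference, the clean-up job submits a standard kill task; roughly this shape (illustrative sketch only, the interval is an example, not our exact spec):

{
  "type": "kill",
  "dataSource": "analytics_sessions",
  "interval": "2018-05-09/2018-05-10"
}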

Ingestion spec:

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "analytics_sessions",
    "parser": {
      "type": "avro_stream",
      "avroBytesDecoder": {
        "type": "schema_registry",
        "url": "[...]"
      },
      "parseSpec": {
        "format": "avro",
        "timestampSpec": {
          "column": "ts",
          "format": "auto"
        },
        "flattenSpec": {
          "fields": [
            {
              "type": "path",
              "name": "deviceName",
              "expr": "$.device..deviceName"
            },
            {
              "type": "path",
              "name": "deviceType",
              "expr": "$.device..deviceType"
            }
          ]
        },
        "dimensionsSpec": {
          "dimensions": [...],
          "dimensionExclusions": []
        }
      }
    },
    "metricsSpec": [
      { "type": "count", "name": "count" },
      { "type": "doubleSum", "name": "totalTimeSpent", "fieldName": "totalTimeSpent" },
      { "type": "doubleSum", "name": "pageViewCount", "fieldName": "pageViewCount" },
      { "type": "doubleSum", "name": "totalVisibleMillis", "fieldName": "totalVisibleMillis" }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "HOUR"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 75000,
    "maxRowsPerSegment": 5000000,
    "intermediatePersistPeriod": "PT10M",
    "resetOffsetAutomatically": true
  },
  "ioConfig": {
    "topic": "sessions",
    "lateMessageRejectionPeriod": "PT6H",
    "replicas": 2,
    "taskCount": 3,
    "taskDuration": "PT30M",
    "consumerProperties": {
      "bootstrap.servers": [...],
      "group.id": [...],
      "auto.offset.reset": "latest"
    }
  }
}

Please do not hesitate to ask if you need more info.
Thank you for your help guys.

Emanuele,

rod...@gmail.com

May 14, 2018, 12:34:09 AM
to Druid User
You can try setting the "forceExtendableShardSpecs" flag to "true" in your compaction task - see http://druid.io/docs/latest/ingestion/tasks.html
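For example, something along these lines (just a sketch; the dataSource and interval are taken from your logs and should be adjusted to your own task):

{
  "type": "compact",
  "dataSource": "analytics_sessions",
  "interval": "2018-05-12/2018-05-13",
  "tuningConfig": {
    "type": "index",
    "forceExtendableShardSpecs": true
  }
}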

Good luck,
D.

epi...@powerspace.com

May 14, 2018, 3:46:06 AM
to Druid User
Hi D.

Thanks for your reply.

I thought about forcing the use of a proper shard spec and I'm going to take that action right away. However, would you mind explaining why this is actually necessary, and possibly why the error didn't occur before, or for the other datasource?

Thank you again!

E.

epi...@powerspace.com

May 14, 2018, 6:11:00 AM
to Druid User
Alright, this is actually my compaction spec:

{
  "id": "druid_daily_compaction_2018-05-05",
  "type": "compact",
  "dataSource": "analytics_sessions",
  "interval": "2018-05-05/2018-05-06",
  "tuningConfig": {
    "type": "index",
    "numShards": 1,
    "forceGuaranteedRollup": true
  }
}

As far as I understand, having the number of shards equal to one together with perfect rollup via forceGuaranteedRollup (we used it to optimise segment size) makes Druid write segments with NoneShardSpec, which forbids adding additional shards for the same granularity period. Indeed, we recently switched from numShards = 3 to numShards = 1.
I can't set forceExtendableShardSpecs because it can't be used together with forceGuaranteedRollup, but I can revert the number of shards back to 3 so that NoneShardSpec is no longer used.
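In practice the plan is a tuningConfig roughly like this (untested sketch):

"tuningConfig": {
  "type": "index",
  "numShards": 3,
  "forceGuaranteedRollup": true
}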

Can you please confirm that all this makes sense? I'll keep you guys posted so it could be useful for others.

Thank you,

E.

rod...@gmail.com

May 14, 2018, 6:44:08 AM
to Druid User
We are using a Hadoop batch job for the nightly reingest:

   "tuningConfig": {
      "type": "hadoop",
      "partitionsSpec" : {
        "type" : "hashed",
        "targetPartitionSize" : [variable]
    },

I think your explanation makes sense, but I'm not sure about the benefits of using "numShards" over "targetPartitionSize" - I suppose you don't have a lot of / fluctuating data.
Also, I have never used "forceGuaranteedRollup" - is it worth it? Can you compare the rollup ratio with/without this flag?

Good luck!
D.

rod...@gmail.com

May 31, 2018, 7:46:21 AM
to Druid User
Hi again - we also got this exception and I want to share our investigation results.

First, see https://groups.google.com/d/msg/druid-user/3XWDlsy4IGI/vC_tKrBwAQAJ for some nice explanations - I understand that you're using replica tasks, right?

We don't, but we encountered the exception during a middle manager service restart:
task 1:
-> starts from partition "2" : 4344349977
-> prepares for handoff: "Pausing ingestion until resumed" + endOffsets changed to {2=4345544722}
SHUT DOWN
-> Shutting down...
-> !!! starts consuming from Kafka again at "2" : 4344349977 !!!
-> at least it advances to the right offset: Seeking partition[2] to offset[4,345,544,722].
So the main problem here is the restart of the Kafka ingestion while we were in the middle of segment handoff and finishing the task - meanwhile, a new peon was started:
task 2:
-> starts from partition "2" : 4345544722, and consumes the same records as the first task!


Both tasks finished in an inconsistent way ("The task was asked to stop before completing") without changing the ending offset (no "endOffsets changed to ..." message), which makes the following tasks start from the initial offset and fail with the described error:

io.druid.java.util.common.ISE: Could not allocate segment for row with timestamp[2018-05-30T13:00:00.088Z]
io.druid.metadata.IndexerSQLMetadataStorageCoordinator - Cannot use existing pending segment [...] for sequence[...] (previous = [...]) in DB, does not match requested interval[..]
We needed to restart/reset the supervisor in order to unlock the situation.
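In case it helps others, the reset goes through the overlord API, roughly (adjust host/port and supervisor id to your setup; the supervisor id is usually the datasource name):

POST http://<overlord-host>:8090/druid/indexer/v1/supervisor/<supervisorId>/reset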
We are using Druid 0.11.0-iap7 and I saw quite a few changes in 0.12 related to those classes - it would be nice if someone could confirm that this particular case is also covered.

DruidUser

Feb 11, 2019, 4:50:40 AM
to Druid User
We have exactly the same problem. Can you please share how you solved it, or how you got the Kafka ingestion stream back up and running again?


Hong Wang

Mar 1, 2019, 9:56:30 AM
to Druid User
Hi,

I also encountered a very similar problem: KIS (Kafka indexing service) is not able to allocate a specific segment interval. What can be done to resume ingestion? Please help. Here is the error I am getting:
{"0":89564638,"1":89979288,"2":87967619,"3":90826655,"4":89172114,"5":87968200,"6":45478,"7":107346809,"8":89164711,"9":88782660,"10":87999379,"11":89204083,"12":88762386,"13":90385867,"14":89179619,"15":87970104}}, IS_INCREMENTAL_HANDOFF_SUPPORTED=true}}]
io.druid.java.util.common.ISE: Could not allocate segment for row with timestamp[2019-02-28T22:10:32.388Z]
at io.druid.indexing.kafka.KafkaIndexTask.run(KafkaIndexTask.java:642) ~[?:?]
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:444) [druid-indexing-service-0.12.3.jar:0.12.3]
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:416) [druid-indexing-service-0.12.3.jar:0.12.3]

Thanks
Hong 

Hong Wang

Mar 1, 2019, 3:07:52 PM
to Druid User
I had to set the lateMessageRejectionPeriod flag to skip the problematic messages and get KIS processing again.

Thanks

Laxmikant Pandhare

Jan 31, 2024, 10:41:36 PM
to Druid User
Setting lateMessageRejectionPeriod really worked, thanks for the suggestion.

My segments got into a deadlock situation because my batch load and live load had different segment granularities.

John Kowtko

Feb 1, 2024, 7:07:43 AM
to Druid User
Hi Laxmikant,

The different segment granularity isn't as important here as the fact that the time interval was locked. 

There is a table in the docs that explains the lock priority: https://druid.apache.org/docs/latest/ingestion/tasks#lock-priority  Streaming ingestion takes priority over batch ingestion, so if a batch job is running and streaming ingestion needs to insert a record into that time interval, the lock for the batch job will be revoked and the batch job will abort.
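For completeness, the default lock priority can also be overridden per task through the task context - a rough sketch (the dataSource, interval, and value are just placeholders, and raising a batch job above streaming ingestion is rarely what you want):

{
  "type": "compact",
  "dataSource": "analytics_sessions",
  "interval": "2024-01-30/2024-01-31",
  "context": { "priority": 80 }
}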

Differing segment granularity is actually allowed, because the segmentGranularity in the Supervisor spec is actually a "target" granularity ... the streaming ingestion will adopt the granularity of any already-existing segments in that interval, and will move towards its target granularity when it can (i.e. when it opens up a newer, later interval that has no segments in it.)

Let us know if you have any questions on this.

Thanks.  John

Laxmikant Pandhare

Jun 26, 2024, 12:14:05 PM
to Druid User
Thank you for the detailed explanation. If I use secondary partitioning, and that field differentiates between batch and live streaming data, will the lock be avoided?

With secondary partitioning, different segments will be created for the batch and live loads, as far as I know. Please correct me if I am wrong.

John Kowtko

Jun 27, 2024, 12:49:29 PM
to Druid User
Secondary partitioning has nothing to do with the __time field, and the locks are only related to the primary (time) partitioning based on the __time field. So I do not think secondary partitioning will matter either way in this case.