KafkaIndexTask Triggers Infinite Loop on Late Messages


drew dahlke

Jun 29, 2016, 10:52:27 AM
to Druid User
Hi, I've been playing with KafkaIndexTask. It seems to work quite well until I back-populate data with the Hadoop job. After that runs, I get into an endless loop of failed tasks in the MM log. I suspect it's unhappy that a late Kafka message came in for 2016-06-24 and that segment was replaced by the Hadoop job.


MM Log:

2016-06-29T12:03:55.924+0000 i.d.s.r.a.FiniteAppenderatorDriver [WARN] Cannot allocate segment for timestamp[2016-06-24T14:37:41.563Z], sequenceName[index_kafka_imetrics_232cab31c25f4fd_0].  {}
2016-06-29T12:03:55.925+0000 i.d.s.r.a.FiniteAppenderatorDriver [INFO] Persisting data. {}
2016-06-29T12:03:55.930+0000 i.d.s.r.a.AppenderatorImpl [INFO] Submitting persist runnable for dataSource[imetrics] {}
2016-06-29T12:03:55.935+0000 i.d.s.r.a.AppenderatorImpl [INFO] Committing metadata[FiniteAppenderatorDriverMetadata{activeSegments={}, lastSegmentIds={}, callerMetadata={nextPartitions=KafkaPartitions{topic='RawInteractionMetrics', partitionOffsetMap={0=3286034, 2=22987917}}}}] for sinks[]. {}
2016-06-29T12:03:55.943+0000 i.d.s.r.a.FiniteAppenderatorDriver [INFO] Persisted pending data in 17ms. {}
2016-06-29T12:03:55.947+0000 i.d.s.r.a.AppenderatorImpl [INFO] Shutting down... {}
2016-06-29T12:03:55.950+0000 i.d.i.o.ThreadPoolTaskRunner [ERROR] Exception while running task[KafkaIndexTask{id=index_kafka_imetrics_232cab31c25f4fd_klbnmdgh, type=index_kafka, dataSource=imetrics}] {}
com.metamx.common.ISE: Could not allocate segment for row with timestamp[2016-06-24T14:37:41.563Z]
    at io.druid.indexing.kafka.KafkaIndexTask.run(KafkaIndexTask.java:427) ~[?:?]
    at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:436) [druid-indexing-service-0.9.1-rc5-SNAPSHOT.jar:0.9.1-rc5-SNAPSHOT]
    at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:408) [druid-indexing-service-0.9.1-rc5-SNAPSHOT.jar:0.9.1-rc5-SNAPSHOT]
    at java.util.concurrent.FutureTask.run(FutureTask.java:262) [?:1.7.0_75]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [?:1.7.0_75]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [?:1.7.0_75]
    at java.lang.Thread.run(Thread.java:745) [?:1.7.0_75]
2016-06-29T12:03:55.955+0000 i.d.i.o.TaskRunnerUtils [INFO] Task [index_kafka_imetrics_232cab31c25f4fd_klbnmdgh] status changed to [FAILED]. {}
2016-06-29T12:03:55.958+0000 i.d.i.w.e.ExecutorLifecycle [INFO] Task completed with status: {
  "id" : "index_kafka_imetrics_232cab31c25f4fd_klbnmdgh",
  "status" : "FAILED",
  "duration" : 1629
} {}

When I check the task log, there's nothing more than GC & jersey/jackson info in there :( The task dies almost immediately. Note that there is a segment generated by Hadoop:

mysql> select * from druid_segments where start = "2016-06-24T00:00:00.000Z"\G;

          id: imetrics_2016-06-24T00:00:00.000Z_2016-06-25T00:00:00.000Z_2016-06-29T01:29:18.462Z
  dataSource: imetrics
created_date: 2016-06-29T02:29:27.917Z   <---- When the hadoop job ran
       start: 2016-06-24T00:00:00.000Z
         end: 2016-06-25T00:00:00.000Z



Overlord logs

2016-06-29T12:03:55.873+0000 i.d.i.o.MetadataTaskStorage [INFO] Adding lock on interval[2016-06-24T00:00:00.000Z/2016-06-25T00:00:00.000Z] version[2016-06-29T12:03:55.783Z] for task: index_kafka_imetrics_232cab31c25f4fd_klbnmdgh {}
2016-06-29T12:03:56.477+0000 i.d.i.o.RemoteTaskRunner [INFO] Worker[172.18.11.194:8291] wrote FAILED status for task [index_kafka_imetrics_232cab31c25f4fd_klbnmdgh] on [TaskLocation{host='172.18.11.194', port=8101}] {}
2016-06-29T12:03:56.477+0000 i.d.i.o.RemoteTaskRunner [INFO] Worker[172.18.11.194:8291] completed task[index_kafka_imetrics_232cab31c25f4fd_klbnmdgh] with status[FAILED] {}
2016-06-29T12:03:56.477+0000 i.d.i.o.TaskQueue [INFO] Received FAILED status for task: index_kafka_imetrics_232cab31c25f4fd_klbnmdgh {}
2016-06-29T12:03:56.477+0000 i.d.i.o.RemoteTaskRunner [INFO] Cleaning up task[index_kafka_imetrics_232cab31c25f4fd_klbnmdgh] on worker[172.18.11.194:8291] {}
2016-06-29T12:03:56.480+0000 i.d.i.o.TaskLockbox [INFO] Removing task[index_kafka_imetrics_232cab31c25f4fd_klbnmdgh] from activeTasks {}
2016-06-29T12:03:56.480+0000 i.d.i.o.TaskLockbox [INFO] Removing task[index_kafka_imetrics_232cab31c25f4fd_klbnmdgh] from TaskLock[index_kafka_imetrics] {}
2016-06-29T12:03:56.491+0000 i.d.i.o.MetadataTaskStorage [INFO] Updating task index_kafka_imetrics_232cab31c25f4fd_klbnmdgh to status: TaskStatus{id=index_kafka_imetrics_232cab31c25f4fd_klbnmdgh, status=FAILED, duration=7884} {}
2016-06-29T12:03:56.497+0000 i.d.i.o.TaskQueue [INFO] Task done: KafkaIndexTask{id=index_kafka_imetrics_232cab31c25f4fd_klbnmdgh, type=index_kafka, dataSource=imetrics} {}
2016-06-29T12:03:56.497+0000 i.d.i.o.TaskQueue [INFO] Task FAILED: KafkaIndexTask{id=index_kafka_imetrics_232cab31c25f4fd_klbnmdgh, type=index_kafka, dataSource=imetrics} (7884 run duration) {}
2016-06-29T12:03:56.497+0000 i.d.i.o.TaskRunnerUtils [INFO] Task [index_kafka_imetrics_232cab31c25f4fd_klbnmdgh] status changed to [FAILED]. {}
2016-06-29T12:03:56.497+0000 i.d.i.o.RemoteTaskRunner [INFO] Task[index_kafka_imetrics_232cab31c25f4fd_klbnmdgh] went bye bye. {}
2016-06-29T12:04:02.765+0000 i.d.i.k.s.KafkaSupervisor [WARN] Failed to get status for task [index_kafka_imetrics_232cab31c25f4fd_klbnmdgh] {}
io.druid.indexing.kafka.KafkaIndexTaskClient$TaskNotRunnableException: Aborting request because task [index_kafka_imetrics_232cab31c25f4fd_klbnmdgh] is not runnable


Gian Merlino

Jun 29, 2016, 11:05:09 AM
to druid...@googlegroups.com
Hey Drew,

Could you attach the supervisor & ingestion specs you're using for the Kafka and Hadoop jobs?

Gian


drew dahlke

Jun 29, 2016, 11:20:07 AM
to Druid User
Sure, here they are
hadoop_specfile.txt
imetrics_spec.txt

Gian Merlino

Jun 29, 2016, 11:39:39 AM
to druid...@googlegroups.com
Hey Drew,

For interoperating between Hadoop and Kafka indexing, the segmentGranularity needs to match. In your files one is HOUR and one is DAY. Could you try adjusting them so they are the same? (Probably changing Hadoop to DAY is easiest.)
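
For example, a minimal sketch of the change on the Hadoop side (the interval and queryGranularity shown here are placeholders, not values taken from your spec):

  "granularitySpec": {
    "type": "uniform",
    "segmentGranularity": "DAY",
    "queryGranularity": "NONE",
    "intervals": ["2016-06-24/2016-06-25"]
  }

The Kafka supervisor spec's granularitySpec would keep the same segmentGranularity (DAY), just without the "intervals" field, since the supervisor allocates intervals from the message timestamps.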

Gian

Max Lord

Jun 29, 2016, 12:18:36 PM
to Druid User
Could you guys clarify what the normal, expected behavior for late arrivals is in this kind of configuration? We're still on tranquility-based indexing.

Are the late events dropped, written into different segments, or somehow appended to the existing segments?

Regards,
Max

Gian Merlino

Jun 29, 2016, 12:49:02 PM
to druid...@googlegroups.com
Late events are written into new segments that become part of the existing segment set. The new Kafka indexing service never drops messages by default.

For running both Hadoop indexing and new Kafka indexing on the same dataSource, one possibility is to *not* run Hadoop indexing at all! The new Kafka indexing service is designed to offer exactly once ingestion, and reprocessing is possible through a "kappa architecture" sort of setup. For many use cases this can make Hadoop indexing unnecessary.

If you do want to run Hadoop indexing too, there are some caveats in the current version of things.

1) Coexistence only works if the segment sets generated by Hadoop are of a compatible segmentGranularity and use "numbered" or "linear" shardSpecs. Hadoop indexing is not currently guaranteed to always create those kinds of shardSpecs – it only does so for "hashed" partitionsSpecs, and only if you have more than one shard per time window (see the tuningConfig sketch after this list).

2) Druid's interval-based write locking prevents a Kafka indexing task from loading data into an interval that is currently being indexed by a Hadoop task. If some older data comes in over Kafka, this can cause Kafka indexing to block while it waits for the Hadoop task covering that historical interval to finish; it will then append the message into the new segment set created by that Hadoop task. But the blocking affects even new real-time ingestion.
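
To illustrate caveat 1, here is a sketch of a Hadoop tuningConfig using a "hashed" partitionsSpec with more than one shard per time window (the numShards value of 2 is only an assumed example; tune it to your data volume):

  "tuningConfig": {
    "type": "hadoop",
    "partitionsSpec": {
      "type": "hashed",
      "numShards": 2
    }
  }

Per the caveat above, with only one shard per interval the current Hadoop indexer may write a shardSpec that the Kafka tasks cannot append to.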

For the time being, if you are running both Hadoop and the new Kafka indexing on the same dataSource, you can work around both of those by running something more like a "classic" Druid hybrid batch/realtime pipeline, using the "lateMessageRejectionPeriod" config (this behaves somewhat like windowPeriod and can prevent the Kafka tasks from trying to write to segment sets generated by Hadoop).
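
As a sketch, in the supervisor spec this goes in the ioConfig (the PT1H value is just an example, and bootstrap.servers is elided):

  "ioConfig": {
    "topic": "RawInteractionMetrics",
    "consumerProperties": { "bootstrap.servers": "..." },
    "lateMessageRejectionPeriod": "PT1H"
  }

Messages with timestamps earlier than the task's start time minus lateMessageRejectionPeriod are then dropped rather than triggering segment allocation in old intervals.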

We intend to make this all work more seamlessly in a future version.

Gian


Max Lord

Jun 29, 2016, 1:26:12 PM
to Druid User
Thanks for the clarification, Gian.

I agree with you that this approach has huge advantages, especially if you do not care to run Hadoop.

So this is accomplished with partitioning? For each segment-granularity interval in which we receive any events, we can expect a new partition?

I can imagine there is a performance trade-off if you receive many late events. For example, if I am indexing hourly and I get stragglers spanning 2 days, I could potentially end up with a very small extra partition in each of those 48 hourly segments, which would make queries do a lot of extra requests. I suppose one could re-ingest the partitions and merge them at a later time.

Sorry to hijack this thread :)

drew dahlke

Jun 29, 2016, 1:33:47 PM
to Druid User
Excellent explanation, Gian. I tried switching the KafkaIndexTask to DAY granularity and hit the same problem, but the incompatible shardSpec explains it. We only keep 3 days of log retention in Kafka, and we kick the Hadoop jobs off via the CLI. It sounds like setting lateMessageRejectionPeriod=PT1H and lagging the Hadoop rebuilds by 3-4 days, so they don't overwrite anything that could still come over Kafka, would be a decent compromise?

FWIW, we're really happy to keep the hadoop job around. I know kappa will appeal to many folks, but we love the lambda :)

Gian Merlino

Jun 29, 2016, 3:18:34 PM
to druid...@googlegroups.com
If you get stragglers over 2 days, then yeah, you will end up with a lot of small segments. That's actually one area where running together with batch indexing can make sense – you can use the batch indexing as a sort of "compaction" (using the "dataSource" inputSpec to read data from Druid segments rather than from the raw data). Making that mode of operation work better is one of the main reasons we want to make batch/new-Kafka indexing work more seamlessly together in the future.
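
A sketch of what that compaction-style Hadoop ioConfig could look like, using the dataSource and interval from this thread as placeholders:

  "ioConfig": {
    "type": "hadoop",
    "inputSpec": {
      "type": "dataSource",
      "ingestionSpec": {
        "dataSource": "imetrics",
        "intervals": ["2016-06-24/2016-06-25"]
      }
    }
  }

This reads the existing (possibly fragmented) segments for that interval back out of Druid and rewrites them according to the Hadoop spec's partitioning settings.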

Gian

Gian Merlino

Jun 29, 2016, 3:21:26 PM
to druid...@googlegroups.com
Yes, that should work if you want to stick with a lambda style architecture. The lateMessageRejectionPeriod is meant to make that sort of architecture possible.

Gian

drew dahlke

Jun 29, 2016, 3:42:16 PM
to Druid User
Thanks Gian!

Hao lv

Jul 11, 2016, 6:15:31 AM
to Druid User
I have a similar problem. I have been using the Kafka indexing service in a fresh new cluster with no Hadoop jobs, but I hit the same error: all tasks fail with [com.metamx.common.ISE: Could not allocate segment for row with timestamp] even though I have no segments at all. I have tried with and without lateMessageRejectionPeriod in the ingestion JSON file. Here are some of my task logs.

2016-07-11T09:53:38,270 DEBUG [task-runner-0-priority-0] com.metamx.http.client.NettyHttpClient - [POST http://host/druid/indexer/v1/action] starting
2016-07-11T09:53:38,272 INFO [task-runner-0-priority-0] com.metamx.http.client.pool.ChannelResourceFactory - Generating: http://host
[... the same "ChannelResourceFactory - Generating: http://host" line repeated 19 more times ...]
2016-07-11T09:53:38,398 DEBUG [HttpClient-Netty-Worker-0] com.metamx.http.client.NettyHttpClient - [POST http://host/druid/indexer/v1/action] messageReceived: DefaultHttpResponse(chunked: true)
HTTP/1.1 200 OK
Date: Mon, 11 Jul 2016 09:53:38 GMT
Content-Type: application/json
Vary: Accept-Encoding, User-Agent
Transfer-Encoding: chunked
Server: Jetty(9.2.5.v20141112)
2016-07-11T09:53:38,398 DEBUG [HttpClient-Netty-Worker-0] com.metamx.http.client.NettyHttpClient - [POST http://host/druid/indexer/v1/action] Got response: 200 OK
2016-07-11T09:53:38,400 DEBUG [HttpClient-Netty-Worker-0] com.metamx.http.client.NettyHttpClient - [POST http://host/druid/indexer/v1/action] messageReceived: org.jboss.netty.handler.codec.http.DefaultHttpChunk@6e3e91e4
2016-07-11T09:53:38,401 DEBUG [HttpClient-Netty-Worker-0] com.metamx.http.client.NettyHttpClient - [POST http://host/druid/indexer/v1/action] Got chunk: 15B, last=false
2016-07-11T09:53:38,401 DEBUG [HttpClient-Netty-Worker-0] com.metamx.http.client.NettyHttpClient - [POST http://host/druid/indexer/v1/action] messageReceived: org.jboss.netty.handler.codec.http.HttpChunk$1@71a7c8e9
2016-07-11T09:53:38,401 DEBUG [HttpClient-Netty-Worker-0] com.metamx.http.client.NettyHttpClient - [POST http://host/druid/indexer/v1/action] Got chunk: 0B, last=true
2016-07-11T09:53:38,406 WARN [task-runner-0-priority-0] io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver - Cannot allocate segment for timestamp[2016-07-11T08:52:13.795Z], sequenceName[index_kafka_test_c634fdcd8e2d84d_3].
2016-07-11T09:53:38,406 INFO [task-runner-0-priority-0] io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver - Persisting data.
2016-07-11T09:53:38,409 INFO [task-runner-0-priority-0] io.druid.segment.realtime.appenderator.AppenderatorImpl - Submitting persist runnable for dataSource[test]
2016-07-11T09:53:38,412 INFO [test-incremental-persist] io.druid.segment.realtime.appenderator.AppenderatorImpl - Committing metadata[FiniteAppenderatorDriverMetadata{activeSegments={}, lastSegmentIds={}, callerMetadata={nextPartitions=KafkaPartitions{topic='druid_metrics_final', partitionOffsetMap={0=305318, 1=624366, 2=512090, 3=2157602, 4=305308}}}}] for sinks[].
2016-07-11T09:53:38,419 INFO [task-runner-0-priority-0] io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver - Persisted pending data in 13ms.
2016-07-11T09:53:38,420 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:client-id-consumer-1
2016-07-11T09:53:38,421 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-created:client-id-consumer-1
2016-07-11T09:53:38,421 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent-received:client-id-consumer-1
2016-07-11T09:53:38,421 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent:client-id-consumer-1
2016-07-11T09:53:38,421 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-received:client-id-consumer-1
2016-07-11T09:53:38,421 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name select-time:client-id-consumer-1
2016-07-11T09:53:38,421 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name io-time:client-id-consumer-1
2016-07-11T09:53:38,422 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--5.bytes-sent
2016-07-11T09:53:38,422 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--5.bytes-received
2016-07-11T09:53:38,422 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--5.latency
2016-07-11T09:53:38,422 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-3.bytes-sent
2016-07-11T09:53:38,423 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-3.bytes-received
2016-07-11T09:53:38,423 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-3.latency
2016-07-11T09:53:38,423 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-2147483647.bytes-sent
2016-07-11T09:53:38,423 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-2147483647.bytes-received
2016-07-11T09:53:38,423 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-2147483647.latency
2016-07-11T09:53:38,423 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-4.bytes-sent
2016-07-11T09:53:38,423 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-4.bytes-received
2016-07-11T09:53:38,424 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-4.latency
2016-07-11T09:53:38,424 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.bytes-sent
2016-07-11T09:53:38,424 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.bytes-received
2016-07-11T09:53:38,424 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.latency
2016-07-11T09:53:38,424 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-1.bytes-sent
2016-07-11T09:53:38,424 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-1.bytes-received
2016-07-11T09:53:38,424 DEBUG [task-runner-0-priority-0] org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-1.latency
2016-07-11T09:53:38,424 DEBUG [task-runner-0-priority-0] org.apache.kafka.clients.consumer.KafkaConsumer - The Kafka consumer has closed.
2016-07-11T09:53:38,425 INFO [task-runner-0-priority-0] io.druid.segment.realtime.appenderator.AppenderatorImpl - Shutting down...
2016-07-11T09:53:38,428 ERROR [task-runner-0-priority-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[KafkaIndexTask{id=index_kafka_test_c634fdcd8e2d84d_gdooapdf, type=index_kafka, dataSource=test}]
com.metamx.common.ISE: Could not allocate segment for row with timestamp[2016-07-11T08:52:13.795Z]
        at io.druid.indexing.kafka.KafkaIndexTask.run(KafkaIndexTask.java:427) ~[?:?]
        at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:436) [druid-indexing-service-0.9.1.1.jar:0.9.1.1]
        at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:408) [druid-indexing-service-0.9.1.1.jar:0.9.1.1]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_73]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_73]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_73]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_73]
2016-07-11T09:53:38,428 INFO [task-runner-0-priority-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_test_c634fdcd8e2d84d_gdooapdf] status changed to [FAILED].
2016-07-11T09:53:38,430 INFO [task-runner-0-priority-0] io.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
  "id" : "index_kafka_test_c634fdcd8e2d84d_gdooapdf",
  "status" : "FAILED",
  "duration" : 23633
}


And here is my spec.json

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "internal_druid_ingest_metrics",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "host",
            "service",
            "dataSource"
          ]
        }
      }
    },
    "metricsSpec": [
      {
        "type": "doubleSum",
        "name": "ingest_events_thrownAway",
        "fieldName": "ingest/events/thrownAway"
      },
      {
        "type": "doubleSum",
        "name": "ingest_events_unparseable",
        "fieldName": "ingest/events/unparseable"
      },
      {
        "type": "doubleSum",
        "name": "ingest_events_processed",
        "fieldName": "ingest/events/processed"
      },
      {
        "type": "doubleSum",
        "name": "ingest_rows_output",
        "fieldName": "ingest/rows/output"
      },
      {
        "type": "doubleSum",
        "name": "ingest_persists_count",
        "fieldName": "ingest/persists/count"
      },
      {
        "type": "doubleSum",
        "name": "ingest_persists_time",
        "fieldName": "ingest/persists/time"
      },
      {
        "type": "doubleSum",
        "name": "ingest_persists_cpu",
        "fieldName": "ingest/persists/cpu"
      },
      {
        "type": "doubleSum",
        "name": "ingest_persists_backPressure",
        "fieldName": "ingest/persists/backPressure"
      },
      {
        "type": "doubleSum",
        "name": "ingest_persists_failed",
        "fieldName": "ingest/persists/failed"
      },
      {
        "type": "doubleSum",
        "name": "ingest_handoff_failed",
        "fieldName": "ingest/handoff/failed"
      },
      {
        "type": "doubleSum",
        "name": "ingest_merge_time",
        "fieldName": "ingest/merge/time"
      },
      {
        "type": "doubleSum",
        "name": "ingest_merge_cpu",
        "fieldName": "ingest/merge/cpu"
      },
      {
        "type": "doubleSum",
        "name": "ingest_handoff_count",
        "fieldName": "ingest/handoff/count"
      }
    ],
    "granularitySpec": {
      "segmentGranularity": "hour",
      "queryGranularity": "none",
      "type": "uniform"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000
  },
  "ioConfig": {
    "topic": "druid_metrics_final",
    "consumerProperties": {
      "bootstrap.servers": "host1:9092,host2:9092"
    },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"
  }
}




On Wednesday, June 29, 2016 at 11:39:39 PM UTC+8, Gian Merlino wrote:

Gian Merlino

Jul 12, 2016, 12:25:51 PM
to druid...@googlegroups.com
Hey Hao,

Do you see any errors or warnings in your Overlord logs? Could you also double-check that all of your loaded extensions match your Druid version?

Gian

Hao lv

Jul 13, 2016, 2:12:24 AM
to Druid User
It turned out to be a time zone problem. I had set all of my time zones to Asia/Shanghai, and segments could not be allocated.
But when I set the MM and its peons' time zone to UTC, it works.
Is this a bug?

Thanks.
Hao

On Wednesday, July 13, 2016 at 12:25:51 AM UTC+8, Gian Merlino wrote:

Gian Merlino

Jul 13, 2016, 8:55:55 PM
to druid...@googlegroups.com
Hey Hao,

Druid is only tested in UTC and there are a few things that are known to not work in other time zones. See https://github.com/druid-io/druid/issues/2619 for details. I would suggest running in UTC – you can do this by adding -Duser.timezone=UTC to the java command line. Please note that even if your servers run in UTC, you can use any time zone for queries, using ISO8601 offset notation for intervals and period granularities for your granularity.

Gian

eb...@choicestream.com

Mar 24, 2017, 3:08:44 PM
to Druid User, david.cla...@gmail.com
Hi Gian - 

This post from David Lim says that "Druid segments don't all need to have the same granularity - i.e. you can have some that are HOUR, some DAY, some YEAR, etc. Druid will take all the available segments and put together a timeline of the most recently generated segments (latest version number) that contain data for a given time, regardless of the segment's granularity, and will use this to know how to answer queries."

Are you saying that this strategy fails to work when the Kafka Indexing Service is running? (Or, more precisely, when the Kafka Indexing Service might attempt to append data to a segment which has already been rolled up to a coarser segment granularity?)

Is David's described strategy of rolling up still valid if the lateMessageRejectionPeriod is used to prevent conflicts between segments created by Kafka Indexing Service and rolled-up segments created by batch indexing?


On Wednesday, June 29, 2016 at 12:49:02 PM UTC-4, Gian Merlino wrote: