Tranquility throwing away all kafka messages

1,546 views
Skip to first unread message

Pere

unread,
Apr 27, 2016, 9:49:20 PM4/27/16
to Druid User
I have recently begun switching over from realtime nodes to tranquility as outlined in the docs. I currently am running stable druid 0.9.0 and tranq 0.7.4. I converted my spec file over to tranquility format and started tranquility. It recognizes the topic correctly and begins to consume. However it does not publish any of the messages to the workers. I am not sure how/why these are getting thrown away, I tried setting log level to debug but didnt see anything obvious. I get tons of messages like these:


2016-04-28 01:44:14,306 [ClusteredBeam-ZkFuturePool-3466a839-2d9b-4743-847c-2b5819b5720c] INFO  c.m.tranquility.beam.ClusteredBeam - Turns out we decided not to actually make beams for identifier[druid:prod:overlord/events] timestamp[2016-04-28T01:00:00.000Z]. Returning None.


2016-04-28 01:44:26,342 [KafkaConsumer-CommitThread] INFO  c.m.tranquility.kafka.KafkaConsumer - Flushed {events={receivedCount=191176, sentCount=0, failedCount=191176}} pending messages in 2ms and committed offsets in 10ms.
...
(tons more of these)



My server.json looks like so:
{
   "dataSources" : [
      {
         "spec" : {
            "dataSchema" : {
               "parser" : {
                  "type" : "string",
                  "parseSpec" : {
                     "timestampSpec" : {
                      "column": "ts",
                      "format" : "auto"
                    },
                     "dimensionsSpec" : {
                        "spatialDimensions" : [
                           {
                              "dims" : [
                                 "lat",
                                 "lon"
                              ],
                              "dimName" : "geo"
                           }
                        ],
                        "dimensions" : [],
                        "dimensionExclusions" : ["location"]
                     },
                     "format" : "json"
                  }
               },
               "dataSource" : "events",
               "granularitySpec" : {
                  "type" : "uniform",
                  "segmentGranularity" : "HOUR",
                  "queryGranularity" : "NONE"
               },
               "metricsSpec" : [{
                  "type":"hyperUnique",
                  "name":"unique_users",
                  "fieldName":"uid"
                },
                {
                  "type":"count",
                  "name":"count"
                }
               ]
            },
            "tuningConfig" : {
                "type" : "realtime",
                "maxRowsInMemory": 500000,
                "intermediatePersistPeriod": "PT10m",
                "windowPeriod": "PT10m",
                "rejectionPolicy": {
                  "type": "serverTime"
                }
            }
         },
         "properties" : {
            "topicPattern.priority" : "1",
            "topicPattern" : "events"
         }
      }
   ],
   "properties" : {
       "zookeeper.connect" : "zk:2181",
       "zookeeper.timeout" : "PT20S",
       "druid.selectors.indexing.serviceName" : "druid/prod/overlord",
       "druid.discovery.curator.path" : "/prod/discovery",
       "kafka.zookeeper.connect" : "zk:2181/kafka-analytics",
       "kafka.group.id" : "tranquility-kafka",
       "consumer.numThreads" : "4",
       "commit.periodMillis" : "15000",
       "reportDropsAsExceptions" : "false"
    }
}


David Lim

unread,
Apr 28, 2016, 6:14:49 PM4/28/16
to Druid User
Hey Pere,

You can try enabling reportDropsAsExceptions and see if that provides any more insights. Are your events current? The main reasons we see events being dropped by Tranquility are old events (outside of the windowPeriod) or unparsable data.

Pere

unread,
May 27, 2016, 2:54:20 PM5/27/16
to Druid User
Thanks David,
I am still seeing the same messages as before on the tranquility node i switched to "true"

Pere

unread,
May 27, 2016, 4:59:22 PM5/27/16
to Druid User
David, 

I was not setting it in the right config. Here is the error:








2016-05-27 20:57:33,109 [KafkaConsumer-3] ERROR c.m.tranquility.kafka.KafkaConsumer - Exception:


java.lang.RuntimeException: com.metamx.tranquility.tranquilizer.MessageDroppedException: Message dropped


        at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[com.google.guava.guava-16.0.1.jar:na]


        at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.maybeThrow(TranquilityEventWriter.java:166) ~[io.druid.tranquility-kafka-0.7.4.jar:0.7.4]


        at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.send(TranquilityEventWriter.java:135) ~[io.druid.tranquility-kafka-0.7.4.jar:0.7.4]


        at com.metamx.tranquility.kafka.KafkaConsumer$2.run(KafkaConsumer.java:231) ~[io.druid.tranquility-kafka-0.7.4.jar:0.7.4]


        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_60]


        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_60]


        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_60]


        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_60]


        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]


Caused by: com.metamx.tranquility.tranquilizer.MessageDroppedException: Message dropped


        at com.twitter.finagle.NoStacktrace(Unknown Source) ~[na:na]


2016-05-27 20:57:33,110 [KafkaConsumer-3] INFO  c.m.tranquility.kafka.KafkaConsumer - Shutting down - attempting to flush buffers and commit final offsets


2016-05-27 20:57:33,128 [ClusteredBeam-ZkFuturePool-8f9b0c1f-ffc5-41a8-b26b-10f40500e2f6] INFO  c.m.tranquility.beam.ClusteredBeam - Global latestCloseTime[2016-05-27T20:00:00.000Z] for identifier[druid:prod:overlord/weaver_events] has moved past timestamp[2016-05-27T20:00:00.000Z], not creating merged beam


2016-05-27 20:57:33,130 [ClusteredBeam-ZkFuturePool-8f9b0c1f-ffc5-41a8-b26b-10f40500e2f6] INFO  c.m.tranquility.beam.ClusteredBeam - Turns out we decided not to actually make beams for identifier[druid:prod:overlord/weaver_events] timestamp[2016-05-27T20:00:00.000Z]. Returning None.


2016-05-27 20:57:33,138 [KafkaConsumer-1] ERROR c.m.tranquility.kafka.KafkaConsumer - Exception:


java.lang.RuntimeException: com.metamx.tranquility.tranquilizer.MessageDroppedException: Message dropped


        at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[com.google.guava.guava-16.0.1.jar:na]


        at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.maybeThrow(TranquilityEventWriter.java:166) ~[io.druid.tranquility-kafka-0.7.4.jar:0.7.4]


        at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.send(TranquilityEventWriter.java:135) ~[io.druid.tranquility-kafka-0.7.4.jar:0.7.4]


        at com.metamx.tranquility.kafka.KafkaConsumer$2.run(KafkaConsumer.java:231) ~[io.druid.tranquility-kafka-0.7.4.jar:0.7.4]


        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_60]


        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_60]


        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_60]


        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_60]


        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]


Caused by: com.metamx.tranquility.tranquilizer.MessageDroppedException: Message dropped


        at com.twitter.finagle.NoStacktrace(Unknown Source) ~[na:na]


2016-05-27 20:57:33,138 [KafkaConsumer-2] ERROR c.m.tranquility.kafka.KafkaConsumer - Exception:


java.lang.RuntimeException: com.metamx.tranquility.tranquilizer.MessageDroppedException: Message dropped


        at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[com.google.guava.guava-16.0.1.jar:na]


        at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.maybeThrow(TranquilityEventWriter.java:166) ~[io.druid.tranquility-kafka-0.7.4.jar:0.7.4]


        at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.send(TranquilityEventWriter.java:135) ~[io.druid.tranquility-kafka-0.7.4.jar:0.7.4]


        at com.metamx.tranquility.kafka.KafkaConsumer$2.run(KafkaConsumer.java:231) ~[io.druid.tranquility-kafka-0.7.4.jar:0.7.4]


        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_60]


        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_60]


        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_60]


        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_60]


        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]


Caused by: com.metamx.tranquility.tranquilizer.MessageDroppedException: Message dropped


        at com.twitter.finagle.NoStacktrace(Unknown Source) ~[na:na]


2016-05-27 20:57:33,138 [KafkaConsumer-0] ERROR c.m.tranquility.kafka.KafkaConsumer - Exception:


java.lang.RuntimeException: com.metamx.tranquility.tranquilizer.MessageDroppedException: Message dropped


        at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[com.google.guava.guava-16.0.1.jar:na]


        at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.maybeThrow(TranquilityEventWriter.java:166) ~[io.druid.tranquility-kafka-0.7.4.jar:0.7.4]


        at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.send(TranquilityEventWriter.java:135) ~[io.druid.tranquility-kafka-0.7.4.jar:0.7.4]


        at com.metamx.tranquility.kafka.KafkaConsumer$2.run(KafkaConsumer.java:231) ~[io.druid.tranquility-kafka-0.7.4.jar:0.7.4]


        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_60]


        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_60]


        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_60]


        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_60]


        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]


Caused by: com.metamx.tranquility.tranquilizer.MessageDroppedException: Message dropped


        at com.twitter.finagle.NoStacktrace(Unknown Source) ~[na:na]




On Thursday, April 28, 2016 at 3:14:49 PM UTC-7, David Lim wrote:

David Lim

unread,
May 31, 2016, 3:28:02 PM5/31/16
to Druid User
Hey Pere,

I'm not entirely sure what's going on, but from the log messages, I suspect you might have some data from failed tasks hanging around in ZK and in the metadata store that is preventing Tranquility from creating beams to handle the new messages. Can you try clearing this data and see if that helps? Remove any related entries from the Druid task metadata table (default name is druid_tasks) and delete the clusteredBeamZkBasePath from Zookeeper (default path is /tranquility/beams).

Pere

unread,
May 31, 2016, 5:23:20 PM5/31/16
to Druid User
David I think this may have been it. As It just seemed to fix itself in a couple segment periods. It seems that the indexing service/tranquility does not like failures in the middle manager.

Also another thing I noticed, if tasks get backed up and say I have 2 partitions and 2 replicas but only 3 slots then the data is lost for that segment. It throws away a bunch of messages and the numbers are not 0 but like 99% lower than they should be. It was my understanding the replica job was for HA failures. I can not seem to find any decisive documentation on this

Thanks.
Pere

David Lim

unread,
May 31, 2016, 5:49:54 PM5/31/16
to Druid User
Glad to hear that things are starting to come together.

Hmm, I think the overview page (https://github.com/druid-io/tranquility/blob/master/docs/overview.md) has the most relevant documentation. There's a short blurb on there talking about capacity planning, but basically, it's very important that your workers have enough capacity for the number of tasks that will be created by Tranquility for things to run smoothly. Replication is indeed for HA, and the indexing service will ensure that replica tasks are run on different nodes to protect against failures. However, having only one of the replicas start or having one start earlier than another because of insufficient capacity can lead to incorrect results, since the assumption is that both indexing tasks will receive the same data and will generate equivalent segments, which would be not true if some data went to only one of the tasks and not the other because it started late. As stated on the overview page, you should plan to have capacity for 2 * #partitions * #replicants to have enough slots for the transition between tasks on the time interval boundaries. Also, since replica tasks must run on separate nodes, to use replication you'll need at least [#replicants] middleManager nodes. Hope this helps!

Pere

unread,
Jun 2, 2016, 5:02:55 PM6/2/16
to Druid User
David you are the best thanks so much for the great explanation.
Reply all
Reply to author
Forward
0 new messages