Hi all!
I'm getting a ParseException while trying to ingest Kafka events that look like:

{"timestamp":"2018-03-29T23:31:26.077Z","idRequest":3753192267374775741,"codAggregator":"DM","result":"OK","stackTrace":"LVS-5345"}

The exception clearly shows that the timestamp field was not sent to Tranquility Kafka, but why? The timestamp is well-formed JSON, and the event has been successfully processed by Kafka.
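For reference, this is the kind of check I ran to see whether some events on the topic lack the field entirely (plain Python, sample events are illustrative; note the "Caused by" record at the bottom of the trace below has no timestamp key):

```python
# Hypothetical sanity check (not part of Druid/Tranquility): scan sample
# events for the "timestamp" column that the timestampSpec expects.
import json

events = [
    # Well-formed event, as in the example above:
    '{"timestamp":"2018-03-29T23:31:26.077Z","idRequest":3753192267374775741,'
    '"codAggregator":"DM","result":"OK","stackTrace":"LVS-5345"}',
    # Event without a timestamp key, like the one in the "Caused by" below:
    '{"idRequest":8643556674707087331,"codAggregator":"DM","result":"KO",'
    '"stackTrace":"Dummy"}',
]

# Collect the ids of events where the timestamp column is absent; these are
# exactly the rows that would make MapInputRowParser throw
# "Unparseable timestamp found!".
missing_ids = [json.loads(raw)["idRequest"]
               for raw in events
               if "timestamp" not in json.loads(raw)]
print("events missing a timestamp:", missing_ids)
```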
I'm using Druid 0.12.0 and Tranquility 0.8.0, the exception is:
com.metamx.common.parsers.ParseException: Unparseable timestamp found!
at io.druid.data.input.impl.MapInputRowParser.parse(MapInputRowParser.java:72)
at io.druid.data.input.impl.StringInputRowParser.parseMap(StringInputRowParser.java:136)
at io.druid.data.input.impl.StringInputRowParser.parse(StringInputRowParser.java:74)
at io.druid.data.input.impl.StringInputRowParser.parse(StringInputRowParser.java:37)
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$7.apply(DruidBeams.scala:177)
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$7.apply(DruidBeams.scala:177)
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$apply$1.apply(DruidBeams.scala:195)
at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$apply$1.apply(DruidBeams.scala:195)
at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2$$anonfun$2.apply(TransformingBeam.scala:36)
at com.twitter.util.Try$.apply(Try.scala:13)
at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2.apply(TransformingBeam.scala:36)
at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2.apply(TransformingBeam.scala:35)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
at com.metamx.tranquility.beam.TransformingBeam.sendAll(TransformingBeam.scala:35)
at com.metamx.tranquility.tranquilizer.Tranquilizer.com$metamx$tranquility$tranquilizer$Tranquilizer$$sendBuffer(Tranquilizer.scala:301)
at com.metamx.tranquility.tranquilizer.Tranquilizer$$anonfun$send$1.apply(Tranquilizer.scala:202)
at com.metamx.tranquility.tranquilizer.Tranquilizer$$anonfun$send$1.apply(Tranquilizer.scala:202)
at scala.Option.foreach(Option.scala:257)
at com.metamx.tranquility.tranquilizer.Tranquilizer.send(Tranquilizer.scala:202)
at com.metamx.tranquility.kafka.writer.TranquilityEventWriter.send(TranquilityEventWriter.java:76)
at com.metamx.tranquility.kafka.KafkaConsumer$2.run(KafkaConsumer.java:231)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException:
Null timestamp in input: {idRequest=8643556674707087331, codAggregator=DM, result=KO, stackTrace=Dummy...
at io.druid.data.input.impl.MapInputRowParser.parse(MapInputRowParser.java:63)
... 30 more
and the .json configuration file looks like:
{
  "dataSources" : {
    "aggregator-kafka" : {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "aggregator-kafka",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "timestampSpec" : {"format" : "auto", "column" : "timestamp"},
              "dimensionsSpec" : {
                "dimensions" : ["timestamp", "codAggregator", "result", "stackTrace"],
                "dimensionExclusions" : ["idRequest"]
              },
              "format" : "json"
            }
          },
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "minute",
            "queryGranularity" : "none"
          },
          "metricsSpec" : [
            {
              "type" : "count",
              "name" : "count"
            }
          ]
        },
        "ioConfig" : {
          "type" : "realtime"
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",
          "intermediatePersistPeriod" : "PT10M",
          "windowPeriod" : "PT10M"
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1",
        "topicPattern" : "aggregator-out"
      }
    }
  },
  "properties" : {
    "zookeeper.connect" : "localhost",
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "commit.periodMillis" : "15000",
    "consumer.numThreads" : "2",
    "kafka.zookeeper.connect" : "localhost",
    "kafka.group.id" : "tranquility-kafka"
  }
}
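For what it's worth, the timestamp format itself does not seem to be the problem: the value from the good event is valid ISO-8601, which the "auto" timestampSpec should accept. A quick check outside Druid (plain Python, strptime pattern is just an illustration of the ISO-8601 shape):

```python
# Verify that the timestamp value from the sample event is parseable
# ISO-8601 with millisecond precision and a trailing Z (UTC).
from datetime import datetime, timezone

ts = "2018-03-29T23:31:26.077Z"
parsed = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
print(parsed.isoformat())
```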
Thanks in advance!
E