Tranquility throwing KafkaConsumer NullPointer Exception "randomly"


kum...@gmail.com

Oct 20, 2016, 5:39:25 AM
to Druid User
I have a problem that is driving me nuts. I'm reading from Kafka, but Tranquility throws a NullPointerException after what seems like a random amount of time, usually 15-45 minutes after starting. Until it breaks, everything works fine and data pours into Druid.
Once the error appears, Tranquility tries to restart, but the same error keeps repeating. If I leave it for a long time, maybe a couple of hours, it sometimes starts working again, and then the cycle repeats after a while. When the error occurs, I can sometimes get it running again by restarting Tranquility after either changing the name of the consumer group or clearing out all Kafka/Tranquility data from ZooKeeper.
 
The Kafka and ZooKeeper logs give me nothing to go on: no errors, nothing out of the ordinary. I'm out of ideas.

This is what shows up in the Tranquility log:

2016-10-20 09:00:13,678 [KafkaConsumer-CommitThread] INFO  c.m.tranquility.kafka.KafkaConsumer - Flushed {bodil-snmp-parser-out={receivedCount=1, sentCount=1, droppedCount=0, unparseableCount=0}} pending messages in 9229ms and committed offsets in 55ms.
2016-10-20 09:00:32,725 [KafkaConsumer-0] ERROR c.m.tranquility.kafka.KafkaConsumer - Exception:
java.lang.NullPointerException: null
        at java.nio.ByteBuffer.wrap(ByteBuffer.java:392) ~[na:1.7.0_91]
        at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$7.apply(DruidBeams.scala:181) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
        at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$7.apply(DruidBeams.scala:180) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
        at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$apply$1.apply(DruidBeams.scala:198) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
        at com.metamx.tranquility.druid.DruidBeams$$anonfun$1$$anonfun$apply$1.apply(DruidBeams.scala:198) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
        at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2$$anonfun$2.apply(TransformingBeam.scala:36) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
        at com.twitter.util.Try$.apply(Try.scala:13) ~[com.twitter.util-core_2.11-6.30.0.jar:6.30.0]
        at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2.apply(TransformingBeam.scala:36) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
        at com.metamx.tranquility.beam.TransformingBeam$$anonfun$sendAll$2.apply(TransformingBeam.scala:35) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777) ~[org.scala-lang.scala-library-2.11.7.jar:na]
        at com.metamx.tranquility.beam.TransformingBeam.sendAll(TransformingBeam.scala:35) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
        at com.metamx.tranquility.tranquilizer.Tranquilizer.com$metamx$tranquility$tranquilizer$Tranquilizer$$sendBuffer(Tranquilizer.scala:301) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
        at com.metamx.tranquility.tranquilizer.Tranquilizer$$anonfun$3.apply(Tranquilizer.scala:116) ~[io.druid.tranquility-core-0.8.1.jar:0.8.1]
        at com.metamx.common.scala.concurrent.package$$anon$1.run(package.scala:37) ~[com.metamx.scala-util_2.11-1.11.6.jar:1.11.6]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_91]
2016-10-20 09:00:32,725 [KafkaConsumer-0] INFO  c.m.tranquility.kafka.KafkaConsumer - Shutting down - attempting to flush buffers and commit final offsets
2016-10-20 09:00:32,726 [KafkaConsumer-0] INFO  k.c.ZookeeperConsumerConnector - [tranquility-kafka-12_hgd1-netstat-4-1476953842769-7dce75ac], ZKConsumerConnector shutting down
2016-10-20 09:00:32,737 [KafkaConsumer-0] INFO  k.c.ZookeeperTopicEventWatcher - Shutting down topic event watcher.
2016-10-20 09:00:32,737 [KafkaConsumer-0] INFO  k.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1476953842862] Stopping leader finder thread
2016-10-20 09:00:32,737 [KafkaConsumer-0] INFO  k.c.ConsumerFetcherManager$LeaderFinderThread - [tranquility-kafka-12_hgd1-netstat-4-1476953842769-7dce75ac-leader-finder-thread], Shutting down
2016-10-20 09:00:32,738 [tranquility-kafka-12_hgd1-netstat-4-1476953842769-7dce75ac-leader-finder-thread] INFO  k.c.ConsumerFetcherManager$LeaderFinderThread - [tranquility-kafka-12_hgd1-netstat-4-1476953842769-7dce75ac-leader-finder-thread], Stopped
2016-10-20 09:00:32,738 [KafkaConsumer-0] INFO  k.c.ConsumerFetcherManager$LeaderFinderThread - [tranquility-kafka-12_hgd1-netstat-4-1476953842769-7dce75ac-leader-finder-thread], Shutdown completed
2016-10-20 09:00:32,738 [KafkaConsumer-0] INFO  k.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1476953842862] Stopping all fetchers
2016-10-20 09:00:32,738 [KafkaConsumer-0] INFO  kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-tranquility-kafka-12_hgd1-netstat-4-1476953842769-7dce75ac-0-6], Shutting down
2016-10-20 09:00:32,739 [ConsumerFetcherThread-tranquility-kafka-12_hgd1-netstat-4-1476953842769-7dce75ac-0-6] INFO  kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-tranquility-kafka-12_hgd1-netstat-4-1476953842769-7dce75ac-0-6], Stopped
2016-10-20 09:00:32,739 [KafkaConsumer-0] INFO  kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-tranquility-kafka-12_hgd1-netstat-4-1476953842769-7dce75ac-0-6], Shutdown completed


I'm fairly new to Kafka/Tranquility/Druid, so I'm having a hard time figuring out what's happening here. Why does KafkaConsumer suddenly stop working? Why does the problem persist once it appears? I can see in ZooKeeper that the offset is reset quite a bit when this happens. Is Tranquility trying to read something that isn't there anymore, and is that why it keeps failing?
Any clues as to what might be going on here? This is my first time asking for help in one of these groups, so I apologize in advance if I haven't provided enough info; please just let me know in that case.
Any help is appreciated.

David Lim

Oct 20, 2016, 2:47:51 PM
to Druid User
Hi,

That exception is being thrown because your Kafka topic has null messages in it. Is it possible to have your Kafka producers not generate null events?
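
For illustration, here is a minimal, stdlib-only Java sketch of why a null payload ends in the top frame of the stack trace above (java.nio.ByteBuffer.wrap), and of a producer-side guard that skips null payloads before they are sent. The class and method names (NullPayloadDemo, wrapThrowsNpe, dropNullPayloads) are hypothetical and not part of Tranquility or Kafka, and it assumes "null messages" means records whose value is null; adapt the guard to wherever your producer builds its records.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NullPayloadDemo {

    // Returns true if wrapping the payload throws a NullPointerException,
    // which is what the top frame of the stack trace (ByteBuffer.wrap) shows.
    public static boolean wrapThrowsNpe(byte[] payload) {
        try {
            ByteBuffer.wrap(payload);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Hypothetical producer-side guard: keep only non-null payloads so that
    // null-valued records never reach the topic.
    public static List<byte[]> dropNullPayloads(List<byte[]> payloads) {
        List<byte[]> kept = new ArrayList<>();
        for (byte[] payload : payloads) {
            if (payload != null) {
                kept.add(payload);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Made-up event body, used only to have one valid payload in the batch.
        byte[] event = "{\"ts\":\"2016-10-20T09:00:00Z\"}".getBytes(StandardCharsets.UTF_8);
        List<byte[]> batch = Arrays.asList(event, null, event);

        System.out.println("null payload throws NPE: " + wrapThrowsNpe(null));
        System.out.println("valid payload throws NPE: " + wrapThrowsNpe(event));
        System.out.println("payloads kept after guard: " + dropNullPayloads(batch).size());
    }
}
```

If the null payloads come from a code path in the producer that is hard to change, the same check can be applied at whatever point the record value is first available.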

If you're just starting to ingest from Kafka into Druid, I'd recommend taking a look at the Kafka indexing service to see if it fits your use case. It's still considered experimental, but it is getting more usage and so far looks reasonably stable. It doesn't require a separate tranquility-kafka process, provides an exactly-once ingestion guarantee, and can ingest historical data as well as recent data. These things should hopefully give you a better ingestion experience.

There's documentation here if you're interested: http://druid.io/docs/0.9.1.1/development/extensions-core/kafka-ingestion.html