Hi,
I am using akka-stream-kafka to consume messages from Kafka in a Play Framework application. Here are the details:
build.sbt
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.17"
And here are the consumer settings:
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new KafkaAvroDeserializer())
  .withBootstrapServers("IP_ADDRESS:9092")
  .withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  .withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer")
  .withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
  .withProperty("schema.registry.url", "http://IP_ADDRESS:8081")
  //.withProperty("auto.commit.enable", "false")
  .withGroupId("index.consumers")
  .withProperty("enable.auto.commit", "false")
Deserialization fails with the following error:
[ERROR] [09/15/2017 19:50:13.610] [play-dev-mode-akka.kafka.default-dispatcher-12] [akka://play-dev-mode/system/kafka-consumer-1] Exception when polling from consumer
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tst2-0 at offset 3
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 141
Caused by: java.lang.NullPointerException
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:918)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1095)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:944)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:567)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:528)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at akka.kafka.KafkaConsumerActor.tryPoll$1(KafkaConsumerActor.scala:257)
at akka.kafka.KafkaConsumerActor.poll(KafkaConsumerActor.scala:303)
at akka.kafka.KafkaConsumerActor.akka$kafka$KafkaConsumerActor$$receivePoll(KafkaConsumerActor.scala:232)
at akka.kafka.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:151)
at akka.actor.Actor.aroundReceive(Actor.scala:514)
at akka.actor.Actor.aroundReceive$(Actor.scala:512)
at akka.kafka.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:73)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
at akka.actor.ActorCell.invoke(ActorCell.scala:496)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[error] p.c.s.c.WebSocketFlowHandler - WebSocket flow threw exception
java.lang.Exception: Consumer actor terminated
at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1(SingleSourceLogic.scala:54)
at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1$adapted(SingleSourceLogic.scala:40)
at akka.stream.stage.GraphStageLogic$StageActor.internalReceive(GraphStage.scala:185)
at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1(GraphStage.scala:152)
at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1$adapted(GraphStage.scala:152)
at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:443)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:453)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:546)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:725)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:740)
There are also some warnings in the console while debugging:
[info] play.api.Play - Application started (Dev)
[warn] o.a.k.c.c.ConsumerConfig - The configuration 'schema.registry.url' was supplied but isn't a known config.
[warn] o.a.k.c.c.ConsumerConfig - The configuration 'specific.avro.reader' was supplied but isn't a known config.
[warn] o.a.k.c.c.ConsumerConfig - The configuration 'auto.commit.enable' was supplied but isn't a known config.
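These warnings suggest that `schema.registry.url` and `specific.avro.reader` may not be reaching the `KafkaAvroDeserializer`. One possible explanation: when a deserializer instance is passed to the consumer, the Kafka client does not call `configure` on it, so an instance created with `new KafkaAvroDeserializer()` might have no schema registry client, which would be consistent with the NullPointerException. Below is a minimal, untested sketch of configuring the instance explicitly before passing it to `ConsumerSettings`. It assumes the same `system` and placeholder addresses as in the settings above; the names `avroConfig` and `valueDeserializer` are only illustrative.

```scala
import scala.collection.JavaConverters._

import akka.kafka.ConsumerSettings
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.kafka.common.serialization.StringDeserializer

// Settings the Avro deserializer itself needs (same placeholder values as above).
val avroConfig: java.util.Map[String, String] = Map(
  "schema.registry.url"  -> "http://IP_ADDRESS:8081",
  "specific.avro.reader" -> "true"
).asJava

// Configure the instance by hand; isKey = false because it deserializes record values.
val valueDeserializer = new KafkaAvroDeserializer()
valueDeserializer.configure(avroConfig, false)

// Assumption: `system` is the ActorSystem already used in the original settings.
val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, valueDeserializer)
    .withBootstrapServers("IP_ADDRESS:9092")
    .withGroupId("index.consumers")
    .withProperty("enable.auto.commit", "false")
```

If this is the cause, the Avro-specific `withProperty` calls on the consumer would no longer be needed, since the deserializer would receive its settings directly.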
Can someone please help?