Error deserializing key/value for partition - Error deserializing Avro message for id

984 views
Skip to first unread message

Muhammad

unread,
Sep 15, 2017, 8:16:43 PM9/15/17
to Confluent Platform

Hi 

 

I am using Akka-Stream-Kafka for consuming messages from Kafka in a Play framework application. Here are the details:

 

build.sbt

libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" "0.17"

 

And here is the setting for consumer

val consumerSettings = ConsumerSettings(systemnew StringDeserializer, new KafkaAvroDeserializer())
  .withBootstrapServers(
"IP_ADDRESS:9092")
  .withProperty(ConsumerConfig.
KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
  .withProperty(ConsumerConfig.
VALUE_DESERIALIZER_CLASS_CONFIG,"io.confluent.kafka.serializers.KafkaAvroDeserializer")
  .withProperty(KafkaAvroDeserializerConfig.
SPECIFIC_AVRO_READER_CONFIG"true")
  .withProperty(
"schema.registry.url""http://IP_ADDRESS:8081")
  
//.withProperty("auto.commit.enable", "false")
  
.withGroupId("index.consumers")
  .withProperty(
"enable.auto.commit""false")

 

While deserializing I get error. Here is the details:

 

[ERROR] [09/15/2017 19:50:13.610] [play-dev-mode-akka.kafka.default-dispatcher-12] [akka://play-dev-mode/system/kafka-consumer-1] Exception when polling from consumer

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tst2-0 at offset 3

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 141

Caused by: java.lang.NullPointerException

         at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)

         at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)

         at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)

         at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)

         at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)

         at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:918)

         at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)

         at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1095)

         at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:944)

         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:567)

         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:528)

         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)

         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)

         at akka.kafka.KafkaConsumerActor.tryPoll$1(KafkaConsumerActor.scala:257)

         at akka.kafka.KafkaConsumerActor.poll(KafkaConsumerActor.scala:303)

         at akka.kafka.KafkaConsumerActor.akka$kafka$KafkaConsumerActor$$receivePoll(KafkaConsumerActor.scala:232)

         at akka.kafka.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:151)

         at akka.actor.Actor.aroundReceive(Actor.scala:514)

         at akka.actor.Actor.aroundReceive$(Actor.scala:512)

         at akka.kafka.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:73)

         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)

         at akka.actor.ActorCell.invoke(ActorCell.scala:496)

         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)

         at akka.dispatch.Mailbox.run(Mailbox.scala:224)

         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

         at java.lang.Thread.run(Thread.java:748)

 

[error] p.c.s.c.WebSocketFlowHandler - WebSocket flow threw exception

java.lang.Exception: Consumer actor terminated

         at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1(SingleSourceLogic.scala:54)

         at akka.kafka.internal.SingleSourceLogic.$anonfun$preStart$1$adapted(SingleSourceLogic.scala:40)

         at akka.stream.stage.GraphStageLogic$StageActor.internalReceive(GraphStage.scala:185)

         at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1(GraphStage.scala:152)

         at akka.stream.stage.GraphStageLogic$StageActor.$anonfun$callback$1$adapted(GraphStage.scala:152)

         at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:443)

         at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:453)

         at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:546)

         at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:725)

         at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:740)

 

BTW, there are some warnings in the Console while debugging:

 

[info] play.api.Play - Application started (Dev)

[warn] o.a.k.c.c.ConsumerConfig - The configuration 'schema.registry.url' was supplied but isn't a known config.

[warn] o.a.k.c.c.ConsumerConfig - The configuration 'specific.avro.reader' was supplied but isn't a known config.

[warn] o.a.k.c.c.ConsumerConfig - The configuration 'auto.commit.enable' was supplied but isn't a known config.

 

Can someone please help?

 

Reply all
Reply to author
Forward
0 new messages