Confluent 3.3.0 SpecificAvroSerde not able to deserialize the values

ChanK

Dec 19, 2017, 10:56:39 PM
to Confluent Platform
Hello all,

I am trying to use Kafka Streams to consume Avro-serialized data from a topic. I have configured my application as shown in the WikipediaFeedAvroExample.java example. I have registered the schema in the schema registry, which is:

{
  "subject": "test.topic",
  "version": 1,
  "id": 1,
  "schema": "{\"type\":\"record\",\"name\":\"avro_message\",\"fields\":[{\"name\":\"testMessage\",\"type\":[\"null\",\"string\"]}]}"
}


And I have created a "Message.avsc" schema file.
{
  "namespace": "com.example.avro",
  "type": "record",
  "name": "Message",
  "fields": [
    {
      "name": "testMessage",
      "type": "string"
    }
  ]
}

I have used the avro-maven-plugin to generate Java sources for this schema, which I used to create the SpecificAvroSerde.

Serde<String> stringSerde = Serdes.String();
Serde<Message> messageSerde = new SpecificAvroSerde<>();
KStream<String, Message> kStream = kStreamBuilder.stream(stringSerde, messageSerde, "test.topic");

I am able to connect to the topic, but it errors out with a deserialization exception. No matter what message I consume, the value ends up being null. Can anyone help me understand what I am missing here?

Thanks.

Saïd Bouras

Dec 20, 2017, 7:56:49 AM
to confluent...@googlegroups.com

Hi,

Can you post a code snippet?
At first glance, it seems to me that you have to pass the Schema Registry client (CachedSchemaRegistryClient, ...) to the SpecificAvroSerde constructor.
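For what it's worth, here is a minimal sketch of what I mean (against the Confluent 3.3 serdes; the registry URL is a placeholder). When the serde is created manually, rather than through the default serde entries in StreamsConfig, it also has to be configure()d with the schema.registry.url:

```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.serialization.Serde;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

// Sketch: a manually created SpecificAvroSerde must be configured with the
// Schema Registry URL before first use ("http://localhost:8081" is a placeholder).
Map<String, String> serdeConfig = Collections.singletonMap(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, // "schema.registry.url"
        "http://localhost:8081");

Serde<Message> messageSerde = new SpecificAvroSerde<>();
messageSerde.configure(serdeConfig, false); // false = serde is used for record values, not keys
```

Setting the URL only in the application Properties is not enough for serdes that are instantiated directly.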

Best regards


--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/62ce3d7b-09cc-456b-9791-72dac195aa00%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--

Saïd BOURAS

Data Engineer

Mobile: 0662988731 

Zenika Paris
10 rue de Milan 75009 Paris
Standard : +33(0)1 45 26 19 15 - Fax : +33(0)1 72 70 45 10 
  

ChanK

Dec 20, 2017, 5:49:17 PM
to Confluent Platform
I don't have access to my code base right now, but I gave the schema registry URL in the properties, just as shown in the above-mentioned example.

Matthias J. Sax

Dec 21, 2017, 2:30:44 PM
to confluent...@googlegroups.com
Can you share the stack trace of the error?

Do you integrate with Confluent Schema Registry?


-Matthias


ChanK

Dec 27, 2017, 11:48:43 AM
to Confluent Platform
Hi, 

Please find the source code for this here. Let me know if you are able to find the issue.

Thanks!
Chan



Hyun Joon Seol

Dec 27, 2017, 3:02:14 PM
to Confluent Platform

ChanK

Dec 27, 2017, 3:34:42 PM
to Confluent Platform
https://github.com/chanakyamandava/kafka-streams-example/blob/009afdaa50c694f9a0651ef8dfc42a0b125bea78/src/main/java/kafka/streams/sample/consumer/AvroStreamListener.java#L50

Please check it now; I even configured that. Anyway, I provided the schema registry URL in the config properties, so as per the reference you gave, it should be fine.

I ran the test again; that didn't make any difference.

Please find the exception stack trace below.

2017-12-27 14:24:47.448  INFO 9292 --- [-StreamThread-3] o.a.k.s.p.internals.StreamThread         : stream-thread [myKafkaStreamsApp_avroStream-ff2b1b75-6707-425f-bc2d-f07d61f214c7-StreamThread-3] Shutting down
2017-12-27 14:24:47.452  INFO 9292 --- [-StreamThread-3] o.a.k.clients.producer.KafkaProducer     : Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2017-12-27 14:24:47.455  INFO 9292 --- [-StreamThread-3] o.a.k.s.p.internals.StreamThread         : stream-thread [myKafkaStreamsApp_avroStream-ff2b1b75-6707-425f-bc2d-f07d61f214c7-StreamThread-3] Removing all active tasks [0_0, 0_1, 0_2, 0_3]
2017-12-27 14:24:47.456  INFO 9292 --- [-StreamThread-3] o.a.k.s.p.internals.StreamThread         : stream-thread [myKafkaStreamsApp_avroStream-ff2b1b75-6707-425f-bc2d-f07d61f214c7-StreamThread-3] Removing all standby tasks []
2017-12-27 14:24:47.456  INFO 9292 --- [-StreamThread-3] o.a.k.s.p.internals.StreamThread         : stream-thread [myKafkaStreamsApp_avroStream-ff2b1b75-6707-425f-bc2d-f07d61f214c7-StreamThread-3] Stream thread shutdown complete
2017-12-27 14:24:47.456  WARN 9292 --- [-StreamThread-3] o.a.k.s.p.internals.StreamThread         : stream-thread [myKafkaStreamsApp_avroStream-ff2b1b75-6707-425f-bc2d-f07d61f214c7-StreamThread-3] Unexpected state transition from RUNNING to DEAD.
Exception in thread "myKafkaStreamsApp_avroStream-ff2b1b75-6707-425f-bc2d-f07d61f214c7-StreamThread-3" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=avro.topic, partition=2, offset=336
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: java.nio.BufferUnderflowException
at java.nio.Buffer.nextGetIndex(Buffer.java:506)
at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:361)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:120)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:66)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:38)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56)
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)

ChanK

Dec 27, 2017, 3:43:30 PM
to Confluent Platform
This is the exception stack trace I have been running into.

Hyun Joon Seol

Dec 27, 2017, 3:58:49 PM
to Confluent Platform
It looks like it reads up to a certain offset successfully. Are you sure that the topic contains only messages that adhere to the schema format? There may be a difference between the messages before offset 336 and those from offset 336 onward.
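One quick way to check (a sketch, not part of any Confluent API; the class and method names here are mine): records written by KafkaAvroSerializer use the wire format [magic byte 0x00][4-byte schema id][Avro binary payload]. If you dump a few raw values with a plain ByteArrayDeserializer consumer, you can test whether they carry that 5-byte header; a BufferUnderflowException with "id -1" usually means they don't, i.e. they were produced without the Confluent Avro serializer or are shorter than the header.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: checks whether a raw record value looks like it was
// written by the Confluent Avro serializer (magic byte 0x00 + 4-byte schema id).
public class WireFormatCheck {
    public static boolean looksLikeConfluentAvro(byte[] value) {
        if (value == null || value.length < 5) {
            return false; // too short to hold the magic byte + schema id
        }
        ByteBuffer buf = ByteBuffer.wrap(value);
        if (buf.get() != 0x0) {
            return false; // missing magic byte
        }
        int schemaId = buf.getInt();
        return schemaId >= 0; // ids assigned by Schema Registry are non-negative
    }

    public static void main(String[] args) {
        byte[] avroLike = {0x0, 0x0, 0x0, 0x0, 0x1, 0x2}; // magic byte + id 1 + payload
        byte[] plainText = "hello".getBytes();
        System.out.println(looksLikeConfluentAvro(avroLike));  // true
        System.out.println(looksLikeConfluentAvro(plainText)); // false
    }
}
```

If the check fails for the records at the failing offsets, the fix is on the producer side, not in the Streams app.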

ChanK

Dec 27, 2017, 4:06:33 PM
to Confluent Platform
No, it does not. 336 is the starting offset of that partition.

ChanK

Jan 5, 2018, 4:46:06 PM
to Confluent Platform
Does the code help in identifying the issue?