Possible to use KafkaAvroDeserializer with Kafka Streams?

Avi Flax

Feb 15, 2016, 4:21:01 PM
to confluent...@googlegroups.com
I know Kafka Streams is still in development, but I was hoping to try
it out, and there are a lot of green checkmarks in KAFKA-2590, so I
thought it might be worth a try.

Unfortunately, when my processor’s consumer tries to deserialize a
record using the KafkaAvroDeserializer, I’m getting this:

Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: java.lang.NullPointerException
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:120)

I’m using kafka-avro-serializer 2.0.0, so it looks like the errant line is:

Schema schema = schemaRegistry.getByID(id);

at

https://github.com/confluentinc/schema-registry/blob/2.x/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L120

so it looks as though `schemaRegistry` is null, which suggests the
`configure` method was never called. I did look through the
StreamThread and KafkaConsumer code, though, and it appears that it
*should* be called.
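
For reference, here’s a minimal JRuby sketch (not code from my app; the registry URL is just a placeholder) of the lifecycle I’d expect, where `configure` is what builds the Schema Registry client before `deserialize` is ever called:

require 'java'
java_import 'io.confluent.kafka.serializers.KafkaAvroDeserializer'

deserializer = KafkaAvroDeserializer.new

# configure() is supposed to construct the Schema Registry client; if it's
# never called, schemaRegistry stays null and deserialize() hits the NPE above.
deserializer.configure({ 'schema.registry.url' => 'http://localhost:8081' }, false)

# Once configured, deserializing a Confluent-framed Avro payload should work
# (payload_bytes here stands in for the raw record value).
record = deserializer.deserialize('some-topic', payload_bytes)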

I know Kafka Streams is still in flux, and I certainly don’t expect
anyone from Confluent to support it at this point. But I’m curious,
and I thought I’d post here just in case someone feels like
enlightening me.

Thank you!
Avi

Avi Flax

Feb 25, 2016, 2:31:53 PM
to Confluent Platform
On Monday, February 15, 2016 at 4:21:01 PM UTC-5, Avi Flax wrote:

so it looks as though `schemaRegistry` is null, which makes it seem as
though the `configure` method was never called.

I haven’t had a chance to dig into the root cause here, but I did find a not-awful workaround that I’m using for now; I’ll share it in case someone else finds it helpful:
  • In my config for KafkaStreams, I’m specifying the ByteArrayDeserializer and ByteArraySerializer (sketched just after this list).
  • Before creating my topology, I’m “manually” constructing new instances of KafkaAvroDeserializer and KafkaAvroSerializer and then “manually” calling their `configure` methods.
  • In my topology, my first step after the “spout” (`from`) is a `ValueMapper` that accepts a record as a byte array, calls the `deserialize` method of the KafkaAvroDeserializer instance, and returns the result.
  • And just before the “sink” (`to`), I have a corresponding `ValueMapper` that serializes the GenericRecord back into a byte array by calling the `serialize` method of the KafkaAvroSerializer instance.
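
To make the first bullet concrete, here’s roughly what that config looks like (a sketch, not my exact properties; the serializer/deserializer key names are my best guess for the current Streams preview and may differ):

streams_config = {
  # placeholders for my actual broker and registry endpoints
  'bootstrap.servers'   => 'localhost:9092',
  'schema.registry.url' => 'http://localhost:8081',
  # hand raw bytes through to/from the mappers below
  'key.deserializer'    => 'org.apache.kafka.common.serialization.ByteArrayDeserializer',
  'value.deserializer'  => 'org.apache.kafka.common.serialization.ByteArrayDeserializer',
  'key.serializer'      => 'org.apache.kafka.common.serialization.ByteArraySerializer',
  'value.serializer'    => 'org.apache.kafka.common.serialization.ByteArraySerializer'
}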
Here’s my (Ruby) code that creates everything:

# Factory functions to create instances of the Confluent Kafka Avro Deserializer
# and Serializer. The main reason we use them is for their automated integration
# with the Confluent Schema Registry.
# I’d prefer to “inject” these into Streams via config, but that’s not currently
# working for some reason. So for now this works.
module Serdes
  def self.deserializing_mapper(schema_registry_url)
    deserializer = KafkaAvroDeserializer.new
    config = { 'schema.registry.url' => schema_registry_url }
    deserializer.configure config, false
    ->(val) { deserializer.deserialize val }
  end

  def self.serializing_mapper(schema_registry_url, topic_name)
    serializer = KafkaAvroSerializer.new
    config = { 'schema.registry.url' => schema_registry_url }
    serializer.configure config, false
    ->(val) { serializer.serialize topic_name, val }
  end
end


and my topology:

schema_registry_url = streams_config['schema.registry.url']
deserializing_mapper = Serdes.deserializing_mapper schema_registry_url
serializing_mapper = Serdes.serializing_mapper schema_registry_url, to_topic

builder = KStreamBuilder.new

builder.stream(from_topic)
       .mapValues(deserializing_mapper)
       .mapValues(->(event) { process event })
       .mapValues(serializing_mapper)
       .to(to_topic)


HTH!