KafkaAvroEncoder with Kafka-stream


Tariq Mohammad

Apr 17, 2016, 9:13:14 PM
to Confluent Platform
I am trying to read Avro data from a Kafka topic using KStream, but I am unable to do so because of the following exception:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:329)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
Caused by: org.apache.kafka.common.KafkaException: Could not instantiate class io.confluent.kafka.serializers.KafkaAvroEncoder Does it have a public no-argument constructor?
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:317)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:188)
at org.apache.kafka.streams.StreamsConfig.valueSerializer(StreamsConfig.java:313)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:66)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:585)
at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:612)
at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:69)
at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:125)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:194)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:225)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:311)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:890)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:325)
... 1 more
Caused by: java.lang.InstantiationException: io.confluent.kafka.serializers.KafkaAvroEncoder
at java.lang.Class.newInstance(Class.java:364)
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:313)
... 15 more

These are my configs:

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.JOB_ID_CONFIG, "test-app");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
streamsConfiguration.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
streamsConfiguration.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroEncoder.class);
streamsConfiguration.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
streamsConfiguration.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDecoder.class);
streamsConfiguration.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

Could someone please point me in the right direction? Many thanks!

Liquan Pei

Apr 17, 2016, 11:59:57 PM
to confluent...@googlegroups.com
Hi Tariq,

KStream uses the new producer and consumer, which require KafkaAvroSerializer and KafkaAvroDeserializer. KafkaAvroEncoder/Decoder are meant for the old Scala producer and consumer.
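
For context on the exception in the original post: the trace shows Kafka creating the configured class reflectively through `Class.newInstance()`, which only succeeds when the class has a public no-argument constructor. The sketch below reproduces that failure mode in isolation with plain JDK classes. `ArgOnlyEncoder` and `NoArgSerializer` are hypothetical stand-ins, not the Confluent classes; the assumption is that `KafkaAvroEncoder` resembles the former, as the error message suggests.

```java
import java.util.Properties;

public class NoArgConstructorSketch {

    // Hypothetical stand-in for an old-style encoder: its only constructor takes an argument.
    public static class ArgOnlyEncoder {
        private final Properties props;

        public ArgOnlyEncoder(Properties props) {
            this.props = props;
        }
    }

    // Hypothetical stand-in for a new-style serializer: public no-argument constructor.
    public static class NoArgSerializer {
        public NoArgSerializer() {
        }
    }

    // Mimics the reflective call visible in the stack trace (java.lang.Class.newInstance).
    // Returns "ok" on success, otherwise the simple name of the exception that was thrown.
    @SuppressWarnings("deprecation")
    public static String tryInstantiate(Class<?> c) {
        try {
            c.newInstance();
            return "ok";
        } catch (InstantiationException | IllegalAccessException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryInstantiate(ArgOnlyEncoder.class));  // InstantiationException
        System.out.println(tryInstantiate(NoArgSerializer.class)); // ok
    }
}
```

A class that the framework instantiates by name therefore needs a no-argument constructor, whatever else it requires to be usable afterwards.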

Thanks,
Liquan

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/1af55d40-bee5-4083-8391-75385f90ead1%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Liquan Pei | Software Engineer | Confluent | +1 413.230.6855
Download Apache Kafka and Confluent Platform: www.confluent.io/download

Roel R

Apr 18, 2016, 2:28:17 AM
to Confluent Platform
Tariq,

I was struggling with the same thing. Once I got it working I put up a few examples; maybe they help: https://github.com/rollulus/kafka-streams-playground
They are written in Scala.

Regards,
Roel

Mohammad Tariq

Apr 18, 2016, 5:21:27 AM
to confluent...@googlegroups.com
Hi Liquan,

Thank you for the quick response.

I had actually started with KafkaAvroSerializer and KafkaAvroDeserializer. But with those I was getting the exception shown below every time I tried to run my program.

Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 3
Caused by: java.lang.NullPointerException
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:120)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:104)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:53)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:41)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:77)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:113)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:134)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:334)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)

@Roel, thank you so much for sharing the repo link. This approach works really well. However, I would like to do it without having to write a custom serializer, as I already have the schema of my Avro objects registered with the Schema Registry.

Any pointers or leads would be really helpful!


Roel R

Apr 18, 2016, 7:24:53 AM
to Confluent Platform
Tariq,

I recognize that particular stack trace. I guess it is related to the observations I described in "AVRO in Kafka Streams and the WikiFeed example" on March 29 in this group (which unfortunately has not been answered by Confluent yet). To get things working, I had to fix multiple things, and I cannot recall which one exactly corresponds to your stack trace.

Are you using the StreamsConfig.KEY/VALUE_(DE)SERIALIZER_CLASS_CONFIG properties to provide the class type, or do you explicitly pass instances of your serializers to KStream.to(...)? Maybe you can try the latter, by explicitly instantiating AND configuring them yourself.

Do you serialize into a Specific or a Generic Record? If you are using Specific, you'll need to set KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG to "true". Note that this won't help for now; it is a problem you would otherwise have stumbled upon later ;)

- Roel
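
A minimal sketch of the "instantiate AND configure yourself" idea, using only the JDK. `StandInAvroDeserializer` is a hypothetical stand-in, not the Confluent class; its `configure(Map, boolean)` merely mirrors the shape of Kafka's `Deserializer.configure`. The keys `schema.registry.url` and `specific.avro.reader` are assumed to be the string values behind the `KafkaAvroDeserializerConfig` constants mentioned in this thread.

```java
import java.util.HashMap;
import java.util.Map;

public class ExplicitConfigureSketch {

    // Hypothetical stand-in for an Avro deserializer; NOT the Confluent class.
    public static class StandInAvroDeserializer {
        private String registryUrl;        // stays null until configure() runs
        private boolean useSpecificReader; // false corresponds to Generic records

        // Same shape as Kafka's Deserializer.configure(Map<String, ?>, boolean).
        public void configure(Map<String, ?> configs, boolean isKey) {
            this.registryUrl = (String) configs.get("schema.registry.url");
            Object flag = configs.get("specific.avro.reader");
            this.useSpecificReader = flag != null && Boolean.parseBoolean(flag.toString());
        }

        public String describe() {
            return registryUrl + " specific=" + useSpecificReader;
        }
    }

    // Builds the settings map that is handed to configure().
    public static Map<String, Object> deserializerConfig(String registryUrl, boolean specific) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("schema.registry.url", registryUrl);
        configs.put("specific.avro.reader", Boolean.toString(specific));
        return configs;
    }

    public static void main(String[] args) {
        // Create the instance yourself and configure it before handing it to the framework.
        StandInAvroDeserializer valueDeserializer = new StandInAvroDeserializer();
        valueDeserializer.configure(deserializerConfig("http://localhost:8081", false), false);
        System.out.println(valueDeserializer.describe()); // http://localhost:8081 specific=false
    }
}
```

With a real serde the configured instance would then be passed to the stream explicitly, rather than named in the StreamsConfig properties.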

Mohammad Tariq

Apr 18, 2016, 3:35:34 PM
to confluent...@googlegroups.com
Hi Roel,

Thank you for sharing your learnings. Yes, I am using StreamsConfig.KEY/VALUE_(DE)SERIALIZER_CLASS_CONFIG properties. And I'm serializing it into a GenericRecord. Completely clueless right now though. Just trying to figure out how to make it work.


Mohammad Tariq

Apr 18, 2016, 3:41:18 PM
to confluent...@googlegroups.com
Strangely, KafkaAvroDecoder works perfectly fine with the Spark Streaming APIs. I think it's better to go with that for the moment. What would you suggest based on your experience with Kafka Streams?

Thanks!

Roel R

Apr 19, 2016, 2:06:10 AM
to Confluent Platform
I think that using those StreamsConfig.KEY/VALUE_(DE)SERIALIZER_CLASS_CONFIG properties explains it: your deserializer (AbstractKafkaAvroDeserializer) is instantiated by the framework. However, it won't get configured, since none of the classes implements the Configurable interface (see the earlier post I referred to). That explains your NullPointerException at line 120: this.schemaRegistry is null because it is never set: https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L120

Now that we have Kafka Streams working, we are happy with it. I would suggest sticking with Kafka Streams for certain applications. That's exactly what we do. Remember that it is a tech preview!
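
The mechanism described above can be sketched with plain JDK classes. Everything here is a hypothetical stand-in: the nested `Configurable` interface plays the role of Kafka's `Configurable`, `StandInDeserializer` plays the role of the Avro deserializer, and `newConfiguredInstance` is only an approximation of what the framework does when it is given a class rather than an instance.

```java
import java.util.Collections;
import java.util.Map;

public class UnconfiguredSerdeSketch {

    // Hypothetical stand-in for the Configurable interface mentioned above.
    public interface Configurable {
        void configure(Map<String, ?> configs);
    }

    // Hypothetical stand-in for a schema registry client.
    public static class RegistryClient {
        private final String url;

        public RegistryClient(String url) {
            this.url = url;
        }

        public String lookup(int id) {
            return "schema-" + id + "@" + url;
        }
    }

    // Has a configure method but does NOT implement Configurable.
    public static class StandInDeserializer {
        private RegistryClient schemaRegistry; // null until configure() is called

        public void configure(Map<String, ?> configs) {
            schemaRegistry = new RegistryClient((String) configs.get("schema.registry.url"));
        }

        public String deserialize(int schemaId) {
            return schemaRegistry.lookup(schemaId); // throws NullPointerException if never configured
        }
    }

    // Framework-style creation: instantiate by class, configure only if Configurable.
    public static Object newConfiguredInstance(Class<?> c, Map<String, ?> configs) {
        try {
            Object o = c.getDeclaredConstructor().newInstance();
            if (o instanceof Configurable) {
                ((Configurable) o).configure(configs);
            }
            return o;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + c.getName(), e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> configs =
                Collections.singletonMap("schema.registry.url", "http://localhost:8081");
        StandInDeserializer d =
                (StandInDeserializer) newConfiguredInstance(StandInDeserializer.class, configs);
        try {
            d.deserialize(3);
        } catch (NullPointerException e) {
            System.out.println("NPE: deserializer was never configured");
        }
        d.configure(configs); // configuring by hand makes the same instance usable
        System.out.println(d.deserialize(3));
    }
}
```

The instance is created successfully, so nothing fails at startup; the missing configuration only surfaces when the first record is deserialized.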

gerard...@dizzit.com

Apr 19, 2016, 3:13:45 AM
to Confluent Platform
So it would be fixable by extending the Deserializer and setting the correct properties in a different way? Working with Camel I had to take the same route since it doesn't support the confluent properties for the schema registry. 

Mohammad Tariq

Apr 27, 2016, 8:01:25 PM
to confluent...@googlegroups.com
Hi Roel,

I am really sorry for leaving the discussion abruptly. I have been traveling like crazy for the last couple of weeks because of a personal emergency.

Today while looking at the code I noticed that AbstractKafkaAvroDeserializer extends AbstractKafkaAvroSerDe which in turn takes care of initializing SchemaRegistry. See this :


Then why do I need to handle this explicitly?

I am sorry to pester you with questions; I am just trying to understand this properly. Please correct me if I am wrong.

Thanks again!


Liquan Pei

Apr 28, 2016, 6:16:42 AM
to confluent...@googlegroups.com
The method that initializes the SchemaRegistryClient is not called when the stream is created. This is because KafkaAvroSerializer does not implement the Configurable interface, so the configure method is not called when the serializer is initialized. Thus the SchemaRegistryClient instance is null and a NullPointerException is thrown.

- Liquan  


Guozhang Wang

May 1, 2016, 11:29:46 PM
to Confluent Platform
Liquan and I have synced on this issue. The current plan is to fix it explicitly in the example code, and we will rethink the mechanism for configuring default serde classes in Kafka Streams in general.


Guozhang



Michael Noll

May 6, 2016, 5:22:53 AM
to confluent...@googlegroups.com
Hi all,

thanks for your patience w.r.t. Avro-related issues in Kafka Streams.

There's good news:
- KAFKA-3639 [1] fixed the main gap that caused the Avro serialization/deserialization not to work as expected.  This fix is available in the latest `trunk` and `0.10.0` branches of Apache Kafka.  Kafka's `0.10.0` branch hosts the code that will go into the imminent Kafka 0.10.0.0 release (RC voting has already started).
- We have been tracking the upstream Kafka/Kafka Streams changes in 0.10.0 in our confluentinc/examples repository, feature branch `kafka-0.10.0.0` [2].
- This feature branch [2] has updated Avro examples (the ones you looked at before) as well as two new end-to-end demo applications, implemented as integration tests that spawn embedded Kafka/ZooKeeper/Confluent Schema registry instances.  Take a look at the updated examples to get you started.

Generic Avro end-to-end demo:
https://github.com/confluentinc/examples/blob/kafka-0.10.0.0/kafka-streams/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java

Specific Avro end-to-end demo:
https://github.com/confluentinc/examples/blob/kafka-0.10.0.0/kafka-streams/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java

Note: The code in [2] is built against upcoming Kafka 0.10.0.0 and thus is not compatible with the Kafka Streams implementation in the Tech Preview (= what you have been using so far).  Until Kafka 0.10.0.0 is released officially in the next 1-2 weeks, you must build it from source and install it to your local Maven repository:

    $ git clone g...@github.com:apache/kafka.git && cd kafka
    $ git checkout 0.10.0
    # Perhaps you need to bootstrap `gradlew` first, see Kafka's top-level README:
    #     $ gradle
    $ ./gradlew clean installAll

Hope this helps!  Again, thanks for your patience as well as your bug reports.

Best,
Michael



[1] https://issues.apache.org/jira/browse/KAFKA-3639
[2] https://github.com/confluentinc/examples/tree/kafka-0.10.0.0 (this branch will continue to be updated until Kafka 0.10.0.0 is released, then it will be merged into `master` of confluentinc/examples)




Michael G. Noll | Product Manager | Confluent | +1 650.453.5860

Jason Jho

May 10, 2016, 7:39:22 PM
to Confluent Platform
Are there any known compatibility issues using Kafka 0.10.0 branch alongside Confluent's 2.1.0.alpha1 services, in particular, schema registry?

Michael Noll

May 12, 2016, 3:09:07 AM
to confluent...@googlegroups.com
Jason,

Kafka Streams in Kafka trunk / 0.10.x does not work directly with 0.9.x clusters (and thus with CP 2.1.0-alpha1 aka the Kafka Streams Tech Preview) due to the change in message format, and some code changes are needed to work around it.  The upcoming next Confluent Platform release will make sure all the pieces work together, including Kafka Streams.

-Michael


