Using Avro schema for primitive keys in Kafka Streams

Randy Harmon
Dec 6, 2016, 5:27:53 PM
to Confluent Platform
Hi all,

I'm extending the Confluent examples for Kafka REST, Schema Registry and Kafka Streams to build a data flow that uses Avro for both topic keys and topic values. I'm having trouble with the KafkaAvroDeserializer.

Background: My test stream has just one key field, a (Long) id, and the KafkaAvroSerializer registers a simple Avro schema for it. The schema is added to the registry, and after producing, messages arrive in Kafka with correct-looking Avro-encoded keys and values.

Test: I'm producing a set of 10 simple messages and counting them with an implementation of org.apache.kafka.streams.kstream.ForeachAction in a very simple Streams instance that uses a SpecificAvroSerde[T] (see here). The stream is configured with serdes like so:

p.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, classOf[SpecificAvroSerde[_]])
p.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[SpecificAvroSerde[_]])


When I try to run this stream in my test, it fails with org.apache.kafka.common.errors.SerializationException: int specified by the writers schema could not be instantiated to find the readers schema.

As I'm exploring the code, I find the following details:
  • in AbstractKafkaAvroDeserializer's getReaderSchema(), SpecificData.get().getClass(writerSchema) returns "long", that is, Long.TYPE (the primitive type), not an instantiable class.
  • The getWrapper() method of SpecificData would return a class, but it's a private method.
  • I find getSchema(Object object) in AbstractKafkaAvroSerDe but it seems to be for translating in the other direction.
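The reflection failure described in these bullets can be reproduced with plain Java, no Kafka or Avro required. This is an illustration, not the deserializer's code: wrap() is a hypothetical helper only in the spirit of the private getWrapper() mentioned above, and failsToInstantiate() just makes the kind of reflective Class.newInstance() call (deprecated since Java 9, but still available) that throws InstantiationException here. It also suggests that boxing alone would not rescue that code path, because java.lang.Long has no no-arg constructor either; instantiating a class to ask for its schema only seems to make sense for generated record classes.

```java
// Sketch: why a primitive "reader class" such as Long.TYPE cannot be instantiated.
final class PrimitiveReaderClassDemo {

    // Hypothetical helper: maps a primitive Class to its boxed counterpart.
    static Class<?> wrap(Class<?> c) {
        if (c == int.class) return Integer.class;
        if (c == long.class) return Long.class;
        if (c == float.class) return Float.class;
        if (c == double.class) return Double.class;
        if (c == boolean.class) return Boolean.class;
        return c; // non-primitive classes are returned unchanged
    }

    // Returns true only if c.newInstance() throws InstantiationException.
    @SuppressWarnings("deprecation")
    static boolean failsToInstantiate(Class<?> c) {
        try {
            c.newInstance();
            return false;
        } catch (InstantiationException e) {
            return true;  // primitive types and classes without a nullary constructor
        } catch (IllegalAccessException e) {
            return false; // a different failure mode, not the one in this thread
        }
    }

    public static void main(String[] args) {
        System.out.println(long.class == Long.TYPE);              // true
        System.out.println(failsToInstantiate(long.class));       // true
        System.out.println(failsToInstantiate(wrap(long.class))); // true: Long has no no-arg constructor
    }
}
```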
Recommendations?

R



Randy Harmon
Dec 6, 2016, 5:32:08 PM
to Confluent Platform
Here's a stack trace I extracted for this scenario.

        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getReaderSchema(AbstractKafkaAvroDeserializer.java:218)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getDatumReader(AbstractKafkaAvroDeserializer.java:198)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:132)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:91)
        at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54)
        at example.serde.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:58)
        at example.serde.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:28)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeKey(SourceNode.java:39)

Randy Harmon
Dec 7, 2016, 11:05:25 AM
to confluent...@googlegroups.com

I'm currently working around this by using an Avro schema of type "record" with exactly one field of type "long". It seems odd that I should have to do that, though.
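Why the workaround is cheap on the wire: per the Avro specification, a record is binary-encoded as its fields in schema order with no header, length prefix or field tags, so a record whose only field is a long should produce exactly the same payload bytes as a bare long. What changes is the registered schema (a named record, which presumably lets the specific reader find a generated class) and therefore the schema id in the header. This sketch assumes only the spec's zig-zag varint encoding for long and the 5-byte framing (magic byte 0, 4-byte schema id) described elsewhere in this thread; the class and method names are hypothetical and this is not the Avro library's encoder.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;

// Sketch: Avro binary encoding of a bare long vs. a record made only of long fields.
final class SingleFieldRecordEncoding {

    // Avro "long": zig-zag map to unsigned, then base-128 varint, low-order group first.
    static byte[] encodeLong(long v) {
        long zz = (v << 1) ^ (v >> 63);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((zz & ~0x7FL) != 0) {
            out.write((int) ((zz & 0x7F) | 0x80)); // continuation bit set
            zz >>>= 7;
        }
        out.write((int) zz);
        return out.toByteArray();
    }

    // Avro "record" of long fields: each field encoded in order and concatenated.
    static byte[] encodeLongRecord(long... fields) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (long f : fields) {
            byte[] b = encodeLong(f);
            out.write(b, 0, b.length);
        }
        return out.toByteArray();
    }

    // Registry-style framing: magic byte 0, 4-byte big-endian schema id, payload.
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(5 + payload.length)
                .put((byte) 0).putInt(schemaId).put(payload).array();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(encodeLong(42)));           // [84]
        System.out.println(Arrays.toString(encodeLongRecord(42)));     // [84]
        System.out.println(Arrays.toString(frame(1, encodeLong(42)))); // [0, 0, 0, 0, 1, 84]
    }
}
```

The same concatenation rule covers a compound key: a two-field record is just the two encoded longs back to back.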

R

--
You received this message because you are subscribed to a topic in the Google Groups "Confluent Platform" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/confluent-platform/9_9tTK8-84s/unsubscribe.
To unsubscribe from this group and all its topics, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/ec7836f7-45e6-4854-966b-10741514ffde%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Damian Guy
Dec 7, 2016, 11:15:13 AM
to confluent...@googlegroups.com
Hi Randy,

What was the schema you were using that wasn't working?

AFAIK all Avro schemas need to have a record, e.g.:
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}

Randy Harmon
Dec 7, 2016, 12:00:18 PM
to confluent...@googlegroups.com
Hi Damian,

My record itself was a nearly trivial record for the value type, along the lines of your User example. The key type for that record, however, was simply a primitive long (I also tried int, hence the small discrepancy in my first message).

So kafka-avro-serializer was trying to decode the message posted to my topic, consisting of both the key and the value, but it never got past decoding the key. AFAICT, the sequence proceeded as follows:

Having extracted the schema id (= 1) from the encoded key (00, 00 00 00 01, <some-avro-long-value>), and having a writerSchema (Avro type long) but no readerSchema, it sets readerClass = Avro's SpecificData.get().getClass(writerSchema), which returns Long.TYPE

... but then readerClass.newInstance() throws an InstantiationException
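As a cross-check on the byte layout described here, a key framed this way can be decoded by hand. This is a minimal sketch assuming the framing is exactly as described in the post (one magic byte 0, a 4-byte big-endian schema id, then the Avro binary encoding of a long, which the Avro spec defines as a zig-zag varint). FramedAvroLongKey and Decoded are hypothetical names, not part of the Confluent serializer API, and the sketch performs no registry lookup, so it only applies when you already know the key is a bare long. The example value 42 is illustrative.

```java
import java.nio.ByteBuffer;

// Sketch: decode a key framed as magic byte 0 + 4-byte schema id + Avro long.
final class FramedAvroLongKey {

    // Plain holder for the parsed header and value (hypothetical).
    static final class Decoded {
        final int schemaId;
        final long value;
        Decoded(int schemaId, long value) { this.schemaId = schemaId; this.value = value; }
    }

    static Decoded decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes); // ByteBuffer is big-endian by default
        byte magic = buf.get();
        if (magic != 0) {
            throw new IllegalArgumentException("unknown magic byte: " + magic);
        }
        int schemaId = buf.getInt();
        // Varint: 7 payload bits per byte, high bit set on every byte except the last.
        long raw = 0;
        int shift = 0;
        int b;
        do {
            if (shift > 63) throw new IllegalArgumentException("varint too long");
            b = buf.get() & 0xFF;
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        long value = (raw >>> 1) ^ -(raw & 1); // undo the zig-zag mapping
        return new Decoded(schemaId, value);
    }

    public static void main(String[] args) {
        byte[] key = {0x00, 0x00, 0x00, 0x00, 0x01, 0x54};
        Decoded d = decode(key);
        System.out.println(d.schemaId + " " + d.value); // 1 42
    }
}
```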

Hope this clarifies the problem I had.

R

Damian Guy
Dec 7, 2016, 1:00:46 PM
to confluent...@googlegroups.com
Any reason why you didn't just use Serdes.Long() as the key serde?
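One practical consequence of switching to Kafka's Serdes.Long(): producer and consumer have to switch together. Kafka's LongSerializer writes a fixed 8-byte big-endian value with no magic byte and no schema id, so those bytes are not interchangeable with the registry-framed Avro key described in this thread. The sketch below models that 8-byte layout with java.nio.ByteBuffer only (no Kafka dependency); LongKeyLayouts and its methods are hypothetical, the length check is this sketch's own choice, and the value 42 with schema id 1 is purely illustrative.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Sketch: the fixed 8-byte layout of a plain long key vs. a registry-framed Avro long.
final class LongKeyLayouts {

    // 8 bytes, big-endian, the same layout Kafka's LongSerializer produces.
    static byte[] plainLong(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    // Reads the 8-byte layout back; anything else is rejected outright.
    static long readPlainLong(byte[] bytes) {
        if (bytes.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(plainLong(42))); // [0, 0, 0, 0, 0, 0, 0, 42]
        byte[] avroFramed = {0, 0, 0, 0, 1, 0x54};          // illustrative Avro-framed 42
        try {
            readPlainLong(avroFramed);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());             // expected 8 bytes, got 6
        }
    }
}
```

Existing topics written with the Avro key serializer would therefore need to be re-keyed (or read with the Avro deserializer) before a Serdes.Long() consumer could use them.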

Randy Harmon
Dec 7, 2016, 3:53:46 PM
to confluent...@googlegroups.com
Great question, Damian.  When the primary key is simple, directly using a primitive serde instead of a degenerate Avro schema could make sense.  When that primary key is compound, I believe the structured serde is needed.  

Compound keys seem valuable for predictable partitioning, for example when you want to store a 1:N relationship so that the N records follow the same partitioning logic as their owning record. That way a query engine such as Apache Ignite can join efficiently instead of joining data across nodes (provided Ignite's node-affinity logic is aligned with Kafka's partitioning logic). That is partly because data affinity always seems to be key-based.
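A minimal sketch of the co-partitioning idea, assuming the partition is chosen from the owner id alone so that every child row lands next to its owning record. Note that Kafka's default partitioner hashes the whole serialized key, so two compound keys sharing an owner id would generally not land on the same partition without custom logic (for example a producer Partitioner or a Kafka Streams StreamPartitioner), and Ignite's affinity logic would have to agree with it, as the post says. ChildKey, partitionForOwner and partitionFor are hypothetical, and the hash used here is a stand-in, not Kafka's murmur2-based one.

```java
// Sketch: partition a compound (owner, child) key by its owner component only.
final class OwnerAffinityPartitioning {

    // Hypothetical compound key for one row on the "N" side of a 1:N relationship.
    static final class ChildKey {
        final long ownerId;
        final long childId;
        ChildKey(long ownerId, long childId) { this.ownerId = ownerId; this.childId = childId; }
    }

    // Illustrative hash only; NOT the murmur2 hash Kafka's default partitioner uses.
    static int partitionForOwner(long ownerId, int numPartitions) {
        return Math.floorMod(Long.hashCode(ownerId), numPartitions); // always in [0, numPartitions)
    }

    // childId is deliberately ignored so children co-locate with their owner.
    static int partitionFor(ChildKey key, int numPartitions) {
        return partitionForOwner(key.ownerId, numPartitions);
    }

    public static void main(String[] args) {
        int n = 6;
        System.out.println(partitionForOwner(1001L, n));                 // 5
        System.out.println(partitionFor(new ChildKey(1001L, 1L), n));    // 5
        System.out.println(partitionFor(new ChildKey(1001L, 2L), n));    // 5
    }
}
```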

I suppose one could use Serdes.Long() for the most common case and an Avro serde for special scenarios.

R

Zhenkai Jiang
Feb 16, 2018, 7:28:24 PM
to Confluent Platform
I ran into the same issue today. Since the schema is not entirely defined on our side, does anyone know whether there is a solution other than changing the schema? Thanks.
