How to write a KafkaAvro Serde for GenericData.Record


Giulio Vito de Musso

Apr 27, 2017, 9:04:26 AM
to Confluent Platform
Hello,

I'm using Kafka 0.10.2 and Avro to serialize my messages, for both the key and the value. In particular, I'm using the KafkaAvroSerializer class from the io.confluent.kafka-avro-serializer (v. 3.2.0) artifact from the Confluent Maven repository.

Now I would like to use Kafka Streams but I'm stuck trying to write the Serde class for the GenericData.Record class.

 
import org.apache.avro.generic.GenericData.Record;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
[...]

public final class KafkaAvroSerde implements Serde<Record> {

    private final Serde<Record> inner;

    public KafkaAvroSerde() {
        // Here I get the error
        inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer());
    }

    public KafkaAvroSerde(SchemaRegistryClient client) {
        this(client, Collections.emptyMap());
    }

    public KafkaAvroSerde(SchemaRegistryClient client, Map<String, ?> props) {
        // Here I get the error
        inner = Serdes.serdeFrom(new KafkaAvroSerializer(client, props), new KafkaAvroDeserializer(client, props));
    }

    @Override
    public Serializer<Record> serializer() {
        return inner.serializer();
    }

    @Override
    public Deserializer<Record> deserializer() {
        return inner.deserializer();
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.serializer().configure(configs, isKey);
        inner.deserializer().configure(configs, isKey);
    }

    @Override
    public void close() {
        inner.serializer().close();
        inner.deserializer().close();
    }

}


This is the error I get at the commented lines:

Type mismatch: cannot convert from Serde<Object> to Serde<GenericData.Record>


I need to define the Serde class for GenericData.Record (and not for a specific POJO of mine) because I can have different object structures on the same channel, so the deserializer should return a GenericData.Record (I will populate the right POJOs after this step).

How would you get this done?
Thank you
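
For readers hitting the same error: KafkaAvroSerializer and KafkaAvroDeserializer are typed on Object, so Serdes.serdeFrom infers Serde<Object>, which cannot be assigned to a Serde<Record> field. The sketch below reproduces that situation with small stand-in types and shows one possible workaround, a thin adapter that casts on the way out. Everything in it (Ser, De, PairSerde, Rec, ObjectSer, ObjectDe, narrow) is hypothetical and only mimics the shape of the Kafka interfaces; it is not the Confluent implementation, and it assumes the deserializer really does return instances of the requested type at runtime.

```java
import java.nio.charset.StandardCharsets;

class NarrowedSerdeSketch {
    // Simplified stand-ins for Kafka's Serializer<T> and Deserializer<T>.
    interface Ser<T> { byte[] serialize(String topic, T data); }
    interface De<T> { T deserialize(String topic, byte[] data); }

    // Simplified stand-in for Serde<T>: just a serializer/deserializer pair.
    static final class PairSerde<T> {
        final Ser<T> ser;
        final De<T> de;
        PairSerde(Ser<T> ser, De<T> de) { this.ser = ser; this.de = de; }
    }

    // Same generic shape as Serdes.serdeFrom: T is taken from the arguments.
    static <T> PairSerde<T> serdeFrom(Ser<T> ser, De<T> de) {
        return new PairSerde<>(ser, de);
    }

    // Stand-in for GenericData.Record.
    static final class Rec {
        final String name;
        Rec(String name) { this.name = name; }
        @Override public String toString() { return name; }
    }

    // Typed on Object, like the Avro serializer/deserializer in the question.
    static final class ObjectSer implements Ser<Object> {
        public byte[] serialize(String topic, Object data) {
            return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
        }
    }
    static final class ObjectDe implements De<Object> {
        public Object deserialize(String topic, byte[] data) {
            return new Rec(new String(data, StandardCharsets.UTF_8));
        }
    }

    // Adapter: delegates to the Object-typed pair and casts each result to T.
    // Throws ClassCastException if the deserializer returns something else.
    static <T> PairSerde<T> narrow(PairSerde<Object> inner, Class<T> type) {
        Ser<T> ser = (topic, data) -> inner.ser.serialize(topic, data);
        De<T> de = (topic, bytes) -> type.cast(inner.de.deserialize(topic, bytes));
        return new PairSerde<>(ser, de);
    }

    public static void main(String[] args) {
        // Does not compile, for the same reason as in the question:
        // PairSerde<Rec> broken = serdeFrom(new ObjectSer(), new ObjectDe());

        PairSerde<Object> untyped = serdeFrom(new ObjectSer(), new ObjectDe());
        PairSerde<Rec> typed = narrow(untyped, Rec.class);

        byte[] bytes = typed.ser.serialize("topic", new Rec("alice"));
        Rec back = typed.de.deserialize("topic", bytes);
        System.out.println(back.name); // prints "alice"
    }
}
```

The cast moves the type check from compile time to runtime, so a record of an unexpected type fails when it is deserialized rather than when the code is compiled.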

Giulio Vito de Musso

Apr 27, 2017, 9:40:33 AM
to Confluent Platform
OK, I think I solved it. I followed this example


I used the GenericAvroSerde class, which produces GenericRecord objects that I can then work with.

I hope this will be useful for other people.

Michael Noll

Apr 27, 2017, 10:46:39 AM
to confluent...@googlegroups.com
Giulio,

there's good news:  We just finished the work on an official Confluent Avro serde (specific Avro + generic Avro) for Kafka Streams.  See https://github.com/confluentinc/schema-registry/tree/master/avro-serde.

The new Avro serde, which is Confluent schema registry aware/compatible, will be released with the upcoming Confluent 3.3, which is a few weeks out.

Until 3.3 is released, you can either build your own artifacts from the `master` branch (you must first build the `master` branches of [1] and [2] with `mvn install`, then the schema-registry project with `mvn install`), or e.g. copy-paste the classes into your own code project.

Hope this helps!
Michael





--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/471c81f7-ea0f-4e63-b72e-19665929db87%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.



--
Michael G. Noll
Product Manager | Confluent
Follow us: Twitter | Blog

Michael Noll

Apr 27, 2017, 10:50:00 AM
to confluent...@googlegroups.com


Giulio Vito de Musso

Apr 27, 2017, 10:57:38 AM
to confluent...@googlegroups.com
Oh great! I will wait for the next Confluent release then :) 

Regards


Tianxiang Xiong

May 10, 2017, 12:37:40 PM
to Confluent Platform
I'm not sure what the difference is between the new Kafka Avro serde classes and the existing KafkaAvroSerializer etc. According to Eno it's some code refactoring? Can you explain more about that?

Ewen Cheslack-Postava

May 13, 2017, 4:56:43 PM
to Confluent Platform
Kafka Streams uses a Serde interface that's effectively just a wrapper combining the Serializer and Deserializer in one object, since for, e.g., `through` topics it requires both: http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/common/serialization/Serde.html. It just allows specifying the combination more concisely in the Kafka Streams APIs.
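
To make the "wrapper" point concrete, here is a rough sketch of why an operation that writes to an intermediate topic and reads back from it needs both halves, and why bundling them lets the API take a single argument. The types and the through method below (Ser, De, SimpleSerde, stringSerde) are simplified stand-ins written for illustration, not the real Kafka interfaces or the real KStream method; in Kafka itself the built-in equivalent of stringSerde() would be Serdes.String().

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SerdeBundleSketch {
    // Simplified stand-ins for Kafka's Serializer<T> and Deserializer<T>.
    interface Ser<T> { byte[] serialize(String topic, T data); }
    interface De<T> { T deserialize(String topic, byte[] data); }

    // Simplified stand-in for Serde<T>: one object exposing both directions.
    interface SimpleSerde<T> {
        Ser<T> serializer();
        De<T> deserializer();
    }

    // A serde for UTF-8 strings.
    static SimpleSerde<String> stringSerde() {
        return new SimpleSerde<String>() {
            public Ser<String> serializer() {
                return (topic, s) -> s.getBytes(StandardCharsets.UTF_8);
            }
            public De<String> deserializer() {
                return (topic, b) -> new String(b, StandardCharsets.UTF_8);
            }
        };
    }

    // Stand-in for a "through"-style step: every record is serialized into an
    // intermediate topic (here just a list of byte arrays) and then read back,
    // so the step needs a serializer and a deserializer for the same type.
    static <T> List<T> through(List<T> records, SimpleSerde<T> serde) {
        List<byte[]> topic = new ArrayList<>();
        for (T record : records) {
            topic.add(serde.serializer().serialize("intermediate", record));
        }
        List<T> out = new ArrayList<>();
        for (byte[] bytes : topic) {
            out.add(serde.deserializer().deserialize("intermediate", bytes));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> out = through(Arrays.asList("a", "b"), stringSerde());
        System.out.println(out); // prints [a, b]
    }
}
```

Without the bundle, through would need two separate parameters whose type arguments must agree; passing one serde keeps the pair together.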

