Kafka Connect + Avro + Schema Registry

Christos Vasilakis

Jun 5, 2017, 3:52:11 PM
to Confluent Platform
Hi there,

I have a source system that already produces Avro-encoded messages (with a schema already defined in an *.avsc file). I am developing a Kafka Connect source connector that connects to this system, consumes the Avro messages, and stores them in a Kafka topic for later consumption by other systems. Currently, when receiving the binary payload, I am doing something like the following:

---
// deserialize the binary payload and convert it to the Kafka Connect API
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(MyAvroObj.getClassSchema());
GenericRecord avroRecord = reader.read(null, DecoderFactory.get().binaryDecoder(payload, null));
io.confluent.connect.avro.AvroData avroData = new AvroData(100); // argument is the schema cache size
SchemaAndValue schemaAndValue = avroData.toConnectData(MyAvroObj.getClassSchema(), avroRecord);
// sourcePartition, sourceOffset, and topic are defined elsewhere in the task
SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic,
        schemaAndValue.schema(),
        schemaAndValue.value());
return record;
---

In my connect configuration I have defined "value.converter=io.confluent.connect.avro.AvroConverter" as the converter to use.
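
For reference, the relevant part of my worker configuration looks roughly like this (the registry URL here is just a local placeholder):

---
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
---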

This works fine but I have two questions:

a) Since the payload is already Avro encoded, can the payload be stored as-is, without re-encoding (i.e. Avro -> Connect Schema -> Avro)? What 'value.converter' configuration can be used in that case?

b) What is the proper way to store the schema in the Schema Registry in this case?

In essence, what is the proper way to handle _already_ Avro-encoded messages received by a Kafka Connect source connector?

Thank you in advance

-Christos

Randall Hauch

Jun 5, 2017, 4:31:29 PM
to confluent...@googlegroups.com
Hi, Christos.

The Avro Converter does a bit more work than just serializing the records to Avro. It works with the Schema Registry to make sure that the proper Avro schema is actually registered in the registry, and it then embeds the registry's ID of that Avro schema at the front of each serialized message: a magic byte followed by the 4-byte schema ID. The remaining content of the serialized message is indeed just the Avro serialization of the record.
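
Concretely, the header can be peeled off like this (just a sketch of the wire format, not the converter's actual code; the class and method names are mine):

---
import java.nio.ByteBuffer;

// Sketch: splitting a Confluent-serialized message into header and payload.
public final class WireFormatPeek {
    public static void inspect(byte[] serialized) {
        ByteBuffer buf = ByteBuffer.wrap(serialized);
        byte magic = buf.get();       // magic byte, 0 in the current format
        int schemaId = buf.getInt();  // 4-byte big-endian Schema Registry ID
        byte[] avro = new byte[buf.remaining()];
        buf.get(avro);                // the rest is plain Avro binary
        System.out.printf("magic=%d, schemaId=%d, avroBytes=%d%n",
                magic, schemaId, avro.length);
    }
}
---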

Right now Kafka Connect does not have a way to declare that a `byte[]` value is already an Avro-encoded serialized representation. Therefore, the approach you outline above is the most straightforward, robust, and maintainable way I know of to do this, and it uses only public APIs. But, as you mention, it is not the most efficient approach. It might help a bit to cache the Kafka Connect `Schema` object and the `AvroData` instance to amortize that work over many messages, but each record would still have to be deserialized to the Kafka Connect value representation.
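
For example, something along these lines (the field names, and the `sourcePartition`/`sourceOffset`/`topic` values, are illustrative; they are whatever your connector already uses):

---
// Hold the schema, reader, and AvroData as task fields so the setup cost
// is paid once rather than per record.
private final org.apache.avro.Schema avroSchema = MyAvroObj.getClassSchema();
private final GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(avroSchema);
private final io.confluent.connect.avro.AvroData avroData = new AvroData(100);

SourceRecord convert(byte[] payload) throws IOException {
    GenericRecord avroRecord = reader.read(null, DecoderFactory.get().binaryDecoder(payload, null));
    SchemaAndValue sv = avroData.toConnectData(avroSchema, avroRecord);
    return new SourceRecord(sourcePartition, sourceOffset, topic,
            sv.schema(), sv.value());
}
---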

The most efficient option would be to write a custom Converter that does the same interaction with the Schema Registry that the Avro Converter does: ensure the Avro schema is registered and get its ID (using caching to avoid unnecessary work), write out the schema ID, and then assume that any `byte[]` value is already Avro encoded and simply pass it through. Your source connector would have to pass the Avro-encoded value as a `byte[]` value. This approach would work and would be more efficient, but it would require a fair amount of non-trivial code and adopting some conventions on your part. You're the only one who can decide whether the slight improvement in efficiency justifies this extra effort.
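
To give a sense of the shape, a rough, untested skeleton might look something like this (`MyAvroObj` and the `<topic>-value` subject naming are assumptions on my part, and error handling is minimal):

---
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.storage.Converter;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

// Rough skeleton: assumes the connector hands over each value as a byte[]
// that is already valid Avro binary for one known schema.
public class PassThroughAvroConverter implements Converter {
    private SchemaRegistryClient registry;
    private org.apache.avro.Schema avroSchema;
    private Integer schemaId; // cached after first registration

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String url = (String) configs.get("schema.registry.url");
        registry = new CachedSchemaRegistryClient(url, 100);
        avroSchema = MyAvroObj.getClassSchema(); // assumed generated class
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        try {
            if (schemaId == null) { // register once, then reuse the cached ID
                schemaId = registry.register(topic + "-value", avroSchema);
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            out.write(0); // magic byte
            out.write(ByteBuffer.allocate(4).putInt(schemaId).array()); // 4-byte schema ID
            out.write((byte[]) value); // pass the Avro bytes through untouched
            return out.toByteArray();
        } catch (Exception e) {
            throw new DataException("Failed to wrap Avro-encoded value", e);
        }
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        throw new UnsupportedOperationException("source-side sketch only");
    }
}
---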

Anyone else have any better ideas?

Randall

 




Christos Vasilakis

Jun 6, 2017, 2:40:30 PM
to Confluent Platform
Hi, Randall

thank you very much for your detailed explanation; it definitely cleared up my view of how things work under the hood. Regarding the custom Converter, it's not a priority per se; I mostly asked because I was a little worried about whether using 'AvroData' was the right approach or whether I had been missing something. From your response it seems it is just fine, and already in my preliminary tests the performance is great!

Again thank you!

-Christos


