Kafka Connector, object serialization and Schema Registry - how???


Kodo

May 23, 2016, 6:52:02 AM
to Confluent Platform
Hi!

I have the following use case:

We have a "data-packaging" class, "Envelope", which we use for describing various data-items. An object is packaged and described within an envelope-instance and then stored. The structure of this envelope will change over time, and I thought that AVRO could help us deal with this fact: an object packaged at a given point in time in, say, v1.2 of the envelope and then serialized as an AVRO object would still be de-serializable as a v1.2 envelope-object at a later date, although the Envelope-class might have evolved to v2.3 by then. I would like Kafka, AVRO and the Schema Registry to handle this "behind the scenes", and from what I've read, this should be possible. However, so far I've been unable to figure out how to accomplish it :(

I've installed the Confluent Data Platform v2.0.1 and have all of the components up and running together with two custom developed Kafka Connectors (Java). Now, what I want to accomplish (in both connectors) is to be able to:

(This will take place in the SourceTask.poll() method)

1. create an instance of version X of the Envelope-class
2. populate the envelope-instance with meta-data and an object stored as the payload - declared as a generic Java Object. There are some meta-data attributes/properties (strings, ints and longs) describing the actual payload
3. serialize the envelope in order to become the payload for a Kafka SourceRecord-instance
    this step should check the Schema Registry to see if the schema is already present. If it is, just return the schema's ID; if not, register the new schema and return the generated ID. The ID should be serialized together with the envelope-instance
4. make the envelope-instance the payload for a new SourceRecord instance
5. add the new SourceRecord-instance to the ArrayList<SourceRecord> that should be returned from the SourceTask.poll() method

What do I need to do in the SourceTask.poll() method in order to get this to work?
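(For what it's worth, in Kafka Connect the registry round-trip is normally handled by a converter rather than in poll(): the task returns a Connect Struct and the worker's configured converter serializes it, registering the schema and embedding the schema id behind the scenes. A sketch of the relevant worker properties, assuming Confluent's AvroConverter as shipped with the Confluent Platform and a registry at the default local URL:)

```
# Worker config sketch: the AvroConverter registers schemas
# and prefixes each message with the schema id automatically.
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
```

With this in place, poll() only builds SourceRecords with a Connect Schema/Struct; no manual AVRO encoding is needed in the task itself.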

Many thanks in advance!

gerard...@dizzit.com

May 23, 2016, 9:38:21 AM
to Confluent Platform
We have a somewhat similar issue, and maybe you can solve it by using your own serializer/deserializer combo that wraps the Avro serializer.
Serialisation could work like this:
- you create a schema with a payload (raw bytes probably), a version (you could just set a default), and possibly some other stuff
- in your serialize method you take your data and put it into a SpecificRecordBase, which you pass to the Avro serializer to get the bytes
- in your deserialize method you convert the bytes back to the SpecificRecordBase object, and you can read the information with the help of the version

When you want to update the version, you allow your (de)serializer to handle the new version. But this will require updating both the consumer and the producer before you can use the new type of message. Another approach is to encode the length of the added bytes using a custom serializer, and do the Avro serialisation afterwards. We use this so the consumers always know which part of the message contains the extra bytes we add (along with a minor and major version), and which part of the message is meant for the Avro deserializer. This way, even if we change the size of the bytes we add to the message, 'old' consumers can still read the data. If we later add something the old consumers don't support, such as encryption (which we currently don't use), they would still fail.
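The length-prefix framing described above could be sketched like this — a minimal example with made-up header fields (1-byte major/minor versions plus a 4-byte length), not the exact layout we use:

```java
import java.nio.ByteBuffer;

public class Framing {
    // Header: 1 byte major version, 1 byte minor version,
    // 4-byte big-endian length of the extra (non-Avro) bytes.
    public static byte[] frame(byte major, byte minor, byte[] extra, byte[] avroBytes) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 1 + 4 + extra.length + avroBytes.length);
        buf.put(major).put(minor).putInt(extra.length).put(extra).put(avroBytes);
        return buf.array();
    }

    // An old consumer only needs the length field to skip the extra bytes
    // and hand the remainder to the Avro deserializer unchanged.
    public static byte[] avroPart(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        buf.get(); // skip major version
        buf.get(); // skip minor version
        int extraLen = buf.getInt();
        buf.position(buf.position() + extraLen); // skip the extra bytes
        byte[] avro = new byte[buf.remaining()];
        buf.get(avro);
        return avro;
    }
}
```

Because the extra bytes are length-prefixed, their size can change between versions without breaking consumers that only care about the Avro part.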

I think, since this is really a (de)serialisation concern, it's better to handle it there than in the poll() method.

Kodo

May 24, 2016, 5:00:17 AM
to Confluent Platform
Thought I'd try to clarify what I want to accomplish. Please note that we're not using a "producer" but a Kafka Connector SourceTask. The code therefore resides in the SourceTask.poll() method. We have the following code:

public List<SourceRecord> poll() throws InterruptedException {

    List<SourceRecord> records = new ArrayList<>();

    // Create an AVRO schema using reflection. Envelope is our in-house developed "packaging/wrapper class"
    Schema schema = ReflectData.get().getSchema(Envelope.class);
    GenericRecord record = new GenericData.Record(schema);

    // .... do some stuff with the [record] object ...
    record.put("xxx", "yyy");
    record.put("payload", <primitive datatype or class>);
    ...

    try {
        // Encode the [record] object
        DatumWriter<GenericRecord> dw = new GenericDatumWriter<GenericRecord>(schema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        dw.write(record, encoder);
        encoder.flush();
        out.close();

        // Finally, add a new SourceRecord and write out the [record] object as a byte array...
        records.add(new SourceRecord(sourcePartition, sourceOffset, topic,
                org.apache.kafka.connect.data.Schema.BYTES_SCHEMA, out.toByteArray()));
    } catch (IOException e) {
        throw new ConnectException("Failed to serialize record", e);
    }

    return records;
}

What I'd like to accomplish is to have the AVRO-schema of the [record] object being registered in the Schema Registry AND the existing/generated schema ID being serialized together with the [record] object before being added to the ArrayList<SourceRecord>. How may I accomplish this?

gerard...@dizzit.com

May 24, 2016, 10:23:23 AM
to Confluent Platform
The easiest way to get it done is probably to start using the KafkaAvroSerializer from the examples. You have two options:
- The easiest option is to let Kafka do it for you by setting value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer, if you're able to change the properties you use for creating the producer. If you can do this, you can put the Avro object in the producer record, and Kafka will in the background validate the schema and prepend the schema id it gets back to the byte stream which is saved.
- The other option is to create a KafkaAvroSerializer object in your own code, configure it with the correct schema.registry.url (with the first option you will also need to add this property to the producer properties), and use it yourself like: byte[] recordAndVersion = kafkaAvroSerializer.serialize("topic", record);

On the consuming side you need to do something similar, but the other way around: use the KafkaAvroDeserializer to read the message, which will use the schema id encoded with the record. You can set specific.avro.reader=true if you have the Envelope class on the classpath where the consumer is running; it will then give you an Envelope instance back.
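For the curious: the byte layout the KafkaAvroSerializer produces (and the KafkaAvroDeserializer expects) is a magic byte 0, a 4-byte big-endian schema id, then the Avro binary payload. A small sketch of pulling the schema id out of such a message, using only the standard library:

```java
import java.nio.ByteBuffer;

public class WireFormat {
    private static final byte MAGIC_BYTE = 0x0;

    // Extracts the Schema Registry id from a Confluent-framed message:
    // [magic byte 0][4-byte big-endian schema id][Avro payload].
    public static int schemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt();
    }
}
```

You normally never parse this yourself — the KafkaAvroDeserializer does it and fetches the matching schema from the registry — but it shows where the id lives in each message.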