Converting POJOs to GenericRecords in confluent.io to send through a KafkaProducer

2,238 views

Joe Wiggins

Jan 5, 2016, 1:30:07 PM
to Confluent Platform

I am completely new to Kafka and Avro and trying to use the Confluent package. We have existing POJOs we use for JPA, and I'd like to be able to simply produce an instance of my POJOs without having to reflect each value into a GenericRecord manually. I seem to be missing how this is done in the documentation.

The examples use a generic record and set each value one by one like so:


String key = "key1";
String userSchema = "{\"type\":\"record\"," +
    "\"name\":\"myrecord\"," +
    "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("f1", "value1");

ProducerRecord<Object, Object> record = new ProducerRecord<>("topic1", key, avroRecord);
try {
    producer.send(record);
} catch (SerializationException e) {
    // may need to do something with it
}


There are several examples for getting a schema from a class and I found the annotations to alter that schema as necessary. Now how do I take an instance of a POJO and just send it to the serializer as is and have the library do the work of matching up the schema from the class and then copying the values into a generic record? Am I going about this all wrong? What I want to end up doing is something like this:


String key = "key1";
Schema schema = ReflectData.get().getSchema(myObject.getClass());
GenericRecord avroRecord = ReflectData.get().getRecord(myObject, schema);

ProducerRecord<Object, Object> record = new ProducerRecord<>("topic1", key, avroRecord);
try {
    producer.send(record);
} catch (SerializationException e) {
    // may need to do something with it
}

Thanks!

Gwen Shapira

Jan 5, 2016, 1:55:27 PM
to confluent...@googlegroups.com
Sounds like you are looking for the Avro ReflectDatumWriter?
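A minimal sketch of that approach, assuming a plain POJO (the User class and its fields here are just illustrative):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;

public class ReflectExample {
    // Plain POJO; no Avro-generated code needed.
    public static class User {
        public String name;
        public int age;
        public User() {}  // no-arg constructor is needed for reflective reads
        public User(String name, int age) { this.name = name; this.age = age; }
    }

    // Induce the schema from the class and write the instance as Avro binary.
    public static byte[] toAvroBytes(Object pojo) throws IOException {
        Schema schema = ReflectData.get().getSchema(pojo.getClass());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new ReflectDatumWriter<>(schema).write(pojo, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = toAvroBytes(new User("alice", 30));

        // Round-trip to confirm the bytes decode back into the POJO.
        Schema schema = ReflectData.get().getSchema(User.class);
        ReflectDatumReader<User> reader = new ReflectDatumReader<>(schema);
        User copy = reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
        System.out.println(copy.name); // prints "alice"
    }
}
```

Note this only produces raw Avro binary; it does not do the Schema Registry framing (magic byte plus schema id) that the KafkaAvroSerializer prepends, so on its own it is not enough to interoperate with the Confluent deserializers.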

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/e068510b-9c9e-4a04-82f6-23bc79e4db35%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Wiggins Family

Jan 5, 2016, 5:39:01 PM
to confluent...@googlegroups.com
I found this in my search, but couldn't correlate it back to using a KafkaProducer from Confluent's library. Is the idea that you use the ReflectDatumWriter to write to a byte array and then pass that in as the value field in the ProducerRecord, or maybe wrap it with a GenericContainer? Can you elaborate with a concrete example?

I'm not writing to a file as in this example, I'm trying to make an asynchronous call and get back a Future leveraging all of the code they have in place to deal with the schema registry automatically as found in the KafkaAvroSerializer.

Thanks for the reply!


Wiggins Family

Jan 6, 2016, 7:06:07 PM
to confluent...@googlegroups.com
Thanks for the input again, Gwen.

I found a solution that will work for me for now.  I am still working on supporting things like Java 8 timestamps that are nullable, but here is what I wound up doing.

I created my own serializer that extends the KafkaAvroSerializer and overrides the serializeImpl() method to serialize entirely via reflection, assuming the object passed in is a POJO:

public class KafkaAvroReflectionSerializer extends KafkaAvroSerializer {
    private final EncoderFactory encoderFactory = EncoderFactory.get();

    @Override
    protected byte[] serializeImpl(String subject, Object object) throws SerializationException {
        // TODO: consider caching schemas
        Schema schema = null;

        if (object == null) {
            return null;
        } else {
            try {
                schema = ReflectData.get().getSchema(object.getClass());
                int id = this.schemaRegistry.register(subject, schema);
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                out.write(0);                                         // magic byte
                out.write(ByteBuffer.allocate(4).putInt(id).array()); // 4-byte schema id

                BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
                DatumWriter<Object> writer = new ReflectDatumWriter<>(schema);
                writer.write(object, encoder);
                encoder.flush();
                out.close();

                return out.toByteArray();
            } catch (IOException ioe) {
                throw new SerializationException("Error serializing Avro message", ioe);
            } catch (RestClientException rce) {
                throw new SerializationException("Error registering Avro schema: " + schema, rce);
            } catch (RuntimeException re) {
                throw new SerializationException("Error serializing Avro message", re);
            }
        }
    }
}
This solution doesn't require the user of the serializer to do much more than add annotations from the Avro reflect library to their POJOs to control which attributes are serialized and how. I welcome any feedback on this strategy. I'd love to see a class like this make it into the Confluent library, since it allows for a very intuitive use case from a producer that is simply:

... normal producer setup
kafkaProducer.send(new ProducerRecord<Object, Object>(topic, key, myPojoInstance));
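To illustrate those annotations, here is a made-up Event POJO; @Nullable and @AvroIgnore come from the org.apache.avro.reflect package and control the induced schema:

```java
import org.apache.avro.Schema;
import org.apache.avro.reflect.AvroIgnore;
import org.apache.avro.reflect.Nullable;
import org.apache.avro.reflect.ReflectData;

public class AnnotationExample {
    // Hypothetical POJO showing the reflect annotations.
    public static class Event {
        public String id;                  // required string field
        @Nullable public String comment;   // induced as a union of null and string
        @AvroIgnore public Object scratch; // excluded from the induced schema
    }

    public static void main(String[] args) {
        Schema schema = ReflectData.get().getSchema(Event.class);
        System.out.println(schema.getFields().size()); // 2: id and comment
        System.out.println(schema.getField("comment").schema().getType()); // UNION
    }
}
```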
Thanks again!

gerard...@dizzit.com

Jan 7, 2016, 4:25:03 AM
to Confluent Platform
Is it an option to let Maven generate the Avro class from a schema? It solved the problems I had with using the Avro serializer/deserializer.
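For reference, that generation is usually done with the avro-maven-plugin's schema goal; a typical pom.xml fragment looks roughly like this (the version and directory paths are illustrative):

```xml
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.7.7</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
```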


Wiggins Family

Jan 7, 2016, 8:50:22 AM
to Confluent Platform

Not in my case. I already have objects in my application, and I don't want to copy them into a generated class. We aren't starting from schema files; we are inducing the schemas from the objects we want to send to a Kafka topic directly.

