I am completely new to Kafka and avro and trying to use the confluent package. We have existing POJOs we use for JPA and I'd like to be able to simply produce an instance of my POJOs without having to reflect each value into a generic record manually. I seem to be missing how this is done in the documentation.
The examples use a generic record and set each value one by one like so:
String key = "key1";
String userSchema = "{\"type\":\"record\"," +
"\"name\":\"myrecord\"," +
"\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("f1", "value1");
record = new ProducerRecord<Object, Object>("topic1", key, avroRecord);
try {
producer.send(record);
} catch(SerializationException e) {
// may need to do something with it
}There are several examples for getting a schema from a class and I found the annotations to alter that schema as necessary. Now how do I take an instance of a POJO and just send it to the serializer as is and have the library do the work of matching up the schema from the class and then copying the values into a generic record? Am I going about this all wrong? What I want to end up doing is something like this:
String key = "key1";
Schema schema = ReflectData.get().getSchema(myObject.getClass());
GenericRecord avroRecord = ReflectData.get().getRecord(myObject, schema);
record = new ProducerRecord<Object, Object>("topic1", key, avroRecord);
try {
producer.send(record);
} catch(SerializationException e) {
// may need to do something with it
}
Thanks!
--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/e068510b-9c9e-4a04-82f6-23bc79e4db35%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
You received this message because you are subscribed to a topic in the Google Groups "Confluent Platform" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/confluent-platform/2pDNLiMhVRU/unsubscribe.
To unsubscribe from this group and all its topics, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/CAA9QSJ%2BxSH_DCHNRy0zCF0OAcajEEASrmNuMOHuoyr4gX%2BeYEg%40mail.gmail.com.
public class KafkaAvroReflectionSerializer extends KafkaAvroSerializer {
private final EncoderFactory encoderFactory = EncoderFactory.get();
@Override
protected byte[] serializeImpl(String subject, Object object) throws SerializationException {
//TODO: consider caching schemas
Schema schema = null;
if(object == null) {
return null;
} else {
try {
schema = ReflectData.get().getSchema(object.getClass());
int e = this.schemaRegistry.register(subject, schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(0);
out.write(ByteBuffer.allocate(4).putInt(e).array());
BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
DatumWriter<Object> writer = new ReflectDatumWriter<>(schema);
writer.write(object, encoder);
encoder.flush();
out.close();
byte[] bytes = out.toByteArray();
return bytes;
} catch (IOException ioe) {
throw new SerializationException("Error serializing Avro message", ioe);
} catch (RestClientException rce) {
throw new SerializationException("Error registering Avro schema: " + schema, rce);
} catch (RuntimeException re) {
throw new SerializationException("Error serializing Avro message", re);
}
}
}
}
This solution doesn't require the user of this serializer to do much more than add annotations from the reflect library to their pojos to control which attributes are serialized and how. I welcome any feedback about this strategy. I'd love to see an object like this make it into the confluent library as it allows for a very intuitive use case from a producer that is simply:
... normal producer setup
kafkaProducer.send(new ProducerRecord<Object, Object>(topic, key, myPojoInstance));
Thanks again!
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
--
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/e068510b-9c9e-4a04-82f6-23bc79e4db35%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
You received this message because you are subscribed to a topic in the Google Groups "Confluent Platform" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/confluent-platform/2pDNLiMhVRU/unsubscribe.
To unsubscribe from this group and all its topics, send an email to confluent-platform+unsub...@googlegroups.com.
Not in my case. I already have objects in my application and I don't want to copy them to a generated class copy. We aren't starting from schema files, we are inducing them from the objects we want to send to a Kafka topic directly.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/e068510b-9c9e-4a04-82f6-23bc79e4db35%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to a topic in the Google Groups "Confluent Platform" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/confluent-platform/2pDNLiMhVRU/unsubscribe.
To unsubscribe from this group and all its topics, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/CAA9QSJ%2BxSH_DCHNRy0zCF0OAcajEEASrmNuMOHuoyr4gX%2BeYEg%40mail.gmail.com.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to a topic in the Google Groups "Confluent Platform" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/confluent-platform/2pDNLiMhVRU/unsubscribe.
To unsubscribe from this group and all its topics, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/1891585b-61cc-45e5-b175-290462b2c46a%40googlegroups.com.