Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "kafka-server-url");
kafkaProps.put("schema.registry.url", "schema-registry-url");
kafkaProps.put("key.serializer", KafkaAvroSerializer.class.getCanonicalName());
kafkaProps.put("value.serializer", KafkaAvroSerializer.class.getCanonicalName());
KafkaProducer<GenericRecord, GenericRecord> producer = new KafkaProducer<>(kafkaProps);
GenericRecord key =
new GenericData.Record(keySchema);key.put("idField1, "1234");
key.put("idField2", "5678");
GenericRecord value =
new GenericData.Record(valueSchema);key.put("valueField1", "abcd");
key.put("valueField2", "efgh");
ProducerRecord<GenericRecord, GenericRecord> record = new ProducerRecord<>("theTopic", key, value);
producer.send(record, (RecordMetadata metadata, Exception exception) -> {
if (exception != null) {
LOG.error(exception.getMessage());
}
});