public void publishToKafka(String topicName,String msg) throws Exception {
producerRecord = new ProducerRecord(topicName,msg);
final long startTime = System.currentTimeMillis();
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e != null) {
kafkaFailurePublishServiceImpl.saveToCassandra(producerRecord.topic(),producerRecord.value().toString());
}
}
});
final long stopTime = System.currentTimeMillis();
System.out.println("Time Consumed = "+ (stopTime-startTime));
}
try {
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
kafkaFailurePublishServiceImpl.saveToCassandra(producerRecord.topic(), producerRecord.value().toString());
} else {
System.out.print("testing");
}
}
});
final long stopTime = System.currentTimeMillis();
System.out.println("Time Consumed = " + (stopTime - startTime));
} catch (KafkaException e) {
kafkaFailurePublishServiceImpl.saveToCassandra(producerRecord.topic(), producerRecord.value().toString());
}