public static void main(String[] args) throws InterruptedException {
    context = SpringApplication.run(IPRKafkaConsumer.class);
    startConsumers();
}

public static void startConsumers() throws InterruptedException {
    List<KafkaJavaConsumer> consumers = new ArrayList<>();
    Integer numConsumer = context.getBean("concurrentConsumerCount", Integer.class);
    ExecutorService executor = Executors.newFixedThreadPool(numConsumer);
    for (int i = 0; i < numConsumer; i++) {
        // each worker gets its own KafkaConsumer instance, since KafkaConsumer is not thread safe
        KafkaJavaConsumer consumer = context.getBean(KafkaJavaConsumer.class);
        consumer.setConsumer(new KafkaConsumer<String, String>(context.getBean("consumerConfig", Properties.class)));
        consumer.setClientId("Worker" + i);
        consumers.add(consumer);
        executor.submit(consumer);
    }
}
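The "concurrentConsumerCount" and "consumerConfig" beans used above are not shown; here is a minimal sketch of what they might look like, assuming the group id "transaction" seen in the kafka-consumer-groups output below and manual offset commits as in the consumer code (bean names and values are placeholders, not my actual config):

@Bean(name = "concurrentConsumerCount")
public Integer concurrentConsumerCount() {
    return 2; // hypothetical worker count
}

@Bean(name = "consumerConfig")
public Properties consumerConfig() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "transaction");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // offsets are committed manually via commitAsync
    // isolation.level is left at its default (read_uncommitted) in this sketch
    return props;
}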
public class KafkaJavaConsumer implements Runnable {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJavaConsumer.class);

    @Value("${kafka.consumer.topic}")
    String topic;
    List<String> topics;
    private String kafkaConsumerOffset;
    Map<TopicPartition, OffsetAndMetadata> offsets;
    private KafkaConsumer<String, String> consumer;
    private String clientId;

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public KafkaConsumer<String, String> getConsumer() {
        return consumer;
    }

    public void setConsumer(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }
    public void run() {
        topics = new ArrayList<>();
        topics.add(topic);
        consumer.subscribe(topics);
        offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        boolean recordUpdate = true;
        try {
            while (true) {
                ConsumerRecords<String, String> polledRecords = consumer.poll(60000);
                LOGGER.info("Size of Records polled " + polledRecords.count());
                recordUpdate = processRecords(polledRecords);
            }
        } catch (Exception e) {
            LOGGER.error("Error in polling records", e);
        } finally {
            consumer.close();
        }
    }
    private boolean processRecords(ConsumerRecords<String, String> polledRecords)
            throws IllegalAccessException, InvocationTargetException {
        boolean recordUpdate = true;
        for (ConsumerRecord<String, String> record : polledRecords) {
            try {
                recordUpdate = processRecord(record, 1);
            } catch (Exception e) {
                recordUpdate = false;
                break;
            }
        }
        // Seek back to the remaining records from other partitions, so they are received in the next poll
        if (!recordUpdate) {
            Map<TopicPartition, Long> offsets = new LinkedHashMap<>();
            polledRecords.forEach(r ->
                    offsets.computeIfAbsent(new TopicPartition(r.topic(), r.partition()), k -> r.offset()));
            offsets.forEach(consumer::seek);
        }
        return recordUpdate;
    }
    private boolean processRecord(ConsumerRecord<String, String> record, int retryCount)
            throws IllegalAccessException, InvocationTargetException, InterruptedException {
        boolean recordUpdate = true;
        try {
            kafkaConsumerOffset = record.topic() + ":" + record.partition() + ":" + record.offset();
            LOGGER.info("Offset {" + kafkaConsumerOffset + "} Received in Kafka Consumer");
            OffsetAndMetadata metadata = new OffsetAndMetadata(record.offset() + 1);
            TopicPartition partition = new TopicPartition(record.topic(), record.partition());
            offsets.put(partition, metadata);
            // ... business logic goes here ...
            consumer.commitAsync(offsets, null);
        } finally {
            //consumer.commitAsync(offsets, null);
        }
        return recordUpdate;
    }
}
We posted a message to the topic within a transaction (a single message, for testing purposes). The log-end offset incremented by 2: 1 for the message and 1 for the transaction commit marker.
D25SP18BGQ17IM:confluent-4.0.0 mxp8051$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic transaction-test
transaction-test:0:6
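The producer code is not shown here; the following is a minimal sketch of how such a transactional publish might look, assuming a hypothetical transactional.id of "transaction-test-producer". Each committed transaction appends a control marker to the partition in addition to the message itself, which is why the end offset moves by 2 per message:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-test-producer"); // hypothetical id
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("transaction-test", "key", "value"));
    producer.commitTransaction(); // the commit marker occupies one extra offset in the partition
} catch (KafkaException e) {
    producer.abortTransaction();
    throw e;
} finally {
    producer.close();
}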
Pushed another message to the same topic within a transaction. The offset again incremented by 2.
D25SP18BGQ17IM:confluent-4.0.0 mxp8051$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic transaction-test
transaction-test:0:8
The consumer lag shows 4 while my consumer is down.
D25SP18BGQ17IM:confluent-4.0.0 mxp8051$ ./bin/kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group transaction
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'transaction' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
transaction-test 0 4 8 4 -
After starting the consumer, the lag says 1.
D25SP18BGQ17IM:confluent-4.0.0 mxp8051$ ./bin/kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group transaction
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
transaction-test 0 7 8 1 consumer-1-7a792d01-f3df-4e33-9ccc-636f8110ba38 /172.22.52.231 consumer-1