- Create 1 consumer (c1) and 1 producer thread; all 3 partitions will be assigned to c1.
- After a few seconds, add one more consumer (c2) to the same consumer group that c1 belongs to.
- This causes the 3 partitions to be rebalanced between c1 and c2.
- logs show partitions are rebalanced when consumers are added/removed from the consumer group.
- If you look at the number of messages in the partitions assigned to consumer c1 (using bin/kafka-consumer-offset-checker.sh --topic test-again --group test-manual --zookeeper localhost:2181), there are lots of unprocessed messages.
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Demo application that exercises consumer-group rebalancing with manual offset commits.
 *
 * <p>{@link #main(String[])} starts one producer and one consumer, adds a second consumer
 * to the same group after three seconds, lets everything run for 180 seconds and then
 * asks all workers to stop via the {@code produceMsg}/{@code consumeMsg} flags.
 */
public class ConsumerRebalance_Manual {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerRebalance_Manual.class);

    private static final String BOOTSTRAP_SERVERS = "127.0.0.1:9092";
    private static final String TOPIC = "test-again";
    private static final String GROUP_ID = "test-manual";

    /** Cleared by {@code main} to make the producer loop exit. */
    private static volatile boolean produceMsg = true;
    /** Cleared by {@code main} to make the consumer loops exit. */
    private static volatile boolean consumeMsg = true;

    /**
     * Sleeps for the given time. If the thread is interrupted, the interrupt flag is
     * restored so that the calling loop can notice it and stop.
     *
     * @param sleepMs time to sleep in milliseconds
     * @return {@code true} if the full sleep completed, {@code false} if it was interrupted
     */
    private static boolean sleepQuietly(long sleepMs) {
        try {
            Thread.sleep(sleepMs);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Thread: {} interrupted while sleeping", Thread.currentThread().getName());
            return false;
        }
    }

    /** Publishes batches of 10 string messages to the topic once per second until told to stop. */
    static class ProduceMsg implements Runnable {
        private final int producerId;

        public ProduceMsg(int producerId) {
            this.producerId = producerId;
        }

        private Properties getProducerProperties() {
            Properties props = new Properties();
            props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
            props.put("acks", "all");
            props.put("retries", 1);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return props;
        }

        @Override
        public void run() {
            logger.info("Starting producer: {}", this.producerId);
            Properties props = getProducerProperties();
            logger.info("Starting with producing messages: {}", this.producerId);
            Producer<String, String> producer = new KafkaProducer<>(props);
            // Surface failed sends instead of silently dropping the returned Future.
            Callback logFailure = (metadata, exception) -> {
                if (exception != null) {
                    logger.error("P: {}, Failed to publish message", producerId, exception);
                }
            };
            try {
                int counter = 0;
                while (produceMsg && !Thread.currentThread().isInterrupted()) {
                    for (int i = 0; i < 10; i++) {
                        String msg = "message-" + counter++;
                        producer.send(new ProducerRecord<>(TOPIC, msg), logFailure);
                    }
                    logger.info("Sent a batch for publishing");
                    producer.flush();
                    sleepQuietly(1000);
                }
            } finally {
                // Always release the producer, even if send/flush threw.
                producer.close();
                logger.info("P: {}, Exiting producer!", this.producerId);
            }
        }
    }

    /**
     * Consumes from the topic as a member of the consumer group, with auto-commit disabled.
     * Offsets of handled records are tracked per partition and committed synchronously
     * roughly once per second, when a poll returns nothing, when partitions are revoked,
     * and on shutdown.
     */
    static class ConsumeMsg implements Runnable {
        private final int consumerId;

        public ConsumeMsg(int consumerId) {
            this.consumerId = consumerId;
        }

        private Properties getConsumerProperties() {
            Properties props = new Properties();
            props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
            props.put("group.id", GROUP_ID);
            props.put("enable.auto.commit", "false");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("max.partition.fetch.bytes", 150);
            props.put("session.timeout.ms", 15000);
            props.put("heartbeat.interval.ms", 1000);
            return props;
        }

        /**
         * Synchronously commits {@code lastHandledOffset + 1} for every tracked partition and
         * then clears the map. Does nothing when the map is empty. If the commit throws, the
         * map is left untouched.
         *
         * @param consumer             consumer used to perform the commit
         * @param partitionToOffsetMap last handled offset per partition; cleared on success
         */
        private void commitOffsets(KafkaConsumer<String, String> consumer, Map<TopicPartition, Long> partitionToOffsetMap) {
            if (partitionToOffsetMap.isEmpty()) {
                return;
            }
            Map<TopicPartition, OffsetAndMetadata> partitionToMetadataMap = new HashMap<>();
            for (Map.Entry<TopicPartition, Long> singlePartitionEntry : partitionToOffsetMap.entrySet()) {
                // The committed position is the offset of the NEXT record to read.
                partitionToMetadataMap.put(singlePartitionEntry.getKey(),
                        new OffsetAndMetadata(singlePartitionEntry.getValue() + 1));
            }
            logger.info("C: {}, Thread: {}, Committing following offsets: {}", this.consumerId,
                    Thread.currentThread().getName(), partitionToMetadataMap);
            consumer.commitSync(partitionToMetadataMap);
            partitionToOffsetMap.clear();
        }

        @Override
        public void run() {
            logger.info("Starting consumer: {}", this.consumerId);
            final Map<TopicPartition, Long> partitionToUncommittedOffsetMap = new HashMap<>();
            final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties());
            ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    for (TopicPartition topicPartition : collection) {
                        logger.info("C: {}, Thread: {}, ~~Unsubscribed to topic: {}, partition: {}, ", consumerId, Thread.currentThread().getName(),
                                topicPartition.topic(),
                                topicPartition.partition());
                    }
                    logger.info("C: {}, committing offsets when partitions were revoked", consumerId);
                    try {
                        commitOffsets(consumer, partitionToUncommittedOffsetMap);
                    } finally {
                        // Never carry pending offsets across a revocation: onPartitionsAssigned
                        // seeks back to the committed position anyway.
                        partitionToUncommittedOffsetMap.clear();
                    }
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    for (TopicPartition topicPartition : collection) {
                        OffsetAndMetadata currPartitionData = consumer.committed(topicPartition);
                        if (currPartitionData == null) {
                            // No commit exists yet for this partition (e.g. first run of the
                            // group); skip the seek and leave the start position to the consumer.
                            logger.info("C: {}, Thread: {}, $$Subscriptions: Topic: {}, partition: {}, no committed offset",
                                    consumerId, Thread.currentThread().getName(), topicPartition.topic(),
                                    topicPartition.partition());
                            continue;
                        }
                        logger.info("C: {}, Thread: {}, $$Subscriptions: Topic: {}, partition: {}, offset: {}",
                                consumerId, Thread.currentThread().getName(), topicPartition.topic(), topicPartition.partition(),
                                currPartitionData.offset());
                        consumer.seek(topicPartition, currPartitionData.offset());
                    }
                }
            };
            try {
                consumer.subscribe(Arrays.asList(TOPIC), listener);
                logger.info("Starting to process records for consumer: {}", this.consumerId);
                long lastUpdateTimeMs = System.currentTimeMillis();
                while (consumeMsg && !Thread.currentThread().isInterrupted()) {
                    ConsumerRecords<String, String> records = consumer.poll(1000); //fetch more records to process
                    // Guard against null BEFORE touching the result.
                    int recordCount = (records == null) ? 0 : records.count();
                    logger.info("C: {}, Number of records received: {} ", consumerId, recordCount);
                    if (recordCount == 0) {
                        logger.info("C: {}, Found no records. Sleeping for a while", this.consumerId);
                        // Flush anything left over from the previous batch so that handled
                        // records do not show up as lag while the consumer is idle.
                        if (!partitionToUncommittedOffsetMap.isEmpty()) {
                            commitOffsets(consumer, partitionToUncommittedOffsetMap);
                            lastUpdateTimeMs = System.currentTimeMillis();
                        }
                        sleepQuietly(500);
                        continue;
                    }
                    //process each record
                    for (ConsumerRecord<String, String> record : records) {
                        logger.info("C: {}, Received message: Topic: {}, Partition: {}, Offset: {}, Thread: {}",
                                this.consumerId, record.topic(), record.partition(), record.offset(),
                                Thread.currentThread().getName());
                        //track and commit offset of msg processed
                        partitionToUncommittedOffsetMap.put(new TopicPartition(record.topic(), record.partition()), record.offset());
                        if (System.currentTimeMillis() - lastUpdateTimeMs > 1000) {
                            commitOffsets(consumer, partitionToUncommittedOffsetMap);
                            lastUpdateTimeMs = System.currentTimeMillis();
                        }
                        if (!sleepQuietly(200)) {
                            break; // interrupted: stop handling this batch
                        }
                    }
                }
                commitOffsets(consumer, partitionToUncommittedOffsetMap);
            } catch (RuntimeException e) {
                // The Future returned by submit() is discarded by main, so log here or the
                // failure would vanish without a trace.
                logger.error("C: {}, consumer terminated by exception", this.consumerId, e);
                throw e;
            } finally {
                consumer.close();
                logger.info("Exiting consumer: {}", this.consumerId);
            }
        }
    }

    /**
     * Runs the scenario: one producer, one consumer, then a second consumer three seconds
     * later; after 180 seconds all workers are asked to stop and the method waits for them.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping or waiting
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        executorService.submit(new ProduceMsg(1));
        executorService.submit(new ConsumeMsg(1)); //start first consumer
        Thread.sleep(3000);
        executorService.submit(new ConsumeMsg(2)); //start second consumer after few seconds
        Thread.sleep(TimeUnit.SECONDS.toMillis(180)); //let both consumer run for few minutes
        produceMsg = false;
        consumeMsg = false;
        executorService.shutdown();
        logger.info("Waiting for executor service shutdown");
        // Block instead of spinning on isTerminated(), which would burn a CPU core.
        while (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
            logger.info("Still waiting for worker threads to finish");
        }
        logger.info("Exiting app");
    }
}