Consumption from some of the partitions stops after rebalancing of partitions

338 views
Skip to first unread message

Anurag Laddha

unread,
Feb 14, 2016, 1:25:50 PM2/14/16
to kafka-clients
Greetings

I have created a consumer using Kafka 0.9.
When one or more new consumers are added to an existing consumer group, the partitions get rebalanced (as expected).

Issue:
  • After rebalancing, each partition of the topic is assigned to exactly one of the consumers in the group (again, as expected), but some consumers never consume from some of their assigned partitions, even when those partitions have pending messages.
  • This happens in both manual and automatic offset-commit modes.
  • It happens whenever partitions are rebalanced, i.e. when a new consumer is added or an existing consumer leaves/dies.
I have attached the code sample which demonstrates the problem:
- Create a topic in Kafka with 3 partitions (e.g. "test-again" as the topic name).
- Start 1 consumer (c1) and 1 producer thread; all 3 partitions will be assigned to c1.
- After a few seconds, add another consumer (c2) to the same consumer group as c1.
- This causes the 3 partitions to be rebalanced between c1 and c2.
- After this, c1 never consumes from any of the partitions assigned to it.

Log statement that demonstrates the issue:
- C: 1, Number of records received: 0  (consumer c1 doesn't receive any messages)
- The logs show that partitions are rebalanced when consumers are added to or removed from the consumer group.
- If you look at the number of messages in the partitions assigned to consumer c1 (using bin/kafka-consumer-offset-checker.sh --topic test-again --group test-manual --zookeeper localhost:2181), there are lots of unprocessed messages.

Can someone please tell me what I am doing wrong?

Pasting the sample code below:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Reproduction harness for a consumer-group rebalance problem.
 *
 * <p>One producer thread writes batches of 10 string messages per second to topic
 * {@code "test-again"}. Two consumers in group {@code "test-manual"} are started 3 seconds apart,
 * which forces a partition rebalance. Offsets are committed manually
 * ({@code enable.auto.commit=false}). Everything runs for 180 seconds and is then stopped via the
 * two volatile flags.
 *
 * <p>NOTE(review): the reported symptom is a consumer that keeps polling but never receives
 * records after a rebalance. It resembles a known 0.9.0.0 client bug (KAFKA-2978, fixed in
 * 0.9.0.1). Confirm the kafka-clients version before debugging the application code further.
 */
public class ConsumerRebalance_Manual {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerRebalance_Manual.class);

    /** Cleared by {@code main} to ask the producer thread to stop. */
    private static volatile boolean produceMsg = true;
    /** Cleared by {@code main} to ask the consumer threads to stop. */
    private static volatile boolean consumeMsg = true;

    /**
     * Sleeps for {@code sleepMs} milliseconds. If interrupted, restores the thread's interrupt flag
     * (instead of swallowing it) so the caller's loop can observe it and exit.
     */
    private static void sleepQuietly(long sleepMs) {
        try {
            Thread.sleep(sleepMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Publishes batches of 10 messages to "test-again" every second until {@code produceMsg} is cleared. */
    static class ProduceMsg implements Runnable {
        private final int producerId;

        public ProduceMsg(int producerId) {
            this.producerId = producerId;
        }

        @Override
        public void run() {
            logger.info("Starting producer: {}", this.producerId);

            Properties props = new Properties();
            props.put("bootstrap.servers", "127.0.0.1:9092");
            props.put("acks", "all");
            props.put("retries", 1);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            logger.info("Starting with producing messages: {}", this.producerId);
            // try-with-resources guarantees the producer is closed even if send()/flush() throws.
            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                int counter = 0;
                while (produceMsg && !Thread.currentThread().isInterrupted()) {
                    for (int i = 0; i < 10; i++) {
                        String msg = "message-" + counter++;
                        producer.send(new ProducerRecord<String, String>("test-again", msg));
                    }
                    logger.info("Sent a batch for publishing");
                    producer.flush();
                    sleepQuietly(1000);
                }
            } catch (RuntimeException e) {
                // This task runs via ExecutorService.submit(). Without this log line the failure
                // would only be stored in a Future that nobody reads, and would vanish silently.
                logger.error("P: {}, Producer failed", this.producerId, e);
                throw e;
            }
            logger.info("P: {}, Exiting producer!", this.producerId);
        }
    }

    /**
     * Consumes "test-again" as a member of group "test-manual", committing offsets manually.
     * Commits happen at most about once per second while processing, again when partitions are
     * revoked, and once more on shutdown.
     */
    static class ConsumeMsg implements Runnable {
        private final int consumerId;

        public ConsumeMsg(int consumerId) {
            this.consumerId = consumerId;
        }

        private Properties getConsumerProperties() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "127.0.0.1:9092");
            props.put("group.id", "test-manual");
            props.put("enable.auto.commit", "false");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // NOTE(review): presumably kept tiny so each poll returns only a few records. It must
            // still exceed the largest message size, or the consumer cannot make progress on that
            // partition. Confirm.
            props.put("max.partition.fetch.bytes", 150);
            props.put("session.timeout.ms", 15000);
            props.put("heartbeat.interval.ms", 1000);
            return props;
        }

        /**
         * Synchronously commits, for every partition in {@code partitionToOffsetMap}, the offset
         * following the last processed one, then clears the map. Does nothing if the map is empty.
         *
         * <p>A {@link CommitFailedException} is logged rather than propagated. It typically means
         * the group rebalanced and these partitions may now belong to another member; the affected
         * records will be redelivered. The map is cleared in every case so stale offsets are never
         * committed later under a new assignment.
         *
         * @param consumer the consumer that owns the partitions
         * @param partitionToOffsetMap last processed offset per partition; cleared by this call
         */
        private void commitOffsets(KafkaConsumer<String, String> consumer,
                                   Map<TopicPartition, Long> partitionToOffsetMap) {
            if (partitionToOffsetMap.isEmpty()) {
                return;
            }
            Map<TopicPartition, OffsetAndMetadata> partitionToMetadataMap = new HashMap<>();
            for (Map.Entry<TopicPartition, Long> entry : partitionToOffsetMap.entrySet()) {
                // The committed offset is the position of the *next* record to read, hence +1.
                partitionToMetadataMap.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1));
            }
            logger.info("C: {}, Thread: {}, Committing following offsets: {}", this.consumerId,
                    Thread.currentThread().getName(), partitionToMetadataMap);
            try {
                consumer.commitSync(partitionToMetadataMap);
            } catch (CommitFailedException e) {
                logger.warn("C: {}, Offset commit failed (group probably rebalanced); records will be redelivered",
                        this.consumerId, e);
            } finally {
                partitionToOffsetMap.clear();
            }
        }

        /**
         * Builds the rebalance listener. On revocation it flushes pending commits. On assignment it
         * seeks each partition to its committed offset, when one exists.
         */
        private ConsumerRebalanceListener newRebalanceListener(
                final KafkaConsumer<String, String> consumer,
                final Map<TopicPartition, Long> partitionToUncommittedOffsetMap) {
            return new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    for (TopicPartition topicPartition : partitions) {
                        logger.info("C: {}, Thread: {}, ~~Unsubscribed to topic: {}, partition: {}, ", consumerId,
                                Thread.currentThread().getName(), topicPartition.topic(), topicPartition.partition());
                    }
                    logger.info("C: {}, committing offsets when partitions were revoked", consumerId);
                    commitOffsets(consumer, partitionToUncommittedOffsetMap);
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    for (TopicPartition topicPartition : partitions) {
                        // committed() returns null when the group has never committed for this
                        // partition. Dereferencing it unconditionally throws a NullPointerException.
                        OffsetAndMetadata committed = consumer.committed(topicPartition);
                        Long offset = committed == null ? null : committed.offset();
                        logger.info("C: {}, Thread: {}, $$Subscriptions: Topic: {}, partition: {}, offset: {}",
                                consumerId, Thread.currentThread().getName(), topicPartition.topic(),
                                topicPartition.partition(), offset);
                        if (committed != null) {
                            consumer.seek(topicPartition, committed.offset());
                        }
                        // Otherwise leave the position unset so auto.offset.reset ("earliest") applies.
                    }
                }
            };
        }

        /**
         * Polls and "processes" records until {@code consumeMsg} is cleared or the thread is
         * interrupted. Offsets are recorded per record and committed at most about once a second.
         *
         * <p>NOTE(review): with the 0.9 client, heartbeats are presumably sent only from poll().
         * If so, the 200 ms per-record sleep below counts against session.timeout.ms (15 s), so each
         * batch must be processed well within that limit. Confirm for the client version in use.
         */
        private void pollLoop(KafkaConsumer<String, String> consumer,
                              Map<TopicPartition, Long> partitionToUncommittedOffsetMap) {
            long lastCommitTimeMs = System.currentTimeMillis();
            while (consumeMsg && !Thread.currentThread().isInterrupted()) {
                // poll() never returns null, so only emptiness needs checking.
                ConsumerRecords<String, String> records = consumer.poll(1000);
                logger.info("C: {}, Number of records received: {} ", consumerId, records.count());
                if (records.count() == 0) {
                    logger.info("C: {}, Found no records. Sleeping for a while", this.consumerId);
                    sleepQuietly(500);
                    continue;
                }
                for (ConsumerRecord<String, String> record : records) {
                    logger.info("C: {}, Received message: Topic: {}, Partition: {}, Offset: {}, Thread: {}",
                            this.consumerId, record.topic(), record.partition(), record.offset(),
                            Thread.currentThread().getName());

                    // Track the last processed offset per partition; commit periodically.
                    partitionToUncommittedOffsetMap.put(
                            new TopicPartition(record.topic(), record.partition()), record.offset());
                    if (System.currentTimeMillis() - lastCommitTimeMs > 1000) {
                        commitOffsets(consumer, partitionToUncommittedOffsetMap);
                        lastCommitTimeMs = System.currentTimeMillis();
                    }

                    sleepQuietly(200); // simulated per-record processing time
                }
            }
        }

        @Override
        public void run() {
            logger.info("Starting consumer: {}", this.consumerId);

            Map<TopicPartition, Long> partitionToUncommittedOffsetMap = new HashMap<>();
            Properties props = getConsumerProperties();
            // try-with-resources guarantees close() (and so a prompt group leave) even on failure.
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList("test-again"),
                        newRebalanceListener(consumer, partitionToUncommittedOffsetMap));

                logger.info("Starting to process records for consumer: {}", this.consumerId);
                pollLoop(consumer, partitionToUncommittedOffsetMap);

                commitOffsets(consumer, partitionToUncommittedOffsetMap);
            } catch (RuntimeException e) {
                // This task runs via ExecutorService.submit(). Log the failure so it does not
                // vanish silently inside an unread Future.
                logger.error("C: {}, Consumer failed", this.consumerId, e);
                throw e;
            }
            logger.info("Exiting consumer: {}", this.consumerId);
        }
    }

    /**
     * Starts one producer and one consumer, adds a second consumer after 3 seconds to trigger a
     * rebalance, lets everything run for 180 seconds, then signals all workers to stop and waits
     * for them to finish.
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        executorService.submit(new ProduceMsg(1));
        executorService.submit(new ConsumeMsg(1)); // start first consumer

        Thread.sleep(3000);
        executorService.submit(new ConsumeMsg(2)); // start second consumer after a few seconds

        Thread.sleep(TimeUnit.SECONDS.toMillis(180)); // let both consumers run for three minutes
        produceMsg = false;
        consumeMsg = false;

        executorService.shutdown();
        logger.info("Waiting for executor service shutdown");
        // Block in awaitTermination instead of busy-spinning on isTerminated().
        while (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
            // Workers exit once they observe the cleared flags; keep waiting.
        }
        logger.info("Exiting app");
    }
}
ConsumerRebalance_Manual_Thin.java
Reply all
Reply to author
Forward
0 new messages