Kafka broker memory leak triggered by many consumers


Bogdan Istrate

May 26, 2017, 5:40:19 PM
to kafka-clients
Hi All!

I'm getting a memory leak on my Kafka server when I run a Java app that creates a new Consumer every time a user makes a request.
The leak happens on the EC2 instance running the Kafka server itself, not on the instances running the clients.
After a while, garbage collection can no longer clear the heap, even if I fully stop my app so that no consumers remain connected and no new ones are created. GC pause time then overwhelms everything else and the broker crashes.
The heap clears when I restart Kafka, but eventually fills up again.

I've narrowed down the leak to the line:
consumer.poll(1000)
If I leave all the rest of the code in place (creating and closing the consumers) but remove the above line, the memory leak goes away.

I've posted an SO question about this with many more details here.

I've been told this is not necessarily the best way to approach the problem (creating a brand-new consumer each time I want to poll my topic for just one message), and I'm open to alternatives.
However, I still think there is a bug in Kafka when it is used in this fashion.
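
For what it's worth, here is a rough sketch of the kind of alternative I have in mind: one long-lived consumer that gets reused for every request instead of a brand-new KafkaConsumer per request. The class and method names (LatestMessageReader, readLatest) are placeholders I made up, and it assumes a single partition like my repro below:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class LatestMessageReader implements AutoCloseable {
    private final KafkaConsumer<String, String> consumer;
    private final TopicPartition tp;

    public LatestMessageReader(Properties props, String topic) {
        this.consumer = new KafkaConsumer<>(props);
        this.tp = new TopicPartition(topic, 0);
        consumer.assign(Arrays.asList(tp));
    }

    // Seek back one message from the end and poll once; the same consumer
    // (and whatever broker-side state it holds) is reused on every call.
    public synchronized ConsumerRecords<String, String> readLatest() {
        consumer.seekToEnd(Arrays.asList(tp));
        long end = consumer.position(tp);
        consumer.seek(tp, Math.max(0, end - 1));
        return consumer.poll(1000);
    }

    @Override
    public void close() {
        consumer.close();
    }
}

Since KafkaConsumer is not thread-safe, readLatest is synchronized; one reader per worker thread would avoid the lock. I'd still like to understand the broker-side leak, though.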

I'm running Kafka 0.10.2.1 on the clients and Kafka 0.10.2.0 on the server. 

Here is the code I'm using to replicate the problem (not the same code as production, since in production the consumers are created on demand at a rate I can't predict):
(Running this code on 16 threads for ~360,000 iterations generates the memory leak consistently; stopping this code does not clear the heap)
(In production, this issue takes about 2 weeks to arise)
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public static void main(String... args) throws IOException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "SERVER_IPs:9092");
    props.put("session.timeout.ms", 30000);
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", 1000);
    props.put("key.deserializer", StringDeserializer.class);
    props.put("value.deserializer", StringDeserializer.class);

    int numThreads = Integer.parseInt(args[0]);
    int maxIter = Integer.parseInt(args[1]);
    ExecutorService exec = Executors.newFixedThreadPool(numThreads);
    AtomicInteger counter = new AtomicInteger(0);

    int soFar;
    while ((soFar = counter.getAndIncrement()) < maxIter) {
        final int soFarFinal = soFar;
        exec.submit(() -> {
            // Fresh consumer per task, pinned to partition 0 of the topic,
            // then seek back one message from the end.
            Consumer<String, String> consumer = new KafkaConsumer<>(props);
            TopicPartition tp = new TopicPartition("TOPIC", 0);
            consumer.assign(Arrays.asList(tp));
            consumer.seekToEnd(Arrays.asList(tp));

            long nowSeek = consumer.position(tp);
            consumer.seek(tp, nowSeek - 1L);
            consumer.poll(1000); // <----- Without this line, there is no memory leak!
            consumer.unsubscribe();
            consumer.close();

            if (soFarFinal % 10000 == 0) {
                System.out.println("=== " + soFarFinal + " loops finished ===");
            }
        });
    }
    exec.shutdown();

    int total = counter.get() - 1; // the final getAndIncrement() overshoots by one
    System.out.println("Total submissions: " + total);
    System.out.println("Each thread processed: " + total / numThreads);
}
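
For reference, I run the repro like this (the class name MemoryLeakRepro is just a placeholder; the two arguments are the 16 threads and ~360,000 iterations mentioned above):

java -cp my-app.jar MemoryLeakRepro 16 360000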

Thanks very much!
Bogdan