/**
 * Runs {@code maxIter} short-lived consumer round-trips against partition 0 of
 * {@code "TOPIC"} on a fixed-size thread pool, then prints a summary.
 *
 * <p>Each iteration creates its own consumer, seeks to the last offset of the
 * partition, polls once and closes the consumer again.
 *
 * @param args {@code args[0]} is the number of pool threads, {@code args[1]} the
 *     number of iterations to submit
 * @throws IOException kept in the signature for existing callers
 * @throws IllegalArgumentException if fewer than two arguments are supplied, if an
 *     argument is not an integer, or if the thread count is not positive
 */
public static void main(String... args) throws IOException {
    if (args.length < 2) {
        throw new IllegalArgumentException("Usage: <numThreads> <maxIter>");
    }
    int numThreads = Integer.parseInt(args[0]);
    int maxIter = Integer.parseInt(args[1]);

    Properties props = new Properties();
    props.put("bootstrap.servers", "SERVER_IPs:9092");
    props.put("enable.auto.commit", "true");
    props.put("key.deserializer", StringDeserializer.class);
    props.put("value.deserializer", StringDeserializer.class);

    // newFixedThreadPool rejects a non-positive size with IllegalArgumentException.
    ExecutorService exec = Executors.newFixedThreadPool(numThreads);

    // All iterations are queued up front; the pool works through them numThreads at a time.
    for (int i = 0; i < maxIter; i++) {
        final int iteration = i;
        exec.submit(() -> {
            try {
                pollLastRecord(props);
            } catch (RuntimeException e) {
                // submit() would otherwise hide the failure inside the discarded Future.
                System.err.println("Iteration " + iteration + " failed: " + e);
                return;
            }
            if (iteration % 10000 == 0) {
                System.out.println("=== " + iteration + " loops finished ===");
            }
        });
    }

    exec.shutdown();
    try {
        // Wait for the queued work so the summary below describes completed iterations.
        exec.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS);
    } catch (InterruptedException e) {
        exec.shutdownNow();
        Thread.currentThread().interrupt();
    }

    int total = Math.max(maxIter, 0);
    System.out.println("Total submissions: " + total);
    System.out.println("Each thread processed: " + total / numThreads);
}

/**
 * Opens a consumer, positions it on the last offset of partition 0 of "TOPIC",
 * polls once and closes the consumer, even when one of the calls fails.
 */
private static void pollLastRecord(Properties props) {
    try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
        TopicPartition tp = new TopicPartition("TOPIC", 0);
        consumer.assign(Arrays.asList(tp));
        consumer.seekToEnd(Arrays.asList(tp));

        // An empty partition reports position 0; clamp so seek() never gets a negative offset.
        long lastOffset = Math.max(0L, consumer.position(tp) - 1L);
        consumer.seek(tp, lastOffset);

        // Original author's observation: without this poll the memory growth does not occur.
        // NOTE(review): poll(long) is deprecated in newer clients in favour of poll(Duration);
        // confirm the client version before switching.
        consumer.poll(1000);
        consumer.unsubscribe();
    }
}