"kafkaConsumerConfig": {
"bootstrap.servers": "localhost:9092",
"group.id": "consumer_grp_1",
"key.deserializer": "org.apache.kafka.common.serialization.LongDeserializer",
"value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
"max.poll.records": 100
},
KafkaConsumer<Long, String> consumer = KafkaConsumer.create(vertx, kafkaConf);
// Subscribe to the topic.
consumer.subscribe(queueConfig.getQueueName());
consumer.handler(record -> {
log.debug("Processing key=" + record.key() + ",value=" + record.value() + ",partition="
+ record.partition() + ",offset=" + record.offset());
    // ... rest of the record handling elided ...
});
Deploying the verticle:
final int workerPoolSize = 100;
Vertx.clusteredVertx(options, cluster -> {
try {
DeploymentOptions workerDeployOptions =
new DeploymentOptions().setWorker(true).setWorkerPoolSize(workerPoolSize);
if (cluster.succeeded()) {
cluster.result().deployVerticle(new KafkaListenersVerticle(dsConfig, cluster.result()),
workerDeployOptions, res -> {
if (res.succeeded()) {
log.info("KafkaListeners Verticle Deployment id is: " + res.result());
} else {
log.error("KafkaListeners Verticle Deployment failed!", res.cause());
}
});
}
});
Is there any parameter I can set to assign different threads to read from different partitions of the Kafka topic, given that the topic has 20 partitions, to speed up reading?
Thanks.
For what it's worth, I don't set up a cluster with Vert.x; I let the Kafka broker handle the distributed coordination with its ZooKeeper setup. Otherwise you'd have clustering for your apps and also extra coordination among the Kafka consumers if they are part of a consumer group.
Consumer groups, in my opinion, are the way to go, so your Vert.x instances don't need extra clustering, unless you use it for other things, of course :)
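To parallelize consumption across your 20 partitions within one JVM, one common approach with the Vert.x Kafka client is to deploy several instances of the consumer verticle, each creating its own KafkaConsumer with the same group.id; Kafka's group coordinator then spreads the partitions across the consumers. A minimal sketch, assuming your existing config (the verticle class, topic name, and instance count here are illustrative, not from your code):

```java
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.consumer.KafkaConsumer;

import java.util.HashMap;
import java.util.Map;

public class KafkaConsumerVerticle extends AbstractVerticle {

  @Override
  public void start() {
    Map<String, String> kafkaConf = new HashMap<>();
    kafkaConf.put("bootstrap.servers", "localhost:9092");
    // Same group.id in every instance, so the 20 partitions are shared out.
    kafkaConf.put("group.id", "consumer_grp_1");
    kafkaConf.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
    kafkaConf.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // Each verticle instance gets its own consumer (KafkaConsumer is not shared).
    KafkaConsumer<Long, String> consumer = KafkaConsumer.create(vertx, kafkaConf);
    consumer.subscribe("my-topic"); // hypothetical topic name
    consumer.handler(record ->
        System.out.println("partition=" + record.partition() + " offset=" + record.offset()));
  }

  public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    // 5 worker instances for a 20-partition topic => roughly 4 partitions per consumer.
    DeploymentOptions opts = new DeploymentOptions()
        .setWorker(true)
        .setInstances(5);
    vertx.deployVerticle(KafkaConsumerVerticle.class.getName(), opts);
  }
}
```

With one consumer per instance and all consumers in the same group, Kafka rebalances partitions among them automatically, so you get partition-level parallelism without any Vert.x clustering.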