Hi All,
I have written a lightweight Kafka consumer app in Python, which reads records from the broker and processes them. However, in most runs some records are missing. For example, if I send 500 messages from the producer, the consumer app usually receives fewer than that. Below is the code snippet:
...
from kafka import KafkaConsumer
....
self.consumer: KafkaConsumer = KafkaConsumer(
    topic,
    bootstrap_servers=bootstrap_servers,
    group_id=groupId,
    enable_auto_commit=auto_commit,    # True
    auto_offset_reset=offset_reset,    # earliest
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))
...
while True:
    try:
        msg_pack = self.consumer.poll(timeout_ms=consumer_poll_timeout,
                                      max_records=max_poll_records)
        if bool(msg_pack):
            for topic_partition, messages in msg_pack.items():
                count += 1
                processRecords.record_handler(messages[0])
Here, consumer_poll_timeout (passed as timeout_ms) is 1000 ms and max_poll_records (passed as max_records) is 10.
Interestingly, if I run the Kafka console consumer (kafka-console-consumer.sh), which is bundled with the Kafka installation, there are no missing records.
So surely there is something wrong in how I am using the Kafka Python API. Can anyone please help me identify the potential issue?
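One possible cause worth checking in the loop above: in kafka-python, poll() returns a dict that maps each TopicPartition to a list of records, and with max_records set to 10 a single poll can return several records per partition. The loop passes only messages[0] to the handler, so any further records in the same list would be skipped, and because the consumer's position has already advanced past them they would not be delivered again. The sketch below illustrates the difference without needing a broker. The TopicPartition stand-in, fake_poll_result, handle_first_only and handle_all are hypothetical names made up for this illustration, not part of kafka-python or of the app above.

```python
from collections import namedtuple

# Hypothetical stand-in for kafka.TopicPartition so the sketch runs without a broker.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def fake_poll_result():
    """Mimic the shape returned by KafkaConsumer.poll(): {TopicPartition: [record, ...]}."""
    return {
        TopicPartition("demo", 0): ["r0", "r1", "r2", "r3"],
        TopicPartition("demo", 1): ["r4", "r5", "r6"],
    }


def handle_first_only(msg_pack, handler):
    """Same pattern as the loop in the question: one record per partition per poll."""
    count = 0
    for topic_partition, messages in msg_pack.items():
        count += 1
        handler(messages[0])
    return count


def handle_all(msg_pack, handler):
    """Iterate over every record in every partition's list."""
    count = 0
    for topic_partition, messages in msg_pack.items():
        for message in messages:
            count += 1
            handler(message)
    return count


seen_first, seen_all = [], []
first_count = handle_first_only(fake_poll_result(), seen_first.append)
all_count = handle_all(fake_poll_result(), seen_all.append)
print(first_count, all_count)  # prints: 2 7
```

If this is indeed the cause, replacing the single record_handler(messages[0]) call with an inner loop over messages (as in handle_all) should make the received count match what kafka-console-consumer.sh shows.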
Thanks
Abhijit