Consumer missing messages


Abhijit Dutta

Dec 7, 2024, 1:17:07 AM
to kafka-clients
Hi All,

I have written a lightweight Kafka consumer app in Python that reads records from the broker and processes them. However, in most runs some records are missing. For example, if the producer sends 500 messages, the consumer app usually receives fewer than that. Below is the code snippet:
...
from kafka import KafkaConsumer
....

      self.consumer: KafkaConsumer = KafkaConsumer(topic,
                                                   bootstrap_servers=bootstrap_servers,
                                                   group_id=groupId,
                                                   enable_auto_commit=auto_commit,  # True
                                                   auto_offset_reset=offset_reset,  # 'earliest'
                                                   value_deserializer=lambda x: json.loads(x.decode('utf-8')))

...
         while True:
            try:
               msg_pack = self.consumer.poll(timeout_ms=consumer_poll_timeout,
                                             max_records=max_poll_records)
               if bool(msg_pack):
                  for topic_partition, messages in msg_pack.items():
                     count += 1
                     processRecords.record_handler(messages[0])

where timeout_ms is set to 1000 ms and max_poll_records is 10.
Interestingly, if I run the console consumer (kafka-console-consumer.sh) that is bundled with the Kafka installation, no records are missing.

So there is surely something wrong in how I am using the Kafka Python API. Can anyone please help me identify the potential issue?

Thanks
Abhijit  
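
One possible cause, offered as a sketch rather than a confirmed diagnosis: in kafka-python, poll() returns a dict that maps each TopicPartition to a list of records, and with max_records=10 a single poll can return up to 10 records. The loop in the snippet passes only messages[0] to the handler, so any further records in each list would never be processed. The example below uses plain Python data as a stand-in for a poll() result, so it runs without a broker. The names handle_batch, sample_pack, and the tuple keys are hypothetical and used only for illustration; the handler argument stands in for processRecords.record_handler.

```python
from typing import Any, Callable, Dict, List


def handle_batch(msg_pack: Dict[Any, List[Any]],
                 handler: Callable[[Any], None]) -> int:
    """Pass every record of one poll() result to handler.

    Returns the number of records handled, so the caller can keep
    a per-record count instead of a per-partition count.
    """
    handled = 0
    for _topic_partition, messages in msg_pack.items():
        # Iterate over the whole list, not just messages[0].
        for message in messages:
            handler(message)
            handled += 1
    return handled


# Stand-in for one poll() result (assumption: two partitions holding
# 3 and 2 records). Real keys would be TopicPartition objects and real
# values would be lists of ConsumerRecord objects.
sample_pack = {
    ("demo-topic", 0): ["a", "b", "c"],
    ("demo-topic", 1): ["d", "e"],
}

seen: List[str] = []
total = handle_batch(sample_pack, seen.append)
print(total, seen)
```

In the consumer loop this would be used as count += handle_batch(msg_pack, processRecords.record_handler), assuming record_handler accepts a single record. Counting per record rather than per partition also makes the total directly comparable with the 500 messages sent by the producer.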

