Kafka consumer receiving same message multiple times

Shamik Bandopadhyay

Sep 27, 2016, 9:11:47 PM
to kafka-clients
Hi,

  I've recently started using Kafka to read documents coming through a web crawler. What I'm noticing is that when I'm dealing with a few million documents, the consumer keeps processing the same messages over and over again. It looks like the offsets are not getting committed for some reason. This is not the case when I test the consumer with a few hundred messages.

I'm using the Kafka high-level consumer client in Java, with a consumer group in which each partition has its own dedicated thread. Here's a code snippet for polling data.

while (true) {
    try {
        if (consumerDao.canPollTopic()) {
            // Poll for the next batch of records
            ConsumerRecords<String, TextAnalysisRequest> records =
                consumer.poll(this.config.getPropertyAsInteger(IPreProcessorConstant.KAFKA_POLL_COUNT));
            for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
                TextAnalysisRequest textAnalysisObj = record.value();
                if (textAnalysisObj != null) {
                    // Hand each document off for post-processing
                    PostProcessRequest req = new PostProcessRequest();
                    req.setRequest(this.getRequest(textAnalysisObj));
                    PreProcessorUtil.submitPostProcessRequest(req, config);
                }
            }
        } else {
            Thread.sleep(this.config.getPropertyAsInteger(IPreProcessorConstant.KAFKA_POLL_SLEEP));
        }
    } catch (Exception ex) {
        LOGGER.error("Error in Full Consumer group worker", ex);
    }
}
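
For reference, here's a minimal sketch of what the core of the loop would look like with auto-commit disabled and an explicit commitSync() after each batch (commitSync() is the standard consumer API; the timeout literal is a placeholder, and the helper classes are the same ones from my code above):

// Sketch only: explicit commits instead of auto-commit (assumes enable.auto.commit=false).
// TextAnalysisRequest, PostProcessRequest and the processing calls are the same ones used above.
ConsumerRecords<String, TextAnalysisRequest> records = consumer.poll(1000); // 1000 ms is a placeholder timeout
for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
    if (record.value() != null) {
        PostProcessRequest req = new PostProcessRequest();
        req.setRequest(this.getRequest(record.value()));
        PreProcessorUtil.submitPostProcessRequest(req, config);
    }
}
// Commit the offsets for this batch only after every record has been handed off
consumer.commitSync();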


Here are the Kafka consumer configuration parameters I'm setting; the rest are left at their default values.

consumer.auto.commit=true
consumer.auto.commit.interval=1000
consumer.session.timeout=180000
consumer.poll.records=2147483647
consumer.request.timeout=181000
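
In case the shorthand keys above are unclear, this is roughly how they translate into the actual properties I pass to the KafkaConsumer (the shorthand-to-real-key mapping is my own; the values match the full config dump below):

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Rough translation of the shorthand keys above into the real consumer config keys;
// the values match the full config dump below.
Properties props = new Properties();
props.put("bootstrap.servers", "kafkahost1:9092,kafkahost2:9092");
props.put("group.id", "full_group");
props.put("enable.auto.commit", "true");                          // consumer.auto.commit
props.put("auto.commit.interval.ms", "1000");                     // consumer.auto.commit.interval
props.put("session.timeout.ms", "180000");                        // consumer.session.timeout
props.put("request.timeout.ms", "181000");                        // consumer.request.timeout
props.put("max.poll.records", String.valueOf(Integer.MAX_VALUE)); // consumer.poll.records = 2147483647
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.test.preprocessor.consumer.serializer.KryoObjectSerializer");
KafkaConsumer<String, TextAnalysisRequest> consumer = new KafkaConsumer<>(props);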


Complete consumer config:

metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 181000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.test.preprocessor.consumer.serializer.KryoObjectSerializer
group.id = full_group
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 180000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest

My sample Kafka topic has 8 partitions with a replication factor of 2.

The log retention period in server.properties is set to 168 hours.


log.retention.hours=168
log.roll.hours=168

Not sure what I'm missing here. Any pointers would be appreciated.
-Thanks,
Shamik