Kafka Consumer not consuming new messages randomly


marcelbic...@gmail.com

Nov 22, 2016, 3:14:15 AM
to Confluent Platform
Hello!

I'm contacting you because I have a problem with my Kafka-based architecture.

I'm using Confluent 1.0 with Kafka 0.8.2.
I have the following architecture:
- A job that calls the Kafka REST API behind a load balancer every hour.
- The Kafka REST API servers point to a Kafka cluster of three brokers.
- The ZooKeeper servers are co-located with the Kafka brokers.

This problem concerns a single topic with only one partition.
It is consumed by only one consumer group.

The process is the following (sketched in the commands below):
0. A frequently running process produces new messages into the topic.
1. Each hour, the job creates a consumer instance for the consumer group (POST request) through the load balancer, then notes which REST node the consumer instance was created on.
2. The job then consumes the messages through a GET request to this consumer instance on the right REST node.
3. Once that is done, it commits the offsets.
4. Finally, once everything is done, it deletes the consumer instance.
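
To illustrate, the calls look roughly like this (a sketch against the v1 REST API; the load balancer and REST node hostnames, group, topic, and instance id are placeholders, and in practice the instance id and base URI come from the create response):

# 1. Create a consumer instance through the load balancer; the JSON response
#    contains "instance_id" and "base_uri" (the REST node that owns the instance)
curl -X POST -H 'Content-Type: application/vnd.kafka.binary.v1+json' \
     http://loadbalancer:8082/consumers/my_group \
     --data '{"format":"binary","auto.offset.reset":"smallest","auto.commit.enable":"false"}'

# 2. Consume messages from the REST node named in "base_uri"
curl -X GET -H 'Accept: application/vnd.kafka.binary.v1+json' \
     http://rest-node:8082/consumers/my_group/instances/my_instance/topics/my_topic

# 3. Commit the offsets consumed by this instance
curl -X POST http://rest-node:8082/consumers/my_group/instances/my_instance/offsets

# 4. Delete the consumer instance
curl -X DELETE http://rest-node:8082/consumers/my_group/instances/my_instance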

I noticed the following problem:
One hour, the job works and gets new messages.
Then, for several hours, the request to retrieve new messages gets nothing new, even though there are new messages in the topic and the offset of the last consumed message is not the latest one.

Technically speaking, when the problem occurs, the REST query returns a 200 code with an empty body.

I checked in ZooKeeper: all three ZooKeeper servers are well synchronized, and the offset for this consumer group and topic is the same on all three.
I checked in Kafka, and it reports that the replicas are in sync.

I noticed that if I add multiple retries to the step that fetches new messages, it eventually works (say, 20 retries with a 10 s sleep between them), roughly like the loop below.
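
A sketch of that workaround (hostnames and ids are placeholders for our real consume request):

# Retry the consume request up to 20 times, sleeping 10 s between attempts,
# until it returns something other than an empty JSON array
for i in $(seq 1 20); do
  body=$(curl -s -H 'Accept: application/vnd.kafka.binary.v1+json' \
    http://rest-node:8082/consumers/my_group/instances/my_instance/topics/my_topic)
  if [ "$body" != "[]" ]; then
    echo "$body"
    break
  fi
  sleep 10
done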

I was wondering what could explain this behaviour?

Unfortunately, I checked the Kafka logs and see nothing that could explain the errors.
Any idea where I could look to find an explanation for this behaviour?

Best regards.

M.

Ewen Cheslack-Postava

Nov 23, 2016, 12:32:21 AM
to Confluent Platform
What you're describing sounds like some sort of stall in the consumer. You are using quite an old version, so that could have something to do with it, but I'm not aware of any issues that match the exact behavior you're describing.

In particular, it sounds odd that retrying will get things to work (total of 200s by your description, which is < 4 minutes), but just waiting for the normal 200 responses for long enough doesn't (default request timeout is 1s, so doing this 200x should presumably be equivalent to the other method).

Can you provide any additional logs (even stuff that doesn't appear to indicate errors could help reveal the problem) or even turn up the log level on the REST proxy to help track down the issue?
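
For example, assuming the stock packaging layout (the file may live elsewhere on your install), you can raise the root log level in the REST proxy's log4j.properties and restart the proxy:

# etc/kafka-rest/log4j.properties
# Raise the root logger from the default INFO to DEBUG
log4j.rootLogger=DEBUG, stdout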

-Ewen




--
Thanks,
Ewen

julien.b...@capgemini.com

Nov 28, 2016, 12:46:19 PM
to Confluent Platform

Hello,

I am in the exact same situation as Marcel.

Our Confluent version was 1.0.1 with Kafka 0.8.2.2, with a very similar use case: there are messages waiting to be consumed on a new topic, but sometimes the consuming job acts as if there is nothing to consume. Our read job wakes up hourly.

Furthermore, we tried upgrading Confluent from 1.0.1 to 3.1.1 (the latest), which changed nothing. The problem still happens randomly.

Nothing in the logs indicates that anything is going wrong.





julien.b...@capgemini.com

Dec 13, 2016, 6:21:50 AM
to Confluent Platform
Hello,

We have reproduced this issue by pushing 20,000 messages into a new topic and issuing the REST requests below. Between these REST requests, we checked in ZooKeeper how the offset evolved and which owner was linked to the group name.

We observed that 2 seconds after the consumer creation, the consumer does not yet appear attached to the group in ZooKeeper, and the following read request returns an empty event list.
Two more seconds after this empty read request, the consumer finally appears registered in ZooKeeper for this group.
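
The registration can also be watched directly in ZooKeeper; something like this (a sketch using the zookeeper-shell tool that ships with Kafka) lists the consumer ids currently registered for the group:

# An empty ids list means no live consumer is attached to the group yet
zookeeper-shell localhost:2181 ls /consumers/groupA30112016/ids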


##
## Read messages on Kafka REST userbque03:8084 on Group: groupA30112016 and Topic: topicA30112016 that has been loaded with 20 k messages
##

# 20 000 Messages have been posted
kafka-consumer-offset-checker --broker-info --zookeeper localhost:2181 --group groupA30112016
[2016-11-30 14:26:37,031] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group           Topic                          Pid Offset          logSize         Lag             Owner
groupA30112016  topicA30112016                 0   0               20000           20000           none
BROKER INFO
2 -> userbque02:9092

# Create Consumer
curl -XPOST -H 'Content-Type: application/vnd.kafka.binary.v1+json' -H 'Accept: application/vnd.kafka.binary.v1+json' http://userbque03:8084/consumers/groupA30112016 --data '{ "format":"binary","auto.offset.reset":"smallest","auto.commit.enable":"false"}'

# Response:

# Wait
sleep 2

# Check offset on Group
kafka-consumer-offset-checker --broker-info --zookeeper localhost:2181 --group groupA30112016
[2016-11-30 14:26:41,773] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group           Topic                          Pid Offset          logSize         Lag             Owner
groupA30112016  topicA30112016                 0   0               20000           20000           none
BROKER INFO
2 -> userbque02:9092


# Read messages on Topic for my consumer
## Empty Result !!!
[]

sleep 2


# Check offset on Group
kafka-consumer-offset-checker --broker-info --zookeeper localhost:2181 --group groupA30112016
[2016-11-30 14:26:46,167] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group           Topic                          Pid Offset          logSize         Lag             Owner
groupA30112016  topicA30112016                 0   0               20000           20000           groupA30112016_userbque03.socle-1480512398649-db43238f-0
BROKER INFO
2 -> userbque02:9092

# Commit offset on my consumer
[]

sleep 2

# Check offset on Group
kafka-consumer-offset-checker --broker-info --zookeeper localhost:2181 --group groupA30112016
[2016-11-30 14:26:50,292] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group           Topic                          Pid Offset          logSize         Lag             Owner
groupA30112016  topicA30112016                 0   0               20000           20000           groupA30112016_userbque03.socle-1480512398649-db43238f-0
BROKER INFO
2 -> userbque02:9092

# Delete consumer

sleep 2

# Check offset on Group
kafka-consumer-offset-checker --broker-info --zookeeper localhost:2181 --group groupA30112016
[2016-11-30 14:26:54,806] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group           Topic                          Pid Offset          logSize         Lag             Owner
groupA30112016  topicA30112016                 0   0               20000           20000           none
BROKER INFO
2 -> userbque02:9092

ha...@confluent.io

Dec 13, 2016, 3:29:18 PM
to Confluent Platform
Is http://userbque03:8084 the address of the load balancer or is it one of the three REST Proxy instances?
Could you be load balancing all your REST requests to multiple REST Proxies and therefore creating multiple consumers in the same consumer group?
That would explain why one of them might get nothing if you have created more consumers in the group than partitions in the topic. It would also explain why, after a period of time, the old consumers would time out and the data would begin flowing again.
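
One quick way to check: through a load balancer, two consecutive create calls can land on two different proxies. A sketch with hypothetical hostnames; the base_uri in each response names the proxy that actually owns the instance:

# Run this twice; if the two responses report different base_uri hosts,
# the instances live on different proxies behind the load balancer
curl -X POST -H 'Content-Type: application/vnd.kafka.binary.v1+json' \
     http://loadbalancer:8082/consumers/my_group --data '{"format":"binary"}'
# => {"instance_id":"...","base_uri":"http://proxy1:8082/consumers/my_group/instances/..."}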

-hans

julien.b...@capgemini.com

Dec 16, 2016, 3:21:26 AM
to Confluent Platform

Hi Hans,

My answers and comments:


On Tuesday, December 13, 2016 at 21:29:18 UTC+1, ha...@confluent.io wrote:
Is http://userbque03:8084 the address of the load balancer or is it one of the three REST Proxy instances?

userbque03 is one of the REST proxy instance nodes.

Could you be load balancing all your REST requests to multiple REST Proxies and therefore creating multiple consumers in the same consumer group?

We did that before, but in order to isolate and analyse the issue, we addressed the REST node directly in this test. The issue was the same with load balancing.

That would explain why one of them might get nothing if you have created more consumers in the group than partitions in the topic. It would also explain why after a period of time, the old consumers would timeout and the data would begin flowing again.

I don't understand your analysis here: if I stay on one instance, there is no concurrent consumer, as observed in ZooKeeper in my previous traces.

Best Regards,

Julien
 

Hans Jespersen

Dec 16, 2016, 1:50:20 PM
to confluent...@googlegroups.com
Julien,

My thought was that you might still have the load balancer in the middle, but since you say this happens even when you call a single REST proxy directly, it should work.
It still sounds like the old consumer or consumer group is not getting deleted properly. Since you only have one topic partition, there can only be one active consumer, so if an old consumer is hanging around, the next REST consumer you create will get nothing.
If you retry 20 times, that is enough time for the phantom consumer to be timed out of the consumer group, and the REST proxy consumer can make progress again.

Can you double-check that your DELETE request is returning HTTP/1.1 204 No Content?
It should look something like "curl -X DELETE http://hostname:8082/consumers/my_consumer/instances/my_consumer_instance". Make sure you are using HTTP DELETE (and not POST or PUT).
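
For example, this prints only the status code (substitute your real hostname and instance id):

# Print only the HTTP status code of the DELETE; expect 204
curl -s -o /dev/null -w "%{http_code}\n" -X DELETE \
     http://hostname:8082/consumers/my_consumer/instances/my_consumer_instance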

After you delete the consumer there should be no consumers left in the consumer group. 
Can you check this is true as well using "bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic my_topic --group my_consumer_group --zookeeper hostname:2181"

For example on my development laptop I get the following output before the consumer delete...

Group           Topic                          Pid Offset          logSize         Lag             Owner
my_consumer_group demo_input                     0   63              63              0               my_consumer_group_MyLaptop.local-1491813886476-db77e0a3-0

..and I get the following output after I run the consumer delete REST call...

Group           Topic                          Pid Offset          logSize         Lag             Owner
my_consumer_group demo_input                     0   63              63              0               none


-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * ha...@confluent.io (650)924-2670
 */


julien.b...@capgemini.com

Dec 21, 2016, 3:02:53 AM
to Confluent Platform

Hi Hans,


I confirm that the DELETE request returns 204 and that, when we check the group with the ConsumerOffsetChecker tool, the owner column is set to none.

As you can see in my previous log, we checked that there was no consumer before we reproduced the issue:


Group           Topic                          Pid Offset          logSize         Lag             Owner
groupA30112016  topicA30112016                 0   0               20000           20000           none
BROKER INFO
2 -> userbque02:9092

Best regards,

Julien

ha...@confluent.io

Dec 21, 2016, 12:23:56 PM
to confluent...@googlegroups.com
This sure sounds like a bug, then. It is probably worth checking whether it is a known or already-fixed issue with the REST Proxy that might have been addressed in the Confluent 2.0, 3.0, or 3.1 releases. If it can be reproduced with the latest release, you should open a JIRA.

-hans

Sent from my iPhone

julien.b...@capgemini.com

Jan 13, 2017, 3:50:34 AM
to Confluent Platform
Hello,

We have finally found the root cause of this issue.
It was due to the consumer.request.timeout.ms configuration parameter, which had been set to 100 ms in our configuration. With such a short timeout, the proxy's read request could presumably give up and return an empty 200 before the newly created consumer had fetched anything.
By setting it back to the default of 1000 ms, the issue is resolved :-)
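
For reference, the fix was a one-line change in the REST proxy configuration (assuming the usual kafka-rest.properties location; adjust the path for your install):

# etc/kafka-rest/kafka-rest.properties
# The maximum time the proxy waits for messages on a read request;
# 100 ms was too short for a freshly created consumer to fetch anything
consumer.request.timeout.ms=1000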

Best Regards,

Julien B.