Tuning Consumer Configs for HDFS Connector


Arun Mehta

Aug 13, 2016, 11:51:32 PM
to Confluent Platform
I am having trouble understanding the following consumer/Kafka Connect configs. I have looked at the documentation and I wanted to see if my interpretation is correct.

Right now I am setting the following configs to:
max.partition.fetch.bytes =  1 MB
max.poll.records = 1

I am confused about the difference between these two configs. Is it that the consumer pulls 1 MB worth of messages from Kafka, and then each poll() grabs one message out of that 1 MB? And must the tasks then finish processing that entire fetch before the time configured in session.timeout.ms? I am seeking clarification because I am trying to resolve a CommitFailedException: the exception message says to decrease max.poll.records and increase session.timeout.ms, but I am unsure where max.partition.fetch.bytes comes into play.

Thank you

ha...@confluent.io

Aug 14, 2016, 4:08:47 PM
to Confluent Platform

max.partition.fetch.bytes is the maximum number of bytes the server will return to this client, per partition.

max.poll.records sets an upper bound on the number of records returned in a single call to poll().

So by setting your consumer parameters as follows:

max.partition.fetch.bytes =  1 MB
max.poll.records = 1
 
You will get at most a single message for each poll(), and that message can be any size up to a maximum of 1MB (which happens to be the default maximum message size). This will work as long as you have not raised the maximum message size on the broker above the 1MB default.
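As a toy model of how the two settings interact (this is an illustration only, not the real KafkaConsumer internals): the fetch buffers up to max.partition.fetch.bytes from each partition, and each poll() then hands back at most max.poll.records of the buffered records:

```python
# Toy model of the two consumer limits -- an illustration, NOT the
# real KafkaConsumer implementation.

MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 1024  # 1 MB, the default
MAX_POLL_RECORDS = 1

def fetch_partition(message_sizes, max_bytes=MAX_PARTITION_FETCH_BYTES):
    """Buffer messages from one partition until the byte cap is reached."""
    buffered, total = [], 0
    for size in message_sizes:
        if total + size > max_bytes and buffered:
            break
        buffered.append(size)
        total += size
    return buffered

def poll(buffer, max_records=MAX_POLL_RECORDS):
    """Hand back at most max_records from the buffered fetch."""
    batch, buffer[:] = buffer[:max_records], buffer[max_records:]
    return batch

# Two 512 KB messages fit inside one 1 MB fetch...
buffer = fetch_partition([512 * 1024, 512 * 1024])
# ...but with max.poll.records = 1, each poll() returns a single record.
first = poll(buffer)   # one 512 KB record
second = poll(buffer)  # the other 512 KB record
```

So the byte limit governs how much data is fetched and buffered per partition, while the record limit governs how much of that buffer each poll() call surfaces to your code.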

For more detail see below.

Quoting from the book "Kafka: The Definitive Guide" ( see https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html)

"max.partition.fetch.bytes

This property controls the maximum number of bytes the server will return per partition. The default is 1MB, which means that when KafkaConsumer.poll() returns ConsumerRecords, the record object will use at most max.partition.fetch.bytes per partition assigned to the Consumer. So if a topic has 20 partitions, and you have 5 consumers, each consumer will need to have 4MB of memory available for ConsumerRecords. In practice, you will want to allocate more memory as each consumer will need to handle more partitions if other consumers in the group fail. max.partition.fetch.bytes must be larger than the largest message a broker will accept (max.message.size property in the broker configuration), or the broker may have messages that the consumer will be unable to consume, in which case the consumer will hang trying to read them. Another important consideration when setting max.partition.fetch.bytes is the amount of time it takes the consumer to process data. As you recall, the consumer must call poll() frequently enough to avoid session timeout and subsequent rebalance. If the amount of data a single poll() returns is very large, it may take the consumer longer to process, which means it will not get to the next iteration of the poll loop in time to avoid a session timeout. If this occurs, the two options are either to lower max.partition.fetch.bytes or to increase the session timeout."
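The sizing example in that passage can be checked with quick arithmetic (assuming the 20 partitions divide evenly across the 5 consumers):

```python
# Back-of-the-envelope memory sizing from the quoted example:
# 20 partitions, 5 consumers, 1 MB max.partition.fetch.bytes per partition.
max_partition_fetch_bytes = 1 * 1024 * 1024  # 1 MB default
partitions = 20
consumers = 5

partitions_per_consumer = partitions // consumers            # 4 partitions each
bytes_per_consumer = partitions_per_consumer * max_partition_fetch_bytes

print(bytes_per_consumer // (1024 * 1024), "MB per consumer")  # prints: 4 MB per consumer
```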

Now onto max.poll.records = 1

Quoting from KIP-41 (see https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records)

"We add a new configuration setting max.poll.records to the KafkaConsumer API which sets an upper bound on the number of records returned in a single call to poll(). As before, poll() will return as soon as either any data is available or the passed timeout expires, but the consumer will restrict the size of the returned ConsumerRecords instance to the configured value of max.poll.records. The default setting (-1) will preserve the current behavior, which sets no upper bound on the number of records."

I hope these references help your understanding of these configuration parameters.

-hans

Arun Mehta

Aug 14, 2016, 5:52:20 PM
to Confluent Platform
Thank you for your response,

To clarify: if I set no max.poll.records, I will receive 1 MB worth of records (or whatever max.partition.fetch.bytes is set to), correct?

And this 1 MB worth of records needs to finish processing before the time set in session.timeout.ms? Likewise, if I set max.poll.records to 1, I will need to finish processing that single record within the session.timeout.ms window as well?

Thanks again.

ha...@confluent.io

Aug 14, 2016, 6:11:52 PM
to Confluent Platform
If you set max.poll.records to 1, you will receive 1 message (not 1 MB worth of records). If there were two messages of 512 KB each, you would still receive only one message per poll(). If there were one message of size 2 MB, you would receive only the first 1 MB of the message and get an error. If there were multiple 1 MB messages, you would receive one 1 MB message at a time and no error.

Heartbeats are currently sent to the broker only when you call poll(), so you have to process the message you receive within the time specified in session.timeout.ms, or the brokers will rebalance the consumer group and give the partition to another consumer.
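One way to reason about this is a simple time budget: the worst-case processing time for one poll()'s worth of records has to stay under session.timeout.ms. A rough sketch with made-up numbers (illustrative only, not recommendations):

```python
# Rough poll-loop time budget: all records returned by one poll() must
# be processed before session.timeout.ms elapses, because heartbeats
# are only sent inside poll(). Numbers below are made up for illustration.
session_timeout_ms = 30_000   # session.timeout.ms
per_record_ms = 250           # measured processing time per record (example)
max_poll_records = 100        # records returned per poll() (example)

worst_case_ms = per_record_ms * max_poll_records
within_budget = worst_case_ms < session_timeout_ms

print(worst_case_ms, "ms worst case; within budget:", within_budget)
```

If the budget check fails, the two levers are the same ones the CommitFailedException suggests: lower max.poll.records (fewer records per poll) or raise session.timeout.ms (more time between polls).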



Quoting the description of session.timeout.ms from the Kafka consumer configuration documentation:

"The timeout used to detect failures when using Kafka's group management facilities. When a consumer's heartbeat is not received within the session timeout, the broker will mark the consumer as failed and rebalance the group. Since heartbeats are sent only when poll() is invoked, a higher session timeout allows more time for message processing in the consumer's poll loop at the cost of a longer time to detect hard failures."

-hans