Kafka streams: Why consumer lag can be very large for few partitions compared to remaining partition

3,013 views
Skip to first unread message

anand reddy akidi

unread,
Aug 15, 2017, 12:31:50 PM8/15/17
to Confluent Platform
Hi All,

Why consumer lag can be very large for few partitions compared to remaining partitions?.  I have 4 Kafka streams application instances which reads data from topic with 8 partitions. Sometimes consumer lag going very large for 2 partitions  (sometimes only one) compared to remaining partitions. This is causing delay in realtime data getting processed for the data in these particular 2 partitions. My kstreams application handles data joining between kstream and ktable where ktable data is updated by daily cronjob ( ktable backed topic also has 8 partitions). Could you please help me with this issue?. 

Regards
Anand

Eno Thereska

unread,
Aug 17, 2017, 4:46:33 PM8/17/17
to Confluent Platform
Hi Anand, 

It might be that those 2 partitions see most of the load in the system, i.e., there could be skew in your data and its not uniformly distributed. Any chance you could partition differently at all? Or create more partitions?

How many threads do your 4 Kafka streams instances have each? 

Eno

anand reddy akidi

unread,
Aug 17, 2017, 7:50:24 PM8/17/17
to Confluent Platform
Input topic that kstreams application consuming is uniformly distributed beacuse those are not keyed messages. I am doing couple of joins ( kstream-ktable join ), for this I am extracting the key from the input message and rekeying for joining. when rekey the kstream, there is a chance it will not uniformly distribute because we get duplicate keys ( few of the keys  very high in number compare to other keys). For the last result kstream i will make key as null beacuse i want output topic uniformly distributed, but some how this is not happening. any suggestion on this?. and I use only one thread per instance. Do i have to use multiple threads?

Eno Thereska

unread,
Aug 18, 2017, 11:50:21 AM8/18/17
to Confluent Platform
Depending on where the bottleneck might be, sometimes it makes sense to set total number of threads to the number of partitions. With 8 partitions, it would be good if you had 2 threads per instance (so 2x4 instances = 8 threads total). The unit of parallelism in Kafka Streams is the number of partitions.

I didn't quite understand the part on the output topic keys to null. I suspect you could do a map() and set the keys to null there, before you output to Kafka.

Eno

anand reddy akidi

unread,
Aug 18, 2017, 12:27:08 PM8/18/17
to Confluent Platform
Thank you, increased the number of thread to 2.

yes, I am doing map() and the keys to null and doing some other processing (last step) and sending output to kafka.  I can see in the logs that last step process is performing by only one server ( but I want to split it across the instances..i have 4 instances). whether making keys to null is the reason?.   

Regards
Anand.

rahul koti

unread,
Dec 23, 2019, 10:27:08 PM12/23/19
to Confluent Platform
even i am facing the lag when i am joining ktable and kstream after rekeying the key i am joining and this lag happens intermittently i happen to see this lag once in 3 to 4 days, i have observed that when i add messages to ktable and may be after 24 hrs it causes lag i am not sure this causes the issue any

Harivittal HK

unread,
Dec 24, 2019, 12:48:26 AM12/24/19
to confluent...@googlegroups.com
Hi ,

Try to have them separated out to a different topic , the partition data where the lag pattern is observed and you can merge them back during the joins on the ktable, that way processes bottlenecks can get relief to some extent. 

Cheers,
Hari

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/c30d9918-f14c-409a-87d4-b49954648824%40googlegroups.com.

rahul koti

unread,
Dec 24, 2019, 12:57:17 AM12/24/19
to confluent...@googlegroups.com
Hi Hari,
Thanks for your response I rekey the kstream topic and then join ,
You are saying join the rekeyed topic and ktable as seperated topic ? and btw I am not seeing any lag in the rekeyed topic which gets created 

BadwolF ForeveR

unread,
Dec 24, 2019, 6:39:59 AM12/24/19
to confluent...@googlegroups.com
Hi Anand/Rahul,

With the given scenario, the lag can happen when you rekey and definitely at the last map when you are making the keys null to distribute data to multiple partitions in output topic.

This is called data skew, a very common issue in distributed processing of key-value data. To do joins the data is partitioned first in basis of HASH of the key then each hash-range is assigned a partition. Now if there are collection of keys that result in same hash range will end up in same partition making one partition bigger than rest of the partitions (note that it can happen without the keys being duplicate). To illustrate it further, suppose your keys are long values (after rekeying) and you have 8 partitions but all the long values are multiple of 8, then hash(key) % 8 would always result in 0 hence all the records will end up in single partitions and rest of the partitions will always be empty and the processing will happen in one node. This is called data skew.

Now having said that in your case you can make the KTabke as GlobalKTable (if the data is fairly large) then you do not have to rekey the input stream as the State of GlobalKTable will updates at each node. This is similar to broadcast joins in spark (where you copy the entire table to all nodes). However, using GlobalKTable would in crease your storage on each node as the same table will be copied to all nodes, so use judiciously.

Coming to the next issue of making key null in last map to distribute the final data to all partitions. Making all the keys null would cause the data skew at its best I.e. all the data will end up in one partition as explained above. Since you are making the keys null, it means that you are not interested in keys, so instead of making it null make it unique. You can use current timestamp as millisecond or microsecond or nanosecond as the key (depending on your data volume and rate) and you will get desired output without skew. Important to note that DO NOT use the same timestamp in the last map, just use current time in the lambda.

Also, if rekeying the input stream is not causing skew then do not use GlobalKTable. That would be unnecessary.

Hope it helps!

Vinay

On Dec 24, 2019, at 00:57, rahul koti <koti....@gmail.com> wrote:


Reply all
Reply to author
Forward
0 new messages