Suspend/Resume kafka consumers for catching up with each other

1,468 views
Skip to first unread message

Varun Saini

unread,
Aug 16, 2016, 7:43:53 PM8/16/16
to Confluent Platform
Hi,

I am trying to build a simple data comparison service using kafka consumers. Here is the logic -
1. Create two consumers with two different topics (those topics have same data but can be at any offset at the time of comparison)
2. These topics store data in avro format. 
3. Topics have an id in record that is unique.
4. When I start comparison, I check the id for one consumer, if this consumer tracking an id that is greater than the id for another consumer, this consumer need to wait for another consumer(so I need to suspend it) to catch up.
5. When both consumer are at same id, then I need to resume suspended consumer and do other comparisons between them.

I am thinking of running two consumers in separate threads. But I haven't been able to figure out suspend/resume logic. 

Please let me know if it looks like something that can be done. If not, I will figure out something else?

Thanks,
Varun

ha...@confluent.io

unread,
Aug 17, 2016, 1:54:08 AM8/17/16
to Confluent Platform
This sounds just like what is described in the new KafkaConsumer docs ( see https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html)

"Consumption Flow Control

If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have few or no data to consume.

One of such cases is stream processing, where processor fetches from two topics and performs the join on these two streams. When one of the topic is long lagging behind the other, the processor would like to pause fetching from the ahead topic in order to get the lagging stream to catch up. Another example is bootstraping upon consumer starting up where there are a lot of history data to catch up, the applciations usually wants to get the latest data on some of the topics before consider fetching other topics.

Kafka supports dynamic controlling of consumption flows by using pause(TopicPartition...) and resume(TopicPartition...) to pause the consumption on the specified assigned partitions and resume the consumption on the specified paused partitions respectively in the future poll(long) calls."

Varun Saini

unread,
Aug 17, 2016, 12:21:03 PM8/17/16
to Confluent Platform
Thanks for the reply.

That takes me to my earlier R&D, where I was trying to figure out "If I can use kafka-stream to do this comparison"??

"When one of the topic is long lagging behind the other, the processor would like to pause fetching from the ahead topic in order to get the lagging stream to catch up" 

^^ This is what I want to build (based on one check).

Thanks,
Varun
Reply all
Reply to author
Forward
0 new messages