Kafka Streams, repartition topics and retention


nicolas....@finn.no

Jan 24, 2017, 6:41:40 AM
to Confluent Platform
Hello,

A Streams app of ours consumes about 100 million msgs/day and creates 10 buckets (i.e., for each incoming key-value pair, 10 new key-value pairs), on each of which a count aggregate is done. This may well be a bad design for this type of aggregation and could be redesigned, e.g. using the aggregate method or something similar (I haven't investigated that yet).
However, it worked fine for a few days until we started running out of disk space. It seems the generated repartition topics grew huge.
As of today, our Confluent setup has a default retention of 3 days on automatically created topics.
Would decreasing that retention time be a solution? Or would it impact the Streams app in a negative way?

Best regards
Nicolas

Damian Guy

Jan 24, 2017, 10:21:18 AM
to Confluent Platform
Hi Nicolas,

It isn't immediately obvious to me what your app is doing and whether it could indeed be designed better. However, if you are creating 10 key/value pairs for each incoming record, I'm not surprised the repartition topic is growing rapidly.

If you decrease the retention time, then you obviously run the risk of losing data. For example, if one partition is not processed by the Streams app for longer than the retention time, then some data may be lost (and hence never processed). So you'd want to make sure the Streams app is keeping up with the incoming data; specifically, you'll want to check the offset lag on the repartition topics. You can check the lag with the kafka-consumer-groups.sh command.
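Since a Streams app's consumer group id is its application.id, something along these lines should show the per-partition lag (the group name is just a placeholder; depending on your broker/tool version you may also need the --new-consumer flag):

# Describe the Streams app's consumer group; the LAG column shows how far
# behind each partition is. "my-streams-app" is a placeholder application.id.
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --describe --group my-streams-app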

Is there any chance you could use compacted topics for this? They would also save you some space.

Thanks,
Damian



nicolas....@finn.no

Jan 24, 2017, 4:48:26 PM
to Confluent Platform
Hi Damian,

Thanks for the input.

I decreased the retention time to 1 day, which reduced disk usage as expected. I had taken the app down for a bit more than 48 hours, and it caught up with all the queued messages in only a few minutes. Ultimately the app should never be down for more than a few minutes (as of now it is more of a PoC). The same goes for the repartition topic, whose lag went to 0 in a few minutes.
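In case it helps others: retention on an existing topic can be changed with kafka-configs.sh, roughly like this (the topic name is only illustrative; if I understand correctly, Streams names its repartition topics <application.id>-<storeName>-repartition):

# Set retention to 1 day (86400000 ms) on a repartition topic.
# Topic name and ZooKeeper address are placeholders.
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
    --entity-type topics --entity-name my-streams-app-counters-repartition \
    --add-config retention.ms=86400000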

I can definitely compact the output topic (it goes into a Connect JDBC sink for now; I'm thinking about PoCing interactive queries).
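Presumably something along these lines would do for the existing topic (the topic name is again a placeholder), since compaction keeps only the latest count per bucket key:

# Switch the output topic's cleanup policy to compaction so only the
# latest count per key is retained. Topic name is a placeholder.
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
    --entity-type topics --entity-name output-topic \
    --add-config cleanup.policy=compact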

In case you can come up with a better stream design, here is a short description of what the app does.
Basically the app builds time buckets (like "year 2017", "year 2017 month 1", or "year 2017 month 1 day 20") and then counts the number of events that fall into each bucket. A simplified, schematic version (it is a bit like building a MultiMap and counting the elements of each entry, with each value counted under several entries) would be the following:

new KStreamBuilder()
    .stream(/*topics*/)
    .flatMap((key, value) -> Arrays.asList(
            yearBucket(value),        // new KeyValue whose key is this value's year plus its uid
            monthBucket(value),       // new KeyValue whose key is this value's month plus its uid
            dayBucket(value),         // and so on...
            hourBucket(value),
            allTimeBucket(value),
            globalYearBucket(value),  // new KeyValue whose key is only the year, so as to count all the events of that year on the system
            globalMonthBucket(value), // and so on...
            globalDayBucket(value),
            globalHourBucket(value),
            globalAllTimeBucket(value)))
    .groupByKey()
    .count("counters")
    .toStream()
    .map(toAvro)
    .to(avroKeySerde, avroValueSerde, outputTopic);

Thanks for the help.

Best regards,
Nicolas

