Best way to publish a single delayed aggregation using Kafka Streams

Roland Hochmuth

Dec 18, 2016, 2:46:28 PM
to Confluent Platform
I have a use case similar to the ones described at https://groups.google.com/forum/#!topic/confluent-platform/rn8CJu7Wfcw and http://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable, and I was wondering whether there is a recommendation on how best to handle it with Kafka Streams. In my case, a monitoring agent sends metrics for many compute nodes in different regions and tenants. The metrics are tagged with the region and tenant, and they need to be aggregated in several ways, such as computing the sum of a metric's values grouped by region or tenant per hour. Because of network delays and/or clock skew, metrics may arrive before the start of a window, or they may arrive late. To handle this, I would like to publish a single hourly aggregated metric for each region or tenant that appears in the aggregation window, with a configurable delay of around 10 minutes past the hour. Metrics for a window would be consumed until about 10 minutes past the hour, with the aggregated sum continuously updated for the entire window; then, at 10 minutes past the hour, the single aggregated metric would be published back to a Kafka topic. At that point the window could be considered expired, so metrics for it that arrive later than 10 minutes past the hour could be ignored.

Does anyone have any general suggestions on the best approach for handling this type of scenario in Kafka Streams?
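As a point of reference, the semantics described above (hourly tumbling windows per region or tenant, a 10-minute grace period measured in event time, one final result per window, later records ignored) can be modeled in a few lines. This is a plain-Python sketch, not Kafka Streams code: `HourlyFinalSum` and its methods are hypothetical names, and "stream time" is assumed to be the largest event timestamp seen so far, which is roughly how Kafka Streams advances time for windowed operations.

```python
from collections import defaultdict

HOUR_MS = 60 * 60 * 1000
GRACE_MS = 10 * 60 * 1000  # the configurable "10 minutes past the hour" delay


class HourlyFinalSum:
    """Toy model: per-key hourly sums, each emitted exactly once when its window closes."""

    def __init__(self, window_ms=HOUR_MS, grace_ms=GRACE_MS):
        self.window_ms = window_ms
        self.grace_ms = grace_ms
        self.stream_time = -1           # largest event timestamp observed so far
        self.open = defaultdict(float)  # (key, window_start) -> running sum
        self.emitted = []               # final results: (key, window_start, sum)
        self.dropped = []               # records whose window had already closed

    def _close_time(self, window_start):
        # A window closes once stream time reaches its end plus the grace period.
        return window_start + self.window_ms + self.grace_ms

    def process(self, key, value, timestamp):
        window_start = timestamp - timestamp % self.window_ms
        self.stream_time = max(self.stream_time, timestamp)
        if self.stream_time >= self._close_time(window_start):
            self.dropped.append((key, value, timestamp))  # too late: ignore it
        else:
            self.open[(key, window_start)] += value       # keep updating the sum
        self._emit_closed_windows()

    def _emit_closed_windows(self):
        closed = [kw for kw in self.open if self.stream_time >= self._close_time(kw[1])]
        for key, start in sorted(closed, key=lambda kw: (kw[1], kw[0])):
            self.emitted.append((key, start, self.open.pop((key, start))))


# Usage: one region, timestamps in ms since the start of "hour 0".
agg = HourlyFinalSum()
agg.process("us-east", 1.0, 5 * 60_000)             # 00:05
agg.process("us-east", 2.0, 59 * 60_000)            # 00:59
agg.process("us-east", 4.0, HOUR_MS + 5 * 60_000)   # 01:05, hour 0 still in grace
agg.process("us-east", 8.0, 58 * 60_000)            # out of order, but within grace
agg.process("us-east", 1.0, HOUR_MS + 10 * 60_000)  # 01:10, hour 0 closes and is emitted
agg.process("us-east", 16.0, 30 * 60_000)           # hour 0 already closed: dropped
print(agg.emitted)  # [('us-east', 0, 11.0)]
```

Because the close decision is driven by event time rather than the wall clock, a window is only emitted once a newer record advances stream time; if traffic stops, the last window stays open.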

Regards --Roland


Eno Thereska

Dec 20, 2016, 8:26:14 AM
to Confluent Platform
Hi Roland,

I won't add to what was said in those links, but perhaps you could use the Interactive Query APIs to have the app query the results at the frequency you need and then publish them to a topic. (The blog on interactive queries is at https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/.)
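The polling approach can be pictured as a timer that periodically reads the windowed aggregation and publishes each window once its hour plus the delay has passed. This is a plain-Python sketch under stated assumptions: the dict stands in for the state store an application would read via interactive queries (for example a `ReadOnlyWindowStore` obtained from `KafkaStreams#store`), the list stands in for a producer writing to the output topic, and `PeriodicPublisher`/`poll` are hypothetical names.

```python
HOUR_MS = 60 * 60 * 1000
DELAY_MS = 10 * 60 * 1000


class PeriodicPublisher:
    """Toy model: poll a windowed store on a timer and publish each due window once."""

    def __init__(self, store, sink, window_ms=HOUR_MS, delay_ms=DELAY_MS):
        self.store = store          # stand-in for the queryable store: (key, window_start) -> sum
        self.sink = sink            # stand-in for a producer: published records are appended here
        self.window_ms = window_ms
        self.delay_ms = delay_ms
        self.published = set()      # windows already sent, so none is published twice

    def poll(self, now_ms):
        """Intended to be called on a wall-clock timer, e.g. once a minute."""
        due = [
            (key, start)
            for (key, start) in self.store
            if start + self.window_ms + self.delay_ms <= now_ms
            and (key, start) not in self.published
        ]
        for key, start in sorted(due, key=lambda kw: (kw[1], kw[0])):
            self.sink.append((key, start, self.store[(key, start)]))
            self.published.add((key, start))


# Usage: the aggregation has already filled the store; wall-clock times in ms.
store = {("tenant-a", 0): 3.0, ("tenant-b", 0): 7.0, ("tenant-a", HOUR_MS): 2.0}
out = []
pub = PeriodicPublisher(store, out)
pub.poll(HOUR_MS + 5 * 60_000)   # 01:05: nothing is due yet
pub.poll(HOUR_MS + 10 * 60_000)  # 01:10: both hour-0 windows are published
pub.poll(HOUR_MS + 20 * 60_000)  # 01:20: hour 0 is not sent again, hour 1 not yet due
print(out)  # [('tenant-a', 0, 3.0), ('tenant-b', 0, 7.0)]
```

Two caveats with this shape: the delay is measured in wall-clock time rather than event time, and the `published` set lives only in memory, so a restart could republish a window unless that bookkeeping is persisted somewhere.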

We did consider triggers in the past (see KIP-63 https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams) but opted for a different angle. Perhaps we'll consider them again in the future, but for now some of this will need to be done by the user.

Thanks
Eno

Roland Hochmuth

Jan 1, 2017, 12:18:29 PM
to Confluent Platform
Hi Eno, Thanks for your suggestion. I ended up prototyping it and that approach is working out really well. Regards --Roland

Eno Thereska

Jan 1, 2017, 3:12:58 PM
to Confluent Platform
Awesome!

Eno

Pooja Shekhar

Oct 18, 2018, 4:01:51 PM
to Confluent Platform
Hi Roland,
I understand you were trying to solve a problem where late-arriving records show up after the retention period is over. I am working on a similar use case, where I want to add late-arriving records to the aggregated value instead of just dropping them. I see Eno suggested using the Interactive Query APIs; would you mind sharing how you used them to solve the problem?
Regards,
Pooja Shekhar

Matthias J. Sax

Oct 19, 2018, 12:39:54 PM
to confluent...@googlegroups.com
Side comment: the upcoming CP 5.1 (AK 2.1) adds a new operator called
`suppress` that should address this problem out of the box.

Cf.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables

Note that the KIP is only partially implemented in 5.1/2.1, offering
in-memory suppression only. The full implementation slipped and will be
included in 5.2/2.2.
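For reference, with KIP-328 as shipped in AK 2.1, the DSL shape for the original use case is, as far as we understand it, `windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(10)))`, then an aggregation, then `.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))`. The plain-Python model below only illustrates what the suppression step does to the stream of intermediate updates; it is not the Kafka Streams implementation, and the function name and tuple layout are assumptions.

```python
HOUR_MS = 60 * 60 * 1000
GRACE_MS = 10 * 60 * 1000


def suppress_until_window_closes(updates, window_ms=HOUR_MS, grace_ms=GRACE_MS):
    """Toy model of holding back windowed-table updates until each window closes.

    `updates` is an iterable of (key, window_start, value, timestamp) tuples,
    i.e. every intermediate result an un-suppressed windowed aggregation
    would forward downstream. Returns only the final value per closed window.
    """
    buffer = {}       # (key, window_start) -> latest value; older values are overwritten
    stream_time = -1  # largest timestamp seen so far
    emitted = []
    for key, window_start, value, timestamp in updates:
        stream_time = max(stream_time, timestamp)
        buffer[(key, window_start)] = value
        # Evict every buffered window whose end plus grace has been reached.
        expired = sorted(
            (kw for kw in buffer if kw[1] + window_ms + grace_ms <= stream_time),
            key=lambda kw: (kw[1], kw[0]),
        )
        for kw in expired:
            emitted.append((kw[0], kw[1], buffer.pop(kw)))
    return emitted


updates = [
    ("us-east", 0, 1.0, 5 * 60_000),                   # running sum for hour 0 at 00:05
    ("us-east", 0, 3.0, 59 * 60_000),                  # updated running sum at 00:59
    ("us-east", HOUR_MS, 4.0, HOUR_MS + 5 * 60_000),   # first update for hour 1
    ("us-east", HOUR_MS, 5.0, HOUR_MS + 10 * 60_000),  # stream time 01:10 closes hour 0
]
final = suppress_until_window_closes(updates)
print(final)  # [('us-east', 0, 3.0)]
```

Four upstream updates collapse into one downstream record for hour 0, while hour 1 stays buffered until stream time passes 02:10. Since the buffer holds one entry per open window, in-memory suppression needs enough memory for every window still inside its grace period.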

-Matthias

Pooja Shekhar

Oct 21, 2018, 5:34:58 PM
to Confluent Platform
Will this also work with the low-level Processor API? I see it's only for the DSL.

Thank you so much for your quick response. Any help/ideas appreciated.

Regards,
Pooja Shekhar

Matthias J. Sax

Oct 21, 2018, 5:46:27 PM
to confluent...@googlegroups.com
`suppress()` is a DSL-only operator. Note, though, that you can mix and
match both APIs:
https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration


-Matthias

Pooja Shekhar

Oct 22, 2018, 10:02:26 AM
to Confluent Platform
Thanks for the reply.
1> I was curious: by integrating the DSL with the Processor API, can we do everything that we can do using the Processor API alone?
2> Will a late-arriving record cause issues in an aggregation such as an hourly roll-up when using the Processor API, for example if partitions are operating at different timestamps due to late arrivals?

Regards,
Pooja Shekhar

Matthias J. Sax

Oct 22, 2018, 4:08:56 PM
to confluent...@googlegroups.com
1> Yes.

2> If you use PAPI, it's your responsibility to handle out-of-order data
correctly for each processor you implement.
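To make the second point concrete: as far as we know, Kafka Streams tracks stream time per task (roughly, per input partition) rather than globally, so whether a record counts as late depends on how far its own partition has advanced, and a Processor API processor has to make that decision itself. The plain-Python sketch below, with the hypothetical name `PerPartitionClock`, shows the same hour-0 timestamp being rejected on a partition that has reached 01:30 and accepted on one still at 00:50.

```python
HOUR_MS = 60 * 60 * 1000
GRACE_MS = 10 * 60 * 1000


class PerPartitionClock:
    """Toy model: each partition advances its own stream time independently."""

    def __init__(self, window_ms=HOUR_MS, grace_ms=GRACE_MS):
        self.window_ms = window_ms
        self.grace_ms = grace_ms
        self.stream_time = {}  # partition -> largest timestamp seen on that partition

    def accept(self, partition, timestamp):
        """Advance the partition's clock; return False if the record's window has closed there."""
        now = max(self.stream_time.get(partition, -1), timestamp)
        self.stream_time[partition] = now
        window_start = timestamp - timestamp % self.window_ms
        return window_start + self.window_ms + self.grace_ms > now


clock = PerPartitionClock()
clock.accept(0, HOUR_MS + 30 * 60_000)  # partition 0 has reached 01:30
clock.accept(1, 50 * 60_000)            # partition 1 is still at 00:50
on_p0 = clock.accept(0, 45 * 60_000)    # hour 0 already closed on partition 0
on_p1 = clock.accept(1, 45 * 60_000)    # hour 0 still open on partition 1
print(on_p0, on_p1)  # False True
```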


-Matthias

Pooja Shekhar

Oct 23, 2018, 3:33:14 AM
to Confluent Platform
Thanks a lot!
Regards,
Pooja Shekhar