KGroupedTable aggregate semantics


Minoo Singh

Mar 1, 2017, 4:05:45 PM
to Confluent Platform
Hello,

I have the following code, where builder is a KStreamBuilder.

builder.table("foo", "bar")
.groupBy((key, value) -> new KeyValue<>(((String) key), (String) value))
.aggregate(new SumInitializer(), new SumAdder(), new SumSubstractor(), Serdes.String(), "foobar")
.print();

SumInitializer, SumAdder and SumSubstractor are implementations of Initializer, Aggregator and Aggregator respectively.

I have some println calls in my adder and subtractor, and I notice that the subtractor is called for each record that is processed. My use case is append-only. Right now my subtractor does nothing except return the old aggregate, but my questions are: why is the subtractor being called, and what should my subtractor do if I want appends only?

thanks


Matthias J. Sax

Mar 1, 2017, 5:25:26 PM
to confluent...@googlegroups.com
Please check out the JavaDocs about aggregate:

https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KGroupedTable.html#aggregate(org.apache.kafka.streams.kstream.Initializer,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.streams.kstream.Aggregator,%20java.lang.String)

Hope this explains how it works.

If you don't want to subtract, you can simply return the old aggregate
without modifying it.
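
To illustrate why the subtractor fires, here is a minimal plain-Java sketch of the table-aggregation semantics the JavaDocs describe. It is a simplified model, not Kafka Streams code: the class TableAggregateSketch, the Agg interface and the update method are hypothetical names, values are longs for brevity, and the grouping key is assumed to equal the source key (as in the groupBy above). When a key that already has a value is updated, the old value goes to the subtractor and then the new value goes to the adder.

```java
import java.util.HashMap;
import java.util.Map;

class TableAggregateSketch {
    /** Hypothetical stand-in for an aggregator: (key, value, aggregate) -> new aggregate. */
    interface Agg {
        long apply(String key, long value, long aggregate);
    }

    private final Map<String, Long> latest = new HashMap<>();      // source table: latest value per key
    private final Map<String, Long> aggregates = new HashMap<>();  // result table: aggregate per key
    private final Agg adder;
    private final Agg subtractor;
    int subtractorCalls = 0;

    TableAggregateSketch(Agg adder, Agg subtractor) {
        this.adder = adder;
        this.subtractor = subtractor;
    }

    /** Applies one table update and returns the new aggregate for the key. */
    long update(String key, long newValue) {
        long agg = aggregates.getOrDefault(key, 0L); // 0 plays the role of the initializer
        Long old = latest.put(key, newValue);
        if (old != null) {
            // The key already had a value: the old value is retracted first.
            subtractorCalls++;
            agg = subtractor.apply(key, old, agg);
        }
        agg = adder.apply(key, newValue, agg);
        aggregates.put(key, agg);
        return agg;
    }

    public static void main(String[] args) {
        Agg add = (k, v, a) -> a + v;
        Agg realSubtract = (k, v, a) -> a - v; // table semantics: a new value replaces the old one
        Agg passThrough = (k, v, a) -> a;      // append-only: ignore the retraction

        TableAggregateSketch replacing = new TableAggregateSketch(add, realSubtract);
        System.out.println(replacing.update("a", 5));  // 5
        System.out.println(replacing.update("a", 7));  // 7

        TableAggregateSketch appending = new TableAggregateSketch(add, passThrough);
        System.out.println(appending.update("a", 5));  // 5
        System.out.println(appending.update("a", 7));  // 12
    }
}
```

In this model the subtractor runs on every update to a key that already has a value, which matches what the println output shows; with the pass-through subtractor the aggregate only ever grows.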


Overall, I am wondering why you use builder.table() and not
builder.stream()? That seems to be more appropriate.

Right now, you build two KTables, but I guess you only want one.
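
For contrast, a plain-Java sketch of what an aggregation over a record stream amounts to. Again this is a simplified model with hypothetical names (StreamAggregateSketch, append, snapshot), not the Kafka Streams API: every record is treated as a new fact, so there is nothing to retract and no subtractor is involved. In Kafka Streams the corresponding shape would be, as far as I can tell, builder.stream(...) followed by groupByKey() and an aggregate that takes only an initializer and an adder.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class StreamAggregateSketch {
    // Running aggregate per key; insertion order is kept so the output is stable.
    private final Map<String, Long> aggregates = new LinkedHashMap<>();

    /** Appends one record and returns the updated aggregate for its key. */
    long append(String key, long value) {
        long agg = aggregates.getOrDefault(key, 0L) + value; // initializer (0) plus adder
        aggregates.put(key, agg);
        return agg;
    }

    Map<String, Long> snapshot() {
        return aggregates;
    }

    public static void main(String[] args) {
        StreamAggregateSketch sums = new StreamAggregateSketch();
        sums.append("a", 5);
        sums.append("a", 7); // same key again: added on top, nothing is subtracted
        sums.append("b", 1);
        System.out.println(sums.snapshot()); // {a=12, b=1}
    }
}
```

Only one result table is maintained here, which is the reason a stream source looks like the better fit for an append-only use case.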

-Matthias

Minoo Singh

Mar 1, 2017, 6:21:40 PM
to Confluent Platform
The reason I'm using a KTable instead of a stream is that streams would require windowing, which does not give me the results I need. With streams I'm getting multiple entries. Maybe I'm doing something wrong?

