builder.table("foo", "bar")
    .groupBy((key, value) -> new KeyValue<>(((String) key), (String) value))
    .aggregate(new SumInitializer(), new SumAdder(), new SumSubstractor(), Serdes.String(), "foobar")
    .print();
SumInitializer, SumAdder and SumSubstractor are implementations of Initializer, Aggregator and Aggregator respectively.
I have some println calls in my adder and subtractor, and I notice that the subtractor is called for each record that is processed. My use case is append only. Right now my subtractor does nothing except return the old aggregate, but my questions are:
- Why is the subtractor being called?
- What should my subtractor do if I want appends only?
Thanks
Please check out the JavaDocs about aggregate:
https://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/KGroupedTable.html#aggregate(org.apache.kafka.streams.kstream.Initializer,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.streams.kstream.Aggregator,%20java.lang.String)
Hope this explains how it works.
If you don't want to subtract, you can simply return the current
aggregate without modifying it.
Overall, I am wondering why you use builder.table() and not
builder.stream()? That seems to be more appropriate.
Right now, you build two KTables, but I guess you only want one.
-Matthias
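
A rough illustration of why the subtractor fires, as a toy model in plain Java rather than Kafka Streams code. It assumes table semantics as described in the JavaDocs linked above: a record for a key that already has a value replaces that value, so the old value is handed to the subtractor and the new value to the adder. The class name TableAggregationSketch, its update() method, and the call counters are hypothetical names made up for this sketch. The grouping key is assumed to equal the record key (as in the groupBy above), and the order of the two calls in the real library may differ.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a table-based aggregation; hypothetical, not part of Kafka Streams. */
final class TableAggregationSketch {
    // Latest value per key, standing in for the source table.
    private final Map<String, String> table = new HashMap<>();
    // Current aggregate per key, standing in for the result table.
    private final Map<String, String> aggregates = new HashMap<>();

    int adderCalls = 0;
    int subtractorCalls = 0;

    // Adder: append the new value to the aggregate.
    private String add(String newValue, String aggregate) {
        adderCalls++;
        return aggregate + newValue;
    }

    // Append-only subtractor: ignore the old value and return the aggregate unchanged.
    private String subtract(String oldValue, String aggregate) {
        subtractorCalls++;
        return aggregate;
    }

    /** Applies one record with table semantics and returns the new aggregate. */
    String update(String key, String newValue) {
        String oldValue = table.put(key, newValue);       // null on the first record for a key
        String aggregate = aggregates.getOrDefault(key, ""); // "" plays the initializer's role
        if (oldValue != null) {
            aggregate = subtract(oldValue, aggregate);    // the replaced value is "removed"
        }
        aggregate = add(newValue, aggregate);
        aggregates.put(key, aggregate);
        return aggregate;
    }

    public static void main(String[] args) {
        TableAggregationSketch sketch = new TableAggregationSketch();
        sketch.update("k", "a");
        sketch.update("k", "b");
        String result = sketch.update("k", "c");
        // prints: abc adder=3 subtractor=2
        System.out.println(result + " adder=" + sketch.adderCalls
                + " subtractor=" + sketch.subtractorCalls);
    }
}
```

In this model the subtractor runs on every record except the first one per key, which matches the behaviour reported in the question when keys repeat. Aggregating a stream (builder.stream() followed by a group-by-key aggregation) takes only an initializer and an adder, so no subtractor is involved at all.
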
On 3/1/17 1:05 PM, Minoo Singh wrote:
> Hello,
>
> I have the following code where builder is a KStreamBuilder.
>
> builder.table("foo", "bar")
>     .groupBy((key, value) -> new KeyValue<>(((String) key), (String) value))
>     .aggregate(new SumInitializer(), new SumAdder(), new SumSubstractor(), Serdes.String(), "foobar")
>     .print();
>
>
> SumInitializer, SumAdder and SumSubstractor are implementations of Initializer, Aggregator and Aggregator respectively.
>
>
> I have some println calls in my adder and subtractor, and I notice that the subtractor is called for each record that is processed. My use case is append only. Right now my subtractor does nothing except return the old aggregate, but my questions are:
> - Why is the subtractor being called?
> - What should my subtractor do if I want appends only?
>
>
> thanks