Hi,
A KTable follows Kafka's changelog semantics, ie, a record can be
deleted by a so-called tombstone message with value=null. For example,
do delete a user with ID=5 you can send a message as <5:null>.
Thus, you can simplify you overall approach. You just receive your
command-stream and use a if-then-else construct in your map. If you get
a create-or-update-event you do create the user (to insert or
overwrite), if you get a delete even you create a tombstone.
Furthermore, you can simplify your code, by writing the stream of newly
created users and tombstones into a topic and just read the topic as a
table -- so you do not need all the filtering, mapping, grouping etc.
Something like:
> builder.stream(keySerde, valSerde, "commands-topic")
> .map( (key,event) -> isCreateOrUpdate ? {
> Map customer = (Map) event.get(new Keyword("data"));
> String email = (String) customer.get(new Keyword("email"));
> return new KeyValue<String, Map>(email, customer);
> } : {
> Map customer = (Map) event.get(new Keyword("data"));
> String email = (String) customer.get(new Keyword("email"));
> return new KeyValue<String, Map>(email, null);
> } )
> .to("table-topic");
>
> KTable<String, Map> createdCustomerTable = builder.table("table-topic");
-Matthias
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to
confluent-platf...@googlegroups.com
> <mailto:
confluent-platf...@googlegroups.com>.
> To post to this group, send email to
confluent...@googlegroups.com
> <mailto:
confluent...@googlegroups.com>.
> To view this discussion on the web visit
>
https://groups.google.com/d/msgid/confluent-platform/d7c66d7f-508e-424a-8298-d62749f20ba7%40googlegroups.com
> <
https://groups.google.com/d/msgid/confluent-platform/d7c66d7f-508e-424a-8298-d62749f20ba7%40googlegroups.com?utm_medium=email&utm_source=footer>.
> For more options, visit
https://groups.google.com/d/optout.