"Delete row" semantics for KTables


Gurdas Nijor

Jan 3, 2017, 3:03:01 AM
to Confluent Platform
Hi all, 

I'm currently working through a PoC of using Kafka Streams for an event sourcing system (based pretty heavily on https://github.com/capitalone/cqrs-manager-for-distributed-reactive-services).  

An example processor from the project (https://github.com/capitalone/cqrs-manager-for-distributed-reactive-services/blob/master/example/src/main/java/com/capitalone/commander/example/kafka_streams/CommandProcessor.java#L90) only makes use of KStreams, so there's no real notion of create/update/delete semantics against a table. I went down the path of building a processor that consumes a stream of "Commands" (for the sake of this experiment, commands such as "create-user" and "delete-user", with associated payloads that include email address, first name, etc.).

In the snippet below, I split the incoming command stream into two: one representing the stream of create/update commands, and one for deletes.

KStream<UUID, Map> commands = builder.stream(keySerde, valSerde, "commands-topic");

KStream<UUID, Map> createOrUpdateEvents = commands
    .filter(isCreateOrUpdate)
    .map((id, command) -> createEventFromCommand("customer-created", id, command));

KStream<UUID, Map> deleteEvents = commands
    .filter(isDelete)
    .map((id, command) -> createEventFromCommand("customer-deleted", id, command));


KTable<String, Map> createdCustomerTable = createOrUpdateEvents
    .map((id, event) -> {
        Map customer = (Map) event.get(new Keyword("data"));
        String email = (String) customer.get(new Keyword("email"));
        return new KeyValue<String, Map>(email, customer);
    })
    .groupBy(
        (key, customer) -> key,
        stringSerde,
        valSerde
    )
    .reduce((v1, v2) -> v2, "CreatedCustomers");




Representing created/updated customers as a table is easy enough, just by grouping the stream on a unique key (using email for argument's sake).

I'm a little lost as to how to find a performant way of using the combinators available on the KTable/KStream interface to filter deleted customers out of the customer "table". Building up a hash of emails => deleted users and filtering those out is one approach, but it is obviously neither memory efficient nor fault tolerant with a large user base.

Any insights would be greatly appreciated, thanks in advance!




-Gurdas

Matthias J. Sax

Jan 3, 2017, 4:13:33 PM
to confluent...@googlegroups.com
Hi,

A KTable follows Kafka's changelog semantics, i.e., a record can be
deleted by a so-called tombstone message with value=null. For example,
to delete a user with ID=5 you can send a message as <5:null>.
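
To make the changelog semantics concrete, here is a minimal plain-Java sketch with no Kafka involved. It replays a list of key/value records into a map the way a table is materialized from a changelog: a non-null value inserts or overwrites the key, and a null value (a tombstone) removes it. The class name ChangelogSketch, the method materialize, and the sample emails are made up for illustration; this is a model of the semantics, not how Kafka Streams is implemented.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogSketch {

    // Replays changelog records in order. A record with a non-null value
    // upserts its key; a record with a null value (tombstone) deletes it.
    static Map<String, String> materialize(List<Map.Entry<String, String>> changelog) {
        Map<String, String> table = new HashMap<>();
        for (Map.Entry<String, String> record : changelog) {
            if (record.getValue() == null) {
                table.remove(record.getKey());   // tombstone: delete the row
            } else {
                table.put(record.getKey(), record.getValue()); // insert or overwrite
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // Hypothetical sample data: two creates, one update, one delete.
        List<Map.Entry<String, String>> changelog = Arrays.<Map.Entry<String, String>>asList(
            new SimpleEntry<String, String>("a@example.com", "Alice"),
            new SimpleEntry<String, String>("b@example.com", "Bob"),
            new SimpleEntry<String, String>("a@example.com", "Alice B."),
            new SimpleEntry<String, String>("b@example.com", null));

        System.out.println(materialize(changelog)); // prints {a@example.com=Alice B.}
    }
}
```

Only the latest value per key survives, and a key whose latest record is a tombstone is absent from the table entirely.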

Thus, you can simplify your overall approach. You just receive your
command stream and use an if-then-else construct in your map. If you get
a create-or-update event, you emit the user record (to insert or
overwrite); if you get a delete event, you emit a tombstone.

Furthermore, you can simplify your code, by writing the stream of newly
created users and tombstones into a topic and just read the topic as a
table -- so you do not need all the filtering, mapping, grouping etc.

Something like:

> builder.stream(keySerde, valSerde, "commands-topic")
>     .map((key, event) -> {
>         Map customer = (Map) event.get(new Keyword("data"));
>         String email = (String) customer.get(new Keyword("email"));
>         // create/update: emit the customer; delete: emit a tombstone (null value)
>         return isCreateOrUpdate.test(key, event)
>             ? new KeyValue<String, Map>(email, customer)
>             : new KeyValue<String, Map>(email, null);
>     })
>     .to("table-topic");
>
> KTable<String, Map> createdCustomerTable = builder.table("table-topic");





-Matthias

Gurdas Nijor

Jan 3, 2017, 11:52:46 PM
to Confluent Platform
Much appreciated, Matthias, that's a much cleaner approach.

