Kafka streams: how to compare current and previous values on stream

Davide Pozza

Nov 22, 2016, 11:06:26 AM
to Confluent Platform
Hi all

I need to create a stream processor that publishes an alert message to an output topic only when a value in the input stream changes.
The input stream carries messages of the form:

<CLIENT_ID> => <VALUE>

When the <VALUE> for a given <CLIENT_ID> changes (that is, the current value differs from the previous one), my stream processor should publish to the output topic the CLIENT_ID whose value changed:

<NEW_KEY> => <CLIENT_ID>

I suppose I should store the "previous" values in a KTable (windowed?) so that I can compare them (through a join) with the new ones coming from the input stream, but I'm unable to figure out...
Any suggestions?

Thank you in advance!

Davide

Matthias J. Sax

Nov 22, 2016, 2:26:01 PM
to confluent...@googlegroups.com
I think it would be best to use the Processor API for this. Create a
KeyValueStore and attach it to your processor. Within the processor, for
each record, look up the client's previous value in the store and compare
it with the current one. If the value changed, emit a record and update
the store.

One thing you need to keep in mind is out-of-order data. Thus, you
should also check the offset of the currently processed record, which is
available via the context provided in init().


-Matthias
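
For readers who want to see the compare-and-emit logic spelled out, here is a minimal sketch in plain Java. It is not Kafka Streams code: a HashMap stands in for the KeyValueStore, and the offset is passed in as a parameter instead of being read from the processor context. The class name ChangeDetector, the method process, and the choice to emit nothing for the first value seen for a client are all assumptions made for illustration. The sketch also records the offset on every accepted record, which is one possible reading of the out-of-order advice above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for the processor logic. A plain HashMap replaces the
// Kafka Streams KeyValueStore so the sketch runs without any dependencies.
class ChangeDetector {

    // Last accepted value for a client, together with the offset it arrived at.
    private static final class Seen {
        final String value;
        final long offset;

        Seen(String value, long offset) {
            this.value = value;
            this.offset = offset;
        }
    }

    private final Map<String, Seen> store = new HashMap<>();

    // Returns the client id to publish as an alert, or empty if nothing
    // should be emitted for this record.
    Optional<String> process(String clientId, String value, long offset) {
        Seen previous = store.get(clientId);

        // Assumption: a record whose offset is not newer than the stored one
        // is stale (or replayed) and is ignored entirely.
        if (previous != null && offset <= previous.offset) {
            return Optional.empty();
        }

        // Remember the newest value and offset for this client.
        store.put(clientId, new Seen(value, offset));

        // Assumption: the first value seen for a client is not a "change".
        boolean changed = previous != null && !Objects.equals(previous.value, value);
        return changed ? Optional.of(clientId) : Optional.empty();
    }

    public static void main(String[] args) {
        ChangeDetector detector = new ChangeDetector();
        System.out.println(detector.process("client-1", "A", 0L)); // first value
        System.out.println(detector.process("client-1", "A", 1L)); // same value
        System.out.println(detector.process("client-1", "B", 2L)); // changed value
    }
}
```

In a real processor the map would be replaced by the state store attached to the processor, and the returned client id would be forwarded downstream under whatever new key the output topic requires.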

Guozhang Wang

Nov 22, 2016, 4:47:29 PM
to Confluent Platform
We may support this in the future at the DSL layer as a windowed-table self-join with shifted windows, but it is not supported yet, so I think the Processor API, as Matthias mentioned, is the way to go for now.


Guozhang

Davide Pozza

Nov 23, 2016, 8:04:20 AM
to Confluent Platform
Thank you!

Maciej Lopacinski

Aug 8, 2018, 6:23:08 PM
to Confluent Platform
Hi guys,

My use case is similar. I was wondering whether the API has changed since 2016 in a way that makes the implementation mentioned above possible.

@Davide - did you have a chance to implement the KeyValueStore solution? 