Kafka streams: how to compare current and previous values on stream
Davide Pozza
Nov 22, 2016, 11:06:26 AM
to Confluent Platform
Hi all
I need to create a stream processor which should publish an alert (message) on an output topic only when the values from the input stream change.
The input stream publishes a message like:
<CLIENT_ID> => <VALUE>
When the <VALUE> for a specific <CLIENT_ID> changes (I mean the current one is different from the previous one), my stream processor should publish on the output topic the CLIENT_ID for which the value changed:
<NEW_KEY> => <CLIENT_ID>
I suppose I should store the "previous" values in a KTable (windowed?) so that I can compare them (through a join) with the new ones coming from the input stream, but I'm unable to figure out...
Any suggestions?
Thank you in advance!
Davide
Matthias J. Sax
Nov 22, 2016, 2:26:01 PM
to confluent...@googlegroups.com
I think it would be best to use the Processor API for this. Create a
KeyValueStore and attach it to your processor. Within the processor, for
each record, do a lookup in the KeyValueStore to compare it with the
previous record for that client. If the value changed, emit a record and
update the KeyValueStore.
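
The lookup-compare-update step described above can be sketched in plain Java. This is only an illustration of the logic, not Kafka Streams code: a HashMap stands in for the KeyValueStore, the class and method names (ChangeDetector, process) are hypothetical, and it assumes non-null String values and that the first record seen for a client should not raise an alert.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: a HashMap stands in for the KeyValueStore that a
// real processor would have attached to it.
class ChangeDetector {
    // Last value seen per client id.
    private final Map<String, String> lastValueByClient = new HashMap<>();

    /** Returns the client id if its value changed, otherwise empty. */
    Optional<String> process(String clientId, String value) {
        String previous = lastValueByClient.get(clientId);
        if (previous == null) {
            // Assumption: the first record for a client is not a "change".
            lastValueByClient.put(clientId, value);
            return Optional.empty();
        }
        if (previous.equals(value)) {
            return Optional.empty(); // unchanged, nothing to emit
        }
        lastValueByClient.put(clientId, value); // remember the new value
        return Optional.of(clientId);           // this is what would be emitted
    }

    public static void main(String[] args) {
        ChangeDetector detector = new ChangeDetector();
        System.out.println(detector.process("client-1", "A")); // Optional.empty
        System.out.println(detector.process("client-1", "A")); // Optional.empty
        System.out.println(detector.process("client-1", "B")); // Optional[client-1]
    }
}
```

In an actual processor, the returned client id would be forwarded downstream as the value of the output record, under whatever new key the application chooses.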
One thing you need to keep in mind is out-of-order data. Thus, you
should also check the offset of the currently processed record via the
context provided in init().
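
One possible reading of the offset check, again as a plain-Java sketch rather than real Kafka Streams code: the store keeps the offset next to the last value, and a record whose offset is not newer than the stored one is ignored. The names (OffsetAwareChangeDetector, Seen) are hypothetical, the offset is passed in as a parameter here, and the sketch assumes all records for a client come from the same partition so that their offsets are comparable. In a real processor the offset would be read from the ProcessorContext handed to init(), via its offset() method.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: a HashMap stands in for the KeyValueStore.
class OffsetAwareChangeDetector {
    // Last value seen for a client plus the offset at which it was seen.
    private static final class Seen {
        final String value;
        final long offset;

        Seen(String value, long offset) {
            this.value = value;
            this.offset = offset;
        }
    }

    private final Map<String, Seen> store = new HashMap<>();

    /** Returns the client id if a newer record carries a different value. */
    Optional<String> process(String clientId, String value, long offset) {
        Seen previous = store.get(clientId);
        if (previous != null && offset <= previous.offset) {
            // Stale or replayed record: do not compare, do not update.
            return Optional.empty();
        }
        store.put(clientId, new Seen(value, offset));
        boolean changed = previous != null && !previous.value.equals(value);
        return changed ? Optional.of(clientId) : Optional.empty();
    }

    public static void main(String[] args) {
        OffsetAwareChangeDetector detector = new OffsetAwareChangeDetector();
        System.out.println(detector.process("client-1", "A", 10)); // Optional.empty
        System.out.println(detector.process("client-1", "B", 11)); // Optional[client-1]
        System.out.println(detector.process("client-1", "A", 9));  // Optional.empty
        System.out.println(detector.process("client-1", "B", 12)); // Optional.empty
    }
}
```

The third call shows the effect of the guard: without it, the older record at offset 9 would overwrite "B" and trigger a spurious alert, followed by another one at offset 12.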
to Confluent Platform
We may support this at the DSL layer in the future as a windowed-table self-join with shifted windows, but it is not supported yet, so I think the Processor API, as Matthias mentioned, is the way to go for now.