Use Kafka Streams to store the last event in a topic?


Varun Saini

Aug 8, 2016, 3:23:10 PM
to Confluent Platform
Hi,

I want to explore Kafka Streams to store the last event in a given Kafka topic and then expose an HTTP endpoint to retrieve that event. Is this possible with Kafka Streams? Can I store the last event in the local store provided by Kafka Streams?

Thanks,
Varun

Varun Saini

Aug 8, 2016, 4:22:15 PM
to Confluent Platform
I want to verify whether I can compare two streams (from two topics with the same data) and alert if there is any mismatch. In some cases the streams might have to catch up to each other, but the idea is to compare only the latest record from each stream.

Matthias J. Sax

Aug 10, 2016, 4:41:20 AM
to confluent...@googlegroups.com
If I understand your question correctly, right now this is not possible
out-of-the-box.

However, Kafka Streams is getting a new feature (queryable state:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams)
that lets you query local stores from other applications. This would
allow you to get the latest value for a key in a changelog topic.

Parts of this new feature are already merged into trunk, so you can try
it out now. The queryable state feature should be available in 0.10.1.0
(no guarantee).

To use the feature, you would simply read your topic as a KTable:

> builder.table("myTopicName", "storeName");

After starting your Streams application, you can call

> KafkaStreams.getStore("storeName", ...)

to get access to the (local) store. See this integration test for more
details:
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
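
To make "latest value for a key" concrete, here is a minimal plain-Java sketch with no Kafka dependency. It only mimics how a table-backed store behaves: each record overwrites the previous value for its key, and a record with a null value removes the key. The class name LatestValueStore and its methods are hypothetical stand-ins for illustration, not part of the Kafka Streams API.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a table-backed local store; not Kafka Streams API.
class LatestValueStore {
    private final Map<String, String> latest = new ConcurrentHashMap<>();

    // Apply one changelog record: a newer value overwrites the older one,
    // and a null value removes the key (tombstone).
    void apply(String key, String value) {
        if (value == null) {
            latest.remove(key);
        } else {
            latest.put(key, value);
        }
    }

    // Point lookup, e.g. what an HTTP handler could call.
    Optional<String> get(String key) {
        return Optional.ofNullable(latest.get(key));
    }

    public static void main(String[] args) {
        LatestValueStore store = new LatestValueStore();
        store.apply("sensor-1", "v1");
        store.apply("sensor-1", "v2");   // overwrites v1
        store.apply("sensor-2", "a");
        store.apply("sensor-2", null);   // tombstone removes the key
        System.out.println(store.get("sensor-1")); // Optional[v2]
        System.out.println(store.get("sensor-2")); // Optional.empty
    }
}
```

In a real application the store would be filled by Kafka Streams from the topic, and the lookup would go through the queryable state API instead of this in-memory map.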


-Matthias
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to confluent-platf...@googlegroups.com
> <mailto:confluent-platf...@googlegroups.com>.
> To post to this group, send email to confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/confluent-platform/20edeaa5-a748-4ad6-b685-26fc85ee685b%40googlegroups.com
> <https://groups.google.com/d/msgid/confluent-platform/20edeaa5-a748-4ad6-b685-26fc85ee685b%40googlegroups.com?utm_medium=email&utm_source=footer>.
> For more options, visit https://groups.google.com/d/optout.


Varun Saini

Aug 10, 2016, 4:38:02 PM
to Confluent Platform
Thanks Matthias. That's really helpful.

Michael Noll

Aug 11, 2016, 3:10:01 AM
to confluent...@googlegroups.com
Varun,

another option that is available to you right now (i.e., in Kafka 0.10.0.0 and CP 3.0.0) is the Processor API [1], as we briefly discussed offline.

With the Processor API you can read and write any values you want in your state stores. For your use case you would most likely need at most two such stores, one for each stream (for example).
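
As a rough illustration of the two-store idea, here is a plain-Java sketch in which two in-memory maps stand in for the two state stores. It keeps only the latest value per key from each stream and reports a mismatch once both sides have seen the key. The class LatestPairComparator and its method names are hypothetical, and the sketch assumes that records in both streams share the same keys.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: two in-memory maps stand in for two state stores.
class LatestPairComparator {
    private final Map<String, String> leftStore = new HashMap<>();
    private final Map<String, String> rightStore = new HashMap<>();

    // Record the latest value from the first stream, then compare.
    boolean onLeft(String key, String value) {
        leftStore.put(key, value);
        return isMismatch(key);
    }

    // Record the latest value from the second stream, then compare.
    boolean onRight(String key, String value) {
        rightStore.put(key, value);
        return isMismatch(key);
    }

    // A mismatch is reported only once both sides have seen the key;
    // a missing side is treated as "still catching up".
    private boolean isMismatch(String key) {
        if (!leftStore.containsKey(key) || !rightStore.containsKey(key)) {
            return false;
        }
        return !Objects.equals(leftStore.get(key), rightStore.get(key));
    }

    public static void main(String[] args) {
        LatestPairComparator c = new LatestPairComparator();
        System.out.println(c.onLeft("k1", "a"));  // false: right side not seen yet
        System.out.println(c.onRight("k1", "a")); // false: both sides agree
        System.out.println(c.onLeft("k1", "b"));  // true: left is now ahead
        System.out.println(c.onRight("k1", "b")); // false: right caught up
    }
}
```

Note that this flags a mismatch whenever one stream is briefly ahead of the other, so a real check would probably need some grace period before alerting.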

Hope this helps,
Michael

--
Michael G. Noll
Product Manager | Confluent
Follow us: Twitter | Blog

Varun Saini

Aug 11, 2016, 6:28:12 PM
to Confluent Platform
Thanks again Michael.

So I have simplified my requirements a little bit. My requirement is more along the lines of comparing two identical streams from two different topics. Say we have two topics called "history1" and "history2" with the same type of data. I want to compare the records (Avro) in both streams. If they are the same, do nothing; if not, write the record to a different Kafka topic.

Do you think I can do that with the DSL? If yes, can you please point me in the right direction? Or do I need to use the Processor API and a local store to hold records from both streams and compare them? Can I compare data from two different stream stores?

Thanks,
Varun

Michael Noll

Aug 12, 2016, 4:13:13 AM
to confluent...@googlegroups.com
Varun,

it depends a bit on how exactly you (want to) make the decision on whether the streams/topics are the same.

For example, if you want to compare topic partitions offset-by-offset (which would let you validate not only that the message payloads are the same but also that the order of messages within each "pair" of topic partitions is identical), then the Processor API is preferable because it gives you more flexibility to implement such logic.

You might even consider using Kafka's normal consumer client to implement such comparison logic. Here, you could consume from both topics and run your comparison whenever a new pair of messages arrives in the topic partitions you are comparing (e.g., a new message in P1 of topic A at offset 56 plus a new message in P1 of topic B at offset 56, where you expect both messages at offset 56 to be identical).
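
The pairing step could be sketched roughly as follows, in plain Java without the consumer client itself. A message is buffered until its counterpart with the same partition and offset arrives from the other topic, and then the two payloads are compared byte for byte. The class OffsetPairComparator and its methods are hypothetical; the code that polls both topics and feeds messages in is assumed and not shown.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of offset-by-offset pairing; consuming is not shown.
class OffsetPairComparator {
    // Messages still waiting for their counterpart, keyed by "partition:offset".
    private final Map<String, byte[]> pendingA = new HashMap<>();
    private final Map<String, byte[]> pendingB = new HashMap<>();

    Optional<Boolean> onMessageA(int partition, long offset, byte[] payload) {
        return pair(pendingA, pendingB, partition, offset, payload);
    }

    Optional<Boolean> onMessageB(int partition, long offset, byte[] payload) {
        return pair(pendingB, pendingA, partition, offset, payload);
    }

    // Returns empty while the counterpart is missing; otherwise true if the
    // two payloads are identical and false if they differ.
    private static Optional<Boolean> pair(Map<String, byte[]> own,
                                          Map<String, byte[]> other,
                                          int partition, long offset,
                                          byte[] payload) {
        String position = partition + ":" + offset;
        if (!other.containsKey(position)) {
            own.put(position, payload);
            return Optional.empty();
        }
        byte[] counterpart = other.remove(position);
        return Optional.of(Arrays.equals(counterpart, payload));
    }

    public static void main(String[] args) {
        OffsetPairComparator c = new OffsetPairComparator();
        byte[] x = "x".getBytes(StandardCharsets.UTF_8);
        byte[] y = "y".getBytes(StandardCharsets.UTF_8);
        System.out.println(c.onMessageA(1, 56L, x));         // Optional.empty
        System.out.println(c.onMessageB(1, 56L, x.clone())); // Optional[true]
        System.out.println(c.onMessageB(1, 57L, x));         // Optional.empty
        System.out.println(c.onMessageA(1, 57L, y));         // Optional[false]
    }
}
```

The pending maps grow without bound if one topic never delivers a counterpart, so a real implementation would want some eviction or timeout.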

The Processor API or Kafka's normal consumer client is better suited to such low-level comparisons (e.g., offset-by-offset, if that's what you want to do) because the DSL works at the level of keys, values, and fields in the payload rather than at the level of bits and bytes in the underlying Kafka storage layer.

Does that make sense?

Varun Saini

Aug 12, 2016, 9:13:22 AM
to Confluent Platform
It totally makes sense. Thanks for helping me with this, Michael.

Michael Noll

Aug 12, 2016, 1:35:37 PM
to confluent...@googlegroups.com
Sure thing, Varun, my pleasure!
