How to use Kafka StateStore?


Eric Koston

Nov 20, 2017, 10:03:55 AM
to Confluent Platform
Hi all    

I am trying to use a Kafka StateStore as follows:

    val store = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("bary-path"), Serdes.String(), Serdes.String())
    val builder: StreamsBuilder = new StreamsBuilder
    store.build()
    builder.addStateStore(store)

    // Handle as ktable
    val sourceKtable: KTable[String, String] = builder.table("bary-path")
    ktable(sourceKtable)

    builder.build()

The function ktable:

  def ktable(source: KTable[String, String]): Unit = {
    source
      .toStream
      .print(Printed.toSysOut[String, String])
  }

I do receive the stream:

[KTABLE-TOSTREAM-0000000003]: FOLDERS, {"folders": ["C:\\Windows\\DigitalLocker", "C:\\Windows\\DigitalLocker", "C:\\Windows\\DigitalLocker"]}
[KTABLE-TOSTREAM-0000000003]: FOLDERS, {"folders": ["C:\\Windows\\DigitalLocker", "C:\\Windows\\DigitalLocker", "C:\\Windows\\DigitalLocker"]}
[KTABLE-TOSTREAM-0000000003]: FOLDERS, {"folders": ["C:\\Windows\\DigitalLocker", "C:\\Windows\\DigitalLocker", "C:\\Windows\\DigitalLocker"]}

But it is not saved into the store.

(Screenshot of the KSQL console: https://i.stack.imgur.com/YRCqb.png)

As you can see, there is no table in the KSQL console.

What am I doing wrong?


Thanks

Matthias J. Sax

Nov 20, 2017, 2:39:28 PM
to confluent...@googlegroups.com
The content of a KTable is maintained by the Streams API and you cannot
"put" data directly into it.

If you read a KTable from a topic, you update the table by writing to
the topic, and Kafka Streams will update the table by reading from the
topic. A KTable that is read from a topic is designed to contain the
content of the topic.
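To make "the table contains the content of the topic" concrete, here is a toy model in plain Java. It is not Kafka Streams code: `ChangelogTableModel` and `TopicRecord` are hypothetical names used only for illustration. It replays a topic's records in order and keeps the latest value per key, treating a null value as a delete (a tombstone), which is roughly how a KTable read with builder.table(...) interprets its input topic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical, NOT the Kafka Streams API): a table read from a
// topic is what you get by replaying the topic's records in order, keeping
// the latest value per key. A null value ("tombstone") deletes the key.
class ChangelogTableModel {

    // One record as it would appear in the input topic.
    static final class TopicRecord {
        final String key;
        final String value; // null means "delete this key"

        TopicRecord(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Replays the whole topic from the beginning and returns the table.
    static Map<String, String> replay(List<TopicRecord> topic) {
        Map<String, String> table = new LinkedHashMap<>();
        for (TopicRecord r : topic) {
            if (r.value == null) {
                table.remove(r.key);       // tombstone: remove the row
            } else {
                table.put(r.key, r.value); // upsert: the newer value wins
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // To change the table, you append to the topic; you never edit the table.
        List<TopicRecord> topic = new ArrayList<>();
        topic.add(new TopicRecord("FOLDERS", "v1"));
        topic.add(new TopicRecord("FILES", "v1"));
        topic.add(new TopicRecord("FOLDERS", "v2"));
        topic.add(new TopicRecord("FILES", null));
        System.out.println(replay(topic)); // prints {FOLDERS=v2}
    }
}
```

The only way to change the result is to append records to the input list, which mirrors why writing to the topic is the way to update such a KTable.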

Of course, a KTable can also be the result of an aggregation. In this
case, the Streams API updates the KTable with the values your aggregation
UDF returns.
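A similarly hypothetical sketch for the aggregation case: `CountAggregationModel` is an invented class, not part of Kafka Streams. The user-supplied aggregation function only returns a new value for the key; storing that value in the table (and recording the update) is done by the framework, which is why there is no call for putting data into the KTable yourself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical, NOT the Kafka Streams API) of a KTable produced by
// an aggregation: the table's value for a key is whatever the aggregation
// function returned for the latest record with that key. Here it counts.
class CountAggregationModel {

    // Current table contents: key -> running count.
    private final Map<String, Long> table = new HashMap<>();
    // Every update applied to the table, in order (like a changelog).
    private final List<String> changelog = new ArrayList<>();

    // Called by the "framework" once per input record. The user function only
    // computes the new aggregate; the framework stores it.
    void onRecord(String key) {
        long oldCount = table.getOrDefault(key, 0L);
        long newCount = aggregate(oldCount);
        table.put(key, newCount);
        changelog.add(key + "=" + newCount);
    }

    // The user's aggregation function: returns the new aggregate value.
    private static long aggregate(long current) {
        return current + 1;
    }

    Long get(String key) {
        return table.get(key);
    }

    List<String> changelog() {
        return changelog;
    }

    public static void main(String[] args) {
        CountAggregationModel counts = new CountAggregationModel();
        for (String k : new String[] {"FOLDERS", "FILES", "FOLDERS"}) {
            counts.onRecord(k);
        }
        System.out.println(counts.changelog()); // prints [FOLDERS=1, FILES=1, FOLDERS=2]
    }
}
```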



About using stores, check out this SO question:
https://stackoverflow.com/questions/47178224/how-to-use-kafka-streams-dsl-v1-0-transform-with-state



Last: if you write a Kafka Streams application, it will be completely
decoupled from KSQL. They are two independent things.

Of course, if you read a KTable from a topic, or write a result KTable
back to a topic, you can use this topic within KSQL by creating a TABLE
on top of it, for example:

> CREATE TABLE users (usertimestamp BIGINT, user_id VARCHAR, gender VARCHAR, region_id VARCHAR)
> WITH (VALUE_FORMAT = 'JSON',
> KAFKA_TOPIC = 'my-users-topic');

(from
https://github.com/confluentinc/ksql/blob/0.1.x/docs/syntax-reference.md#syntax-reference)


Hope this helps.


-Matthias

On 11/20/17 7:03 AM, Eric Koston wrote:
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to confluent-platf...@googlegroups.com
> <mailto:confluent-platf...@googlegroups.com>.
> To post to this group, send email to confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/confluent-platform/5d00b893-3362-4c1f-bc6a-2ab475564615%40googlegroups.com
> <https://groups.google.com/d/msgid/confluent-platform/5d00b893-3362-4c1f-bc6a-2ab475564615%40googlegroups.com?utm_medium=email&utm_source=footer>.
> For more options, visit https://groups.google.com/d/optout.


Eric Koston

Nov 21, 2017, 2:39:54 AM
to Confluent Platform
First of all, thanks so much for your answer.
My problem is that I do not know how to use the StateStore to save data into the store.
Which method on KStream do I have to call? Is it this one?

    void process(ProcessorSupplier<? super K,? super V> processorSupplier,
                 java.lang.String... stateStoreNames)

I have a KStream that looks as follows:

    val sourceStream: KStream[String, String] = builder.stream("bary-path")
    stream(sourceStream)

How do I write data from a KStream into a StateStore?
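For reference, the process(...) method quoted above belongs to the Processor API route, which roughly works like this: you register a store with builder.addStateStore(...), pass its name to process(...), fetch it in Processor#init via ProcessorContext#getStateStore(name), and call put(...) on it yourself in Processor#process(key, value). The sketch below models only that wiring in plain Java so it runs without Kafka; `KvStore`, `Context`, `SavingProcessor` and `run` are hypothetical stand-ins, not the real interfaces. It also illustrates why registering a store alone saves nothing: the store only changes when a processor connected to it calls put().

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model (hypothetical classes, NOT the Kafka Streams API) of how a
// processor is wired to a named state store: the store is registered by
// name, the processor may only use the names it was connected to, and the
// processor's own code calls put() for every record.
class ProcessWithStoreModel {

    // Stand-in for a key-value state store.
    static final class KvStore {
        final Map<String, String> data = new HashMap<>();
        void put(String k, String v) { data.put(k, v); }
        String get(String k) { return data.get(k); }
    }

    // Stand-in for the processor context: hands out connected stores only.
    static final class Context {
        private final Map<String, KvStore> registered;
        private final Set<String> connected;

        Context(Map<String, KvStore> registered, Set<String> connected) {
            this.registered = registered;
            this.connected = connected;
        }

        KvStore getStateStore(String name) {
            if (!connected.contains(name)) {
                throw new IllegalStateException("store not connected: " + name);
            }
            return registered.get(name);
        }
    }

    // Stand-in for a user processor that saves every record it receives.
    static final class SavingProcessor {
        private final String storeName;
        private KvStore store;

        SavingProcessor(String storeName) { this.storeName = storeName; }
        void init(Context ctx) { store = ctx.getStateStore(storeName); }
        void process(String key, String value) { store.put(key, value); }
    }

    // Initializes the processor with the connected stores, then feeds it records.
    static void run(Map<String, KvStore> registered, List<String> connectedNames,
                    SavingProcessor p, String[][] records) {
        p.init(new Context(registered, new HashSet<>(connectedNames)));
        for (String[] r : records) {
            p.process(r[0], r[1]);
        }
    }

    public static void main(String[] args) {
        Map<String, KvStore> registered = new HashMap<>();
        registered.put("BARY-PATH-STORE", new KvStore());      // analogous to addStateStore(...)
        run(registered, Arrays.asList("BARY-PATH-STORE"),       // analogous to process(..., "BARY-PATH-STORE")
            new SavingProcessor("BARY-PATH-STORE"),
            new String[][] {{"FOLDERS", "v1"}, {"FOLDERS", "v2"}});
        System.out.println(registered.get("BARY-PATH-STORE").get("FOLDERS")); // prints v2
    }
}
```

For a KTable read from a topic, as the replies in this thread point out, none of this is needed: builder.table(...) creates and fills the store from the topic.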

Thanks






Damian Guy

Nov 21, 2017, 5:51:22 AM
to confluent...@googlegroups.com
Hi,

As Matthias said, to get data into the store you write to the topic. So if you had something like this:

    builder.table("my-topic-name", Materialized.as("my-store-name"));

You would write data to "my-topic-name" and the store "my-store-name" will be updated.

Thanks
Damian


Eric Koston

Nov 21, 2017, 8:31:59 AM
to Confluent Platform
Thanks.
I rewrote my code in Java:

    StoreBuilder pathsStore = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("BARY-PATH-STORE"), Serdes.String(), Serdes.String());

    StreamsBuilder builder = new StreamsBuilder();

    KTable<String, String> sourceTable = builder
        .table("BARY-PATH", Materialized.as("BARY-PATH-STORE"));

    return builder.build();

How can I check whether the data gets stored? With KSQL?

Damian Guy

Nov 21, 2017, 8:38:55 AM
to confluent...@googlegroups.com
You don't need to create the StoreBuilder at the top; this is done automatically for you when you call builder.table(...).

Streams and KSQL are two separate things. If you want to use KSQL, you could do the same thing with the CREATE TABLE statement. I suggest you look at the documentation: https://github.com/confluentinc/ksql


Eric Koston

Nov 21, 2017, 8:48:04 AM
to Confluent Platform
How can I look at the stored data?   

What if I wrote it like this:

    StoreBuilder pathsStore = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("BARY-PATH-STORE"), Serdes.String(), Serdes.String());
    KTable<String, String> sourceTable = builder
        .addStateStore(pathsStore)
        .table("BARY-PATH");

Would that be right too?

Thanks

Damian Guy

Nov 21, 2017, 8:57:49 AM
to confluent...@googlegroups.com


Eric Koston

Nov 21, 2017, 9:47:21 AM
to Confluent Platform
Thanks, I am going to read it carefully.

Michael Noll

Nov 29, 2017, 12:33:09 PM
to confluent...@googlegroups.com
> For me, I cannot see any difference, except that a stream processor is a node and streams are edges?

Exactly, and that's a big difference. :-)

Stream = the data
Stream processor = what's processing the data
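That distinction can be sketched in a few lines of plain Java. This is a toy model with hypothetical names (`StreamVsProcessorModel`, `KeyValue`, `StreamProcessor`, `SPLITTER`, `apply`), not the Kafka Streams API: the lists of `KeyValue` records are the streams (the data), and a `StreamProcessor` is the step that receives one record at a time and forwards zero or more records downstream.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Toy model (hypothetical, NOT the Kafka Streams API): a "stream" is data,
// an ordered sequence of immutable key-value records; a "stream processor"
// is code that handles one record at a time and forwards output records.
class StreamVsProcessorModel {

    // A record in a stream: an immutable key-value pair.
    static final class KeyValue {
        final String key;
        final String value;

        KeyValue(String key, String value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + "=" + value;
        }
    }

    // A processing step: receives one input record, may forward several outputs.
    interface StreamProcessor {
        void process(KeyValue rec, Consumer<KeyValue> forward);
    }

    // Example processor: splits a comma-separated value into one record per item.
    static final StreamProcessor SPLITTER = (rec, forward) -> {
        for (String part : rec.value.split(",")) {
            forward.accept(new KeyValue(rec.key, part.trim()));
        }
    };

    // Feeds an input stream through a processor and collects the output stream.
    static List<KeyValue> apply(List<KeyValue> input, StreamProcessor p) {
        List<KeyValue> output = new ArrayList<>();
        for (KeyValue r : input) {
            p.process(r, output::add);
        }
        return Collections.unmodifiableList(output);
    }

    public static void main(String[] args) {
        List<KeyValue> input = new ArrayList<>();
        input.add(new KeyValue("FOLDERS", "a, b"));
        input.add(new KeyValue("FILES", "c"));
        System.out.println(apply(input, SPLITTER)); // prints [FOLDERS=a, FOLDERS=b, FILES=c]
    }
}
```

In topology terms, the two lists would sit on the edges and `SPLITTER` would be the node between them.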



On Tue, Nov 21, 2017 at 3:13 PM, Eric Koston <zerocod...@gmail.com> wrote:
I am reading about the concepts and trying to understand the difference between a stream and a stream processor.

It says:  

  • A stream is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a data record is defined as a key-value pair.
  • A stream processor is a node in the processor topology; it represents a processing step to transform data in streams by receiving one input record at a time from its upstream processors in the topology, applying its operation to it, and may subsequently produce one or more output records to its downstream processors.
For me, I cannot see any difference, except that a stream processor is a node and streams are edges?

Thanks
