I am trying to use a Kafka Streams state store as follows:
val store = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("bary-path"), Serdes.String(), Serdes.String())
val builder: StreamsBuilder = new StreamsBuilder
store.build()
builder.addStateStore(store)
// Handle as ktable
val sourceKtable: KTable[String, String] = builder.table("bary-path")
ktable(sourceKtable)
builder.build()
The function ktable:
def ktable(source: KTable[String, String]): Unit = {
  source
    .toStream
    .print(Printed.toSysOut[String, String])
}
I do receive the stream:
[KTABLE-TOSTREAM-0000000003]: FOLDERS, {"folders": ["C:\\Windows\\DigitalLocker", "C:\\Windows\\DigitalLocker", "C:\\Windows\\DigitalLocker"]}
[KTABLE-TOSTREAM-0000000003]: FOLDERS, {"folders": ["C:\\Windows\\DigitalLocker", "C:\\Windows\\DigitalLocker", "C:\\Windows\\DigitalLocker"]}
[KTABLE-TOSTREAM-0000000003]: FOLDERS, {"folders": ["C:\\Windows\\DigitalLocker", "C:\\Windows\\DigitalLocker", "C:\\Windows\\DigitalLocker"]}
But it is not saved into the store.
As you can see, there is no table in the KSQL console.
What am I doing wrong?
Thanks
void process(ProcessorSupplier<? super K,? super V> processorSupplier, java.lang.String... stateStoreNames)
val sourceStream: KStream[String, String] = builder.stream("bary-path")
stream(sourceStream)
--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/257a6b4d-eaa0-4455-83d3-fc6aeab92d7c%40googlegroups.com.
StoreBuilder pathsStore = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("BARY-PATH-STORE"), Serdes.String(), Serdes.String());
StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> soureTable = builder
.table("BARY-PATH", Materialized.as("BARY-PATH-STORE"));
return builder.build();
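As a rough mental model of what materializing the table into a named store means, here is a plain-Java sketch. It is not Kafka Streams code, and `TableViewSketch`, `Update`, and `materialize` are hypothetical names made up for illustration. The idea it shows: a table keeps only the latest value per key, and a null value removes the key, so several records for FOLDERS end up as a single entry.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TableViewSketch {
    // Hypothetical stand-in for a record read from the topic; not a Kafka class.
    static final class Update {
        final String key;
        final String value; // null means "delete this key"

        Update(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Applies updates in order: a later value for a key replaces the earlier one.
    static Map<String, String> materialize(List<Update> updates) {
        Map<String, String> store = new LinkedHashMap<>();
        for (Update u : updates) {
            if (u.value == null) {
                store.remove(u.key);
            } else {
                store.put(u.key, u.value);
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Update> updates = List.of(
            new Update("FOLDERS", "v1"),
            new Update("FOLDERS", "v2"),
            new Update("FOLDERS", "v3"));
        // Three records arrived, but the map holds one entry for FOLDERS.
        System.out.println(materialize(updates).size()); // prints 1
    }
}
```

In the real application the store is maintained by Kafka Streams itself; the map above only stands in for it.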
StoreBuilder pathsStore = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("BARY-PATH-STORE"), Serdes.String(), Serdes.String());
KTable<String, String> soureTable = builder
.addStateStore(pathsStore)
.table("BARY-PATH");
I am reading the concepts and trying to understand the difference between a stream and a stream processor. It says:
- A stream is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a data record is defined as a key-value pair.
- A stream processor is a node in the processor topology; it represents a processing step to transform data in streams by receiving one input record at a time from its upstream processors in the topology, applying its operation to it, and may subsequently produce one or more output records to its downstream processors.
To me, I cannot see any difference, except that a stream processor is a node and streams are edges?
Thanks
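One way to picture the two definitions quoted above, as a plain-Java sketch rather than the Kafka Streams API (all names here, `StreamVsProcessorSketch`, `Rec`, `Node`, `run`, `upperCase`, `dropEmpty`, are hypothetical): the stream is the data, an ordered sequence of immutable key-value records, while a processor is a step that receives one record at a time and emits zero or more records to the next step. For simplicity the sketch uses a finite list and a linear chain of nodes, whereas a real stream is unbounded and a topology can branch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class StreamVsProcessorSketch {
    // An immutable key-value record. A "stream" here is just an ordered List<Rec>.
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // A processor node: takes one input record, returns zero or more output records.
    interface Node {
        List<Rec> process(Rec in);
    }

    // Example node: emits exactly one record with the value upper-cased.
    static Node upperCase() {
        return r -> Collections.singletonList(new Rec(r.key, r.value.toUpperCase()));
    }

    // Example node: emits nothing for empty values, otherwise passes the record on.
    static Node dropEmpty() {
        return r -> r.value.isEmpty()
            ? Collections.<Rec>emptyList()
            : Collections.singletonList(r);
    }

    // Pushes each record through the chain of nodes, one record at a time.
    static List<Rec> run(List<Rec> stream, List<Node> nodes) {
        List<Rec> sink = new ArrayList<>();
        for (Rec r : stream) {
            forward(r, nodes, 0, sink);
        }
        return sink;
    }

    // Hands a record to node `index`, then forwards each output to the next node.
    private static void forward(Rec r, List<Node> nodes, int index, List<Rec> sink) {
        if (index == nodes.size()) {
            sink.add(r);
            return;
        }
        for (Rec out : nodes.get(index).process(r)) {
            forward(out, nodes, index + 1, sink);
        }
    }

    public static void main(String[] args) {
        List<Rec> stream = List.of(new Rec("a", "x"), new Rec("b", ""), new Rec("c", "y"));
        List<Rec> out = run(stream, List.of(upperCase(), dropEmpty()));
        for (Rec r : out) {
            System.out.println(r.key + " -> " + r.value);
        }
    }
}
```

So the nodes-and-edges reading is reasonable: in this sketch the `Node` objects are the processing steps, and the records passed between them are what flows along the edges.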