Topology builder = new Topology();
StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("Counts"),
        Serdes.String(),
        Serdes.String());
builder.addGlobalStore(storeBuilder, "Source", new StringDeserializer(),
        new StringDeserializer(), "input-topic", "Process", new GBDataStreamProcessorSupplier());
PS: A simple state store works for maintaining the state of processed events when the topic has a single partition, but with multiple partitions on the topic the state is not maintained.
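One likely explanation (a hypothetical sketch, not stated in the thread): with a regular state store, each stream task owns one partition and has its own local copy of the store, so two records only share state if they land in the same partition. The producer picks the partition from a hash of the key (Kafka actually uses murmur2; plain `hashCode` is used below only as a stand-in to illustrate the locality):

```java
import java.util.List;

public class PartitionDemo {
    // Stand-in for Kafka's default partitioner (the real one uses murmur2,
    // not hashCode); shown only to illustrate key -> partition locality.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        for (String key : List.of("order-1", "order-2", "order-1")) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
        // Both "order-1" records map to the same partition, so the single task
        // owning that partition sees both and its local store keeps their state.
        // Records with different keys may land on different partitions, whose
        // tasks do not see each other's stores.
    }
}
```

This is why state appears to "work" with one partition (one task sees everything) and not with several.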
--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/bf9f376b-9be5-4523-86d0-678dbf06f1e4%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Hello!

I didn't get a good mental picture of what you're trying to do, or of what you mean by "not working". Do you mind posting the whole topology and some sample input, and then describing what you're seeing vs. what you want to see?

Thanks,
-John
On Thu, Aug 9, 2018 at 8:27 PM <srinivas....@gmail.com> wrote:
Hi All,

Requirement: I want to process the events from a multi-partition topic and maintain the state of the previously processed event, so that I can aggregate later. A normal state store works if the topic has only one partition, but with more than one partition it does not maintain the already-processed event state. That is why I tried to use a global state store, which is also not working. For that I am using the Kafka Topology addGlobalStore method to maintain the state of the processed events from multiple partitions on the topic. But when I start the program it sometimes gives an error like "unable to initialize the Topology", sometimes the streaming thread gets killed without processing anything, and sometimes nothing happens at all.

Code:

Topology builder = new Topology();
StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("Counts"),
        Serdes.String(),
        Serdes.String());
builder.addGlobalStore(storeBuilder, "Source", new StringDeserializer(),
        new StringDeserializer(), "input-topic", "Process", new GBDataStreamProcessorSupplier());

PS: A simple state store works for maintaining the state of processed events when the topic has a single partition, but with multiple partitions on the topic the state is not maintained.
new StreamsBuilder()
    .stream("ecng-glassbox-input", Consumed.with(Serdes.String(), Serdes.String()))
    .groupBy(new KeyValueMapper<String, String, String>() {
        @Override
        public String apply(final String key, final String line) {
            final String[] words = line.toLowerCase(Locale.getDefault()).split(" ");
            // select the first word as the new key, leaving the value the same
            return words[0];
        }
    })
    .reduce(
        new Reducer<String>() {
            @Override
            public String apply(final String oldValue, final String line) {
                // combine subsequent values by concatenating them to the prior value
                return oldValue + line;
            }
        },
        Materialized.with(Serdes.String(), Serdes.String()))
    .toStream()
    .filter(new Predicate<String, String>() {
        @Override
        public boolean test(final String key, final String value) {
            // only forward values that contain "_complete"
            return value != null && !value.isEmpty() && value.contains("_complete");
        }
    })
    .to("generic-ec-event-out", Produced.with(Serdes.String(), Serdes.String()));
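To make the intent concrete, here is a plain-Java sketch (no Kafka; a hypothetical stand-in for the topology above) of what the groupBy/reduce/filter chain computes: each record is re-keyed by the first word of its value, values with the same key are concatenated, and only results containing "_complete" are forwarded. Because groupBy changes the key, Kafka Streams repartitions the stream, so same-key records meet in a single task regardless of their original partition:

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

public class ReduceSketch {
    public static Map<String, String> run(Iterable<String> lines) {
        Map<String, String> store = new LinkedHashMap<>(); // stand-in for the reduce state store
        Map<String, String> out = new LinkedHashMap<>();   // stand-in for the output topic
        for (String line : lines) {
            // re-key by the first (lowercased) word, as the KeyValueMapper does
            String key = line.toLowerCase(Locale.getDefault()).split(" ")[0];
            // the Reducer concatenates the new value onto the prior one;
            // the first value for a key passes through unchanged
            store.merge(key, line, (oldValue, newValue) -> oldValue + newValue);
            String value = store.get(key);
            // the Predicate only forwards values containing "_complete"
            if (value.contains("_complete")) {
                out.put(key, value);
            }
        }
        return out;
    }
}
```

For input ["job1 start", "job1 _complete"] this produces one entry under key "job1" holding both concatenated values; a key that never sees "_complete" produces nothing.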
new StreamsBuilder()
    .stream("ecng-glassbox-input", Consumed.with(Serdes.String(), Serdes.String()))
    .selectKey(new KeyValueMapper<String, String, String>() {
        @Override
        public String apply(final String key, final String line) {
            final String[] words = line.toLowerCase(Locale.getDefault()).split(" ");
            // select the first word as the new key, leaving the value the same
            return words[0];
        }
    })
    .process(new ProcessorSupplier<String, String>() {
        @Override
        public Processor<String, String> get() {
            return new Processor<String, String>() {
                @Override
                public void init(final ProcessorContext context) {
                    // typically: fetch the connected store via context.getStateStore("Counts")
                }
                @Override
                public void process(final String key, final String value) {
                    // per-record logic goes here
                }
                @Override
                public void close() {
                    // nothing to clean up
                }
            };
        }
    }, "Counts"); // "Counts" must have been registered on the builder (e.g. via addStateStore)
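Filling in the empty processor above, the usual pattern is to grab the connected store in init() and read/update it per record in process(). Below is a plain-Java sketch of that per-record logic (a HashMap standing in for the KeyValueStore<String, String>, and a list standing in for context.forward; class and method names are illustrative only, not Kafka API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StatefulProcessorSketch {
    private final Map<String, String> counts = new HashMap<>(); // stand-in for the "Counts" store
    private final List<String> forwarded = new ArrayList<>();   // stand-in for context.forward(...)

    // Mirrors process(key, value): skip events whose state was already recorded,
    // otherwise record them and forward downstream.
    public void process(String key, String value) {
        if (counts.containsKey(key)) {
            return; // already processed: state survives across records with the same key
        }
        counts.put(key, value);
        forwarded.add(key + "=" + value);
    }

    public List<String> forwarded() {
        return forwarded;
    }
}
```

Note that with selectKey before process(), the state here is only reliable across partitions if the stream is actually repartitioned on the new key before the processor runs; otherwise each task's "counts" still only reflects its own partition.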