I am using kafka streams with multithreading enabled with this flag NUM_STREAM_THREADS_CONFIG=3 and i am using state store also.
But state store acting as local to the thread but i want state store to be global because i want every thread need to update and read the entire state.
public class App
{
public static void main( String[] args )
{
StateStoreSupplier testStore = Stores.create("count2")
.withStringKeys()
.withLongValues()
.persistent()
.build();
final KStreamBuilder builder = new KStreamBuilder();
builder.addSource("source", "test2").addProcessor("process", TestProcessor::new, "source").addStateStore(testStore, "process");
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
props.put("auto.offset.reset", "latest");
props.put("num.stream.threads", 3);
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
}
public static class TestProcessor implements Processor<String, String> {
private KeyValueStore<String, Long> kvStore;
private ReadOnlyKeyValueStore<String, Long> kvStore1;
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
System.out.println("Initialized");
this.kvStore = (KeyValueStore<String, Long>)
context.getStateStore("count2");
}
@Override
public void process(String k, String v) {
}
@Override
public void punctuate(long timestamp) {
}
@Override
public void close() {
}
}
}