Kafka Streams processing: Topology addGlobalStateStore method is not working


srinivas....@gmail.com

Aug 9, 2018, 9:27:00 PM
to Confluent Platform
Hi All,

Requirement: I want to process events from a multi-partition topic and maintain the state of previously processed events, so that I can aggregate them later.

A normal state store works if the topic has only one partition, but with more than one partition it does not work: the state of already-processed events is not maintained.

That is why I tried to use addGlobalStateStore, which is also not working.

For that I am using the Kafka Topology addGlobalStore method to maintain the state of events processed from multiple partitions of the topic. But when I start the program, sometimes I get an error like "unable to initialize the Topology", sometimes the stream thread is killed without processing anything, and sometimes nothing happens at all.


Code:

Topology builder = new Topology();

StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("Counts"),
        Serdes.String(),
        Serdes.String());

builder.addGlobalStore(storeBuilder, "Source", new StringDeserializer(),
        new StringDeserializer(), "input-topic", "Process", new GBDataStreamProcessorSupplier());

PS: a simple state store works for maintaining the state of processed events when the topic has a single partition, but with multiple partitions it does not maintain the state.
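For reference, here is a minimal sketch of the addGlobalStore wiring (topic, store, and node names are placeholders). Two constraints are easy to miss and are plausible causes of an "unable to initialize the Topology" error: a global store's builder must have changelogging disabled, and its processor is only meant to populate the store from the source topic, not to forward records downstream.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class GlobalStoreSketch {

    public static Topology build() {
        // global stores must not have a changelog topic; without
        // withLoggingDisabled() the topology fails to initialize
        StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("Counts"),
                Serdes.String(),
                Serdes.String())
                .withLoggingDisabled();

        Topology topology = new Topology();
        topology.addGlobalStore(storeBuilder, "global-source",
                new StringDeserializer(), new StringDeserializer(),
                "input-topic", "global-store-updater",
                () -> new AbstractProcessor<String, String>() {
                    private KeyValueStore<String, String> store;

                    @Override
                    @SuppressWarnings("unchecked")
                    public void init(ProcessorContext context) {
                        store = (KeyValueStore<String, String>) context.getStateStore("Counts");
                    }

                    @Override
                    public void process(String key, String value) {
                        // only mirror the topic into the store; no forwarding, no punctuation
                        store.put(key, value);
                    }
                });

        return topology;
    }
}
```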

John Roesler

Aug 10, 2018, 11:10:06 AM
to confluent...@googlegroups.com
Hello!

I didn't get a good mental picture of what you're trying to do, or of what you mean by "not working". Do you mind posting the whole topology and some sample input, and then describing what you're seeing vs. what you want to see?

Thanks,
-John

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/bf9f376b-9be5-4523-86d0-678dbf06f1e4%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

srinivas....@gmail.com

Aug 10, 2018, 12:56:29 PM
to Confluent Platform
Hello John,

Thank you so much for the reply. I will explain my problem.

I want to process incoming events from a multi-partition topic and maintain the state of processed events in a key-value store, so that I can retrieve the old value from the store and update it with the event currently being processed. Basically I want to group events related to a use case and do some operations like JSON creation.

If the topic has only one partition, the state store works and maintains the state of processed events. But if I use a topic with multiple partitions, the state store does not work: if I try to retrieve a previously stored value by its key, I get NULL.

To solve this I tried using a global state store. With that, I am either unable to start the program, or the thread is killed and I get an error like "topology not initialized".


Code:

public class App {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrap server");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // EXACTLY_ONCE and AT_LEAST_ONCE are values for PROCESSING_GUARANTEE_CONFIG,
        // not config keys, and only one guarantee can be chosen:
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        Topology builder = new Topology();

        // builder.addSource("Source", "ecng-glassbox-input");
        // builder.addProcessor("Process", new GBDataStreamProcessorSupplier(), "Source");

        StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("Counts"),
                Serdes.String(),
                Serdes.String());

        // builder.addStateStore(storeBuilder, "Process"); // the single-partition variant that worked

        builder.addGlobalStore(storeBuilder, "Source", new WallclockTimestampExtractor(),
                new StringDeserializer(), new StringDeserializer(),
                "ecng-glassbox-input", "Process", new GBDataStreamProcessorSupplier());

        builder.addSink("Sink", "generic-ec-event-out", "Process");
    }
}




Processor:

public class GBDataStreamProcessorSupplier implements ProcessorSupplier<String, String> {

    public Processor<String, String> get() {
        return new Processor<String, String>() {
            private ProcessorContext context;
            private KeyValueStore<String, String> kvStore;

            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                // keep the processor context locally because we need it in punctuate() and commit()
                this.context = context;

                // retrieve the key-value store named "Counts"
                kvStore = (KeyValueStore<String, String>) context.getStateStore("Counts");

                // schedule a punctuation every 1000 milliseconds based on stream-time
                this.context.schedule(1000, PunctuationType.STREAM_TIME, new Punctuator() {
                    @Override
                    public void punctuate(long timestamp) {
                        KeyValueIterator<String, String> iter = kvStore.all();
                        while (iter.hasNext()) {
                            KeyValue<String, String> entry = iter.next();
                            String value = entry.value;
                            if (value != null && !value.isEmpty() && value.contains("_complete")) {
                                System.out.println("forward kv --> " + entry.key + "  " + entry.value);
                                // call the JSON creation
                                context.forward(entry.key, entry.value);
                            }
                        }
                        iter.close();
                    }
                });
            }

            @Override
            public void process(String dummy, String line) {
                String[] words = line.toLowerCase(Locale.getDefault()).split(" ");

                String oldValue = this.kvStore.get(words[0]);

                if (oldValue == null) {
                    this.kvStore.put(words[0], line);
                } else {
                    this.kvStore.put(words[0], oldValue + line);
                }

                context.commit();
            }

            @Override
            public void close() {
            }
        };
    }
}







John Roesler

Aug 10, 2018, 1:44:22 PM
to confluent...@googlegroups.com
I see. I think that the global store is not what you want for this use case (and I don't think it can be used like that anyway).

I think the problem you were getting with nulls would be solved if you repartitioned the stream by extracting the key from the value explicitly. Kafka Streams takes care to send all records with the same key to the same instance, but in your case, you're discarding the key and instead extracting a store key from the stream's value (words[0]). This means that Kafka Streams can't guarantee the right records will go to the right instances.

I think this should work for you:
new StreamsBuilder()
    .stream("ecng-glassbox-input", Consumed.with(Serdes.String(), Serdes.String()))
    .groupBy(new KeyValueMapper<String, String, String>() {
        @Override
        public String apply(final String key, final String line) {
            final String[] words = line.toLowerCase(Locale.getDefault()).split(" ");
            // select the first word as the new key, leaving the value the same.
            return words[0];
        }
    })
    .reduce(
        new Reducer<String>() {
            @Override
            public String apply(final String oldValue, final String line) {
                // combine subsequent values by concatenating them to the prior value
                return oldValue + line;
            }
        },
        Materialized.with(Serdes.String(), Serdes.String()))
    .toStream()
    .filter(new Predicate<String, String>() {
        @Override
        public boolean test(final String key, final String value) {
            // only forward values that contain "_complete"
            return value != null && !value.isEmpty() && value.contains("_complete");
        }
    })
    .to("generic-ec-event-out", Produced.with(Serdes.String(), Serdes.String()));
Does that seem right?

If you really need your custom processor, you could instead attach it after selecting a new key like this:

new StreamsBuilder()
    .stream("ecng-glassbox-input", Consumed.with(Serdes.String(), Serdes.String()))
    .selectKey(new KeyValueMapper<String, String, String>() {
        @Override
        public String apply(final String key, final String line) {
            final String[] words = line.toLowerCase(Locale.getDefault()).split(" ");
            // select the first word as the new key, leaving the value the same.
            return words[0];
        }
    })
    .process(new ProcessorSupplier<String, String>() {
        @Override
        public Processor<String, String> get() {
            return new Processor<String, String>() {
                @Override
                public void init(final ProcessorContext context) {
                }

                @Override
                public void process(final String key, final String value) {
                }

                @Override
                public void close() {
                }
            };
        }
    }, "Counts");
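One assumption in that second sketch: the "Counts" name passed to process() only resolves if a store by that name has been registered on the builder beforehand, roughly along these lines (store name and builder variable are illustrative):

```java
StreamsBuilder streamsBuilder = new StreamsBuilder();

StoreBuilder<KeyValueStore<String, String>> countsStore = Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("Counts"),
        Serdes.String(),
        Serdes.String());

// makes context.getStateStore("Counts") resolvable from connected processors
streamsBuilder.addStateStore(countsStore);
```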

Does that help?
-John


srinivas....@gmail.com

Aug 22, 2018, 8:58:10 AM
to Confluent Platform
Hi John,

Thanks for the reply. It is working for us; appreciate your help.

One more problem I am seeing after running the streaming app: it is not picking up the live records related to my use case from the Kafka topic.

We have a few UAT environments that publish data to the input topic, and ours is one of them. I want my streams app to read the live record related to my use case as soon as I exercise the use case on our UAT3 environment.

Below are my streams config properties:

        props.put(StreamsConfig.APPLICATION_ID_CONFIG, props.getProperty("application.id"));
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // EXACTLY_ONCE is a value for PROCESSING_GUARANTEE_CONFIG, not a config key
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        // setting offset reset to latest so that only newly arriving records are read
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        

Am I missing anything with respect to the streams config props? Please let me know.
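One general Kafka consumer rule worth knowing here (this is background, not a confirmed diagnosis of the problem above): auto.offset.reset is only consulted when the application id's consumer group has no committed offsets yet. A sketch:

```java
// "latest" / "earliest" only take effect on the FIRST run for a given
// application.id (consumer group). Once offsets have been committed, the app
// resumes from the committed position and this setting is ignored.
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

// To force a fresh start from the latest offsets, change the application id
// or reset the app (e.g. with the kafka-streams-application-reset tool).
```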


Thanks,
