Kafka streams with global state store for NUM_STREAM_THREADS_CONFIG=3


Ranjit Kumar

Nov 13, 2017, 5:50:00 AM
to Confluent Platform
Hi All,

I am using Kafka Streams with multithreading enabled (NUM_STREAM_THREADS_CONFIG=3), and I am also using a state store.

However, the state store behaves as if it were local to each thread. I want it to be global, because every thread needs to read and update the entire state.
Here is the code I wrote:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class App
{
    public static void main( String[] args )
    {
        // Persistent key-value store with String keys and Long values.
        StateStoreSupplier testStore = Stores.create("count2")
                .withStringKeys()
                .withLongValues()
                .persistent()
                .build();

        final KStreamBuilder builder = new KStreamBuilder();

        // Topology: topic "test2" -> TestProcessor, with the store connected to the processor.
        builder.addSource("source", "test2")
               .addProcessor("process", TestProcessor::new, "source")
               .addStateStore(testStore, "process");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // The default serdes must match the Processor<String, String> key/value types.
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        props.put("auto.offset.reset", "latest");
        props.put("num.stream.threads", 3);

        KafkaStreams streams = new KafkaStreams(builder, props);

        streams.start();
    }

    public static class TestProcessor implements Processor<String, String> {
        private KeyValueStore<String, Long> kvStore;
        private ProcessorContext context;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;

            System.out.println("Initialized");
            // Stores added via addStateStore() are created per stream task,
            // so this instance is not shared across threads.
            this.kvStore = (KeyValueStore<String, Long>)
                    context.getStateStore("count2");
        }

        @Override
        public void process(String k, String v) {
            // Reads and updates of kvStore would go here.
        }

        @Override
        public void punctuate(long timestamp) {
        }

        @Override
        public void close() {
        }
    }
}

Regards,
Ranjit


Matthias J. Sax

Nov 13, 2017, 1:56:56 PM
to confluent...@googlegroups.com
If you want to share state across threads, you need to add the store as a
"global store" via .addGlobalStore().

This makes the store available to all processors (there is no need to
connect a global store to a processor).


-Matthias
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to confluent-platf...@googlegroups.com
> <mailto:confluent-platf...@googlegroups.com>.
> To post to this group, send email to confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/confluent-platform/9c10d7af-d817-410f-8f02-37edf0edc031%40googlegroups.com
> <https://groups.google.com/d/msgid/confluent-platform/9c10d7af-d817-410f-8f02-37edf0edc031%40googlegroups.com?utm_medium=email&utm_source=footer>.
> For more options, visit https://groups.google.com/d/optout.


Ranjit Kumar

Nov 13, 2017, 8:16:43 PM
to Confluent Platform
Thanks, Matthias.
I am planning to do updates and gets in all stream threads. Do I need to take care of any synchronization?

Regards,
Ranjit

Ranjit Kumar

Nov 14, 2017, 8:01:07 AM
to Confluent Platform
Hi Matthias,

Could you please also share some sample code, if you have any?

Thanks & Regards,
Ranjit

Damian Guy

Nov 14, 2017, 8:43:07 AM
to confluent...@googlegroups.com
If you add a GlobalStateStore you won't be able to update it from all threads. It is only updated by a single thread, i.e., the GlobalStreamThread. You can read the data from other threads.

As has already been mentioned in another thread you started, if you want all threads to be able to update the state store you will need to create your own StateStoreSupplier/StoreBuilder that when `get()` is called always returns the same instance of the state store. But if you do this you will also need to take care to synchronize access to the store.
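
A minimal plain-Java sketch of the idea described above, under loud assumptions: `SharedStoreSupplier` and `SharedStoreDemo` are hypothetical names, a `ConcurrentHashMap` stands in for the state store, and nothing here implements Kafka's actual StateStoreSupplier/StoreBuilder interfaces. It only illustrates the two requirements: every call to `get()` returns the same instance, and each update is synchronized.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical stand-in for a store supplier; NOT Kafka's StateStoreSupplier/StoreBuilder.
final class SharedStoreSupplier implements Supplier<ConcurrentMap<String, Long>> {
    // Created once; every get() hands out this same instance.
    private final ConcurrentMap<String, Long> store = new ConcurrentHashMap<>();

    @Override
    public ConcurrentMap<String, Long> get() {
        return store;
    }
}

final class SharedStoreDemo {
    // Starts `threads` workers that each add 1 to `key` `updatesPerThread` times,
    // then returns the final count stored under `key`.
    static long countWithThreads(int threads, int updatesPerThread, String key) {
        SharedStoreSupplier supplier = new SharedStoreSupplier();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                ConcurrentMap<String, Long> store = supplier.get(); // same instance in every thread
                for (int i = 0; i < updatesPerThread; i++) {
                    // merge() performs the read-modify-write for this key atomically.
                    store.merge(key, 1L, Long::sum);
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return supplier.get().get(key);
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(3, 1000, "k")); // prints 3000
    }
}
```

With a store that has no atomic merge operation, a get followed by a put would need an explicit lock around both calls; otherwise two threads can read the same old value and one update is lost.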
 


Ranjit Kumar

Nov 14, 2017, 11:22:58 AM
to Confluent Platform
Hi Damian,

Thanks for your response.

Which one is better to use: addGlobalStore or globalTable?

My requirement is as follows: I receive a topic in protobuf format, which I need to parse and then store in an in-memory DB based on the key and value (there will be inserts followed by updates).
I also need to publish this stored data as a topic to external consumers, and I want to use stream threads for a performance boost.

Please advise.


Matthias J. Sax

Nov 14, 2017, 12:54:05 PM
to confluent...@googlegroups.com
Ranjit,

I just realized that you want to update the state from your processor.
This is actually not supported by a global state (at least not directly).

Global state is populated from a topic at startup, and the global thread
should be the only thread that updates the state: even if it is
technically possible to write to the global state from any processor,
those updates won't be reflected in the underlying topic and thus might
get lost. Therefore, if you want to update the state, you will need to
write directly to the topic that feeds the state. The global thread will
then pick those changes up and update the state.

Concurrent access is not safe on writes for this reason (we assume a
single writer), and you should only read from the global state from your
"regular" processors.


-Matthias
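
The single-writer pattern described in this reply can be simulated in plain Java. This is a rough sketch only, not Kafka Streams code: a `BlockingQueue` stands in for the topic that feeds the global state, one thread plays the role of the global thread, and the worker threads publish updates instead of writing to the store. `SingleWriterDemo`, `run`, and the `STOP` sentinel are hypothetical names introduced for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

final class SingleWriterDemo {
    // Sentinel telling the writer thread to stop (compared by identity).
    private static final Map.Entry<String, Long> STOP = new SimpleEntry<String, Long>("", 0L);

    // Each worker publishes (worker-<n>, v) for v = 1..updatesPerWorker.
    // Returns the store after the single writer has applied every update.
    static Map<String, Long> run(int workers, int updatesPerWorker) {
        // Stand-in for the topic that feeds the global state.
        BlockingQueue<Map.Entry<String, Long>> topic = new LinkedBlockingQueue<>();
        // Stand-in for the global state; safe to read from any thread.
        Map<String, Long> store = new ConcurrentHashMap<>();

        // The only thread that ever writes to the store.
        Thread globalThread = new Thread(() -> {
            try {
                for (Map.Entry<String, Long> u = topic.take(); u != STOP; u = topic.take()) {
                    store.put(u.getKey(), u.getValue());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        globalThread.start();

        Thread[] producers = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            final String key = "worker-" + w;
            producers[w] = new Thread(() -> {
                for (long v = 1; v <= updatesPerWorker; v++) {
                    // Publish the update; workers never call store.put() themselves.
                    topic.add(new SimpleEntry<String, Long>(key, v));
                }
            });
            producers[w].start();
        }
        try {
            for (Thread p : producers) {
                p.join();
            }
            topic.add(STOP); // queued after every update, so nothing is skipped
            globalThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return store;
    }

    public static void main(String[] args) {
        // Every key ends at its last published value (100 here).
        System.out.println(run(3, 100));
    }
}
```

In Kafka Streams terms, `topic.add(...)` corresponds to producing a record to the topic that backs the global state, and reads of `store` correspond to lookups from regular processors. Because a real topic round-trip is asynchronous, a processor may not see its own update immediately.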

Ranjit Kumar

Nov 16, 2017, 4:21:05 AM
to Confluent Platform
Thanks a lot for the response.

Could you please share a StateStoreSupplier/StoreBuilder example? If possible, please also share a singleton implementation that returns the same object to all threads.

Is it possible to lock individual row-level entries in RocksDB to handle the synchronization?

Thanks & Regards,
Ranjit
