Unit Test a KStreams Application which has a state store

Arun Vasudevan

Nov 14, 2017, 7:21:09 PM
to Confluent Platform
Hi,
I want to unit test a Kafka Streams application that has a custom processor which writes to a state store.

In my unit test I write to the application's input topic. Do I also need to add the state store to the test and include the custom processor?

In that case, my unit test would end up duplicating the whole implementation of the original class, right?

My Class:

package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class PipeWordCount {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        Topology topology = new Topology();

        // Persistent key-value store named "Counts" (String -> Long)
        KeyValueBytesStoreSupplier countStoreSupplier = Stores.persistentKeyValueStore("Counts");
        StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
                Stores.keyValueStoreBuilder(countStoreSupplier, Serdes.String(), Serdes.Long());

        // source -> MyProcessor (custom processor, connected to the store) -> sink
        topology.addSource("source", "stream-plaintext-input")
                .addProcessor("processor", () -> new MyProcessor(), "source")
                .addStateStore(storeBuilder, "processor")
                .addSink("sink", "sink-topic", "processor");

        System.out.println(topology.describe());

        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            System.out.println("KStream Started.....");
            latch.await();
        } catch (Throwable throwable) {
            System.exit(1);
        }
        System.exit(0);
    }
}


Any tips on how to test the above state store application would be great.

Regards,
Arun

Guozhang Wang

Nov 15, 2017, 2:08:47 PM
to Confluent Platform
That depends on what you are trying to test.

If you're only testing the computational logic of "MyProcessor" you could mock the state store by using an in-memory store, for example.

If you're also testing the store put / get behavior then you'd need to have whatever you have in the original class.
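
As a rough illustration of the first option, here is a minimal sketch that tests counting logic against an in-memory stand-in for the store, with no broker or Kafka dependency. MyProcessor is not shown in the thread, so the word-count logic is an assumption, and CountStore, InMemoryCountStore and WordCounter are hypothetical names invented for this example. CountStore is not Kafka's KeyValueStore interface; it only mirrors the get/put calls such a processor would likely use.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical, minimal view of the store operations the logic needs.
// This is NOT Kafka's KeyValueStore; it only mirrors get/put.
interface CountStore {
    Long get(String key);
    void put(String key, Long value);
}

// In-memory stand-in for the "Counts" store, backed by a HashMap.
class InMemoryCountStore implements CountStore {
    private final Map<String, Long> counts = new HashMap<>();

    @Override
    public Long get(String key) {
        return counts.get(key);
    }

    @Override
    public void put(String key, Long value) {
        counts.put(key, value);
    }
}

// Assumed word-count logic; the real MyProcessor may do something different.
class WordCounter {
    private final CountStore store;

    WordCounter(CountStore store) {
        this.store = store;
    }

    // Splits a line on non-word characters and increments each word's count.
    void process(String line) {
        for (String word : line.toLowerCase(Locale.ROOT).split("\\W+")) {
            if (word.isEmpty()) {
                continue; // a leading delimiter yields an empty first token
            }
            Long old = store.get(word);
            store.put(word, old == null ? 1L : old + 1L);
        }
    }
}

class WordCounterDemo {
    public static void main(String[] args) {
        InMemoryCountStore store = new InMemoryCountStore();
        WordCounter counter = new WordCounter(store);
        counter.process("hello kafka hello streams");
        System.out.println("hello=" + store.get("hello")); // prints hello=2
        System.out.println("kafka=" + store.get("kafka")); // prints kafka=1
    }
}
```

In a real processor the store would typically be obtained in init() via context.getStateStore("Counts"). Keeping the computation in a small class like this is one way to test it without starting the topology, while the store's actual put/get behavior still needs the real store, as noted above.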


Guozhang

Arun Vasudevan

Nov 16, 2017, 3:40:28 PM
to Confluent Platform
Thanks Guozhang!