I am actually providing the store name in the test driver constructor. Here is my very simple topology: just a single KTable reading from one topic and outputting to another topic. Can you try it and see if you get the same thing?
package test;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.ProcessorTopologyTestDriver;
/**
 * Minimal reproduction program: builds a topology consisting of a single
 * {@link KTable} that reads from {@code alertInputTopic} (backed by the state
 * store {@code alertStore}) and forwards to {@code alertOutputTopic}, then pipes
 * one record through it with {@link ProcessorTopologyTestDriver} and prints
 * whatever comes out of the output topic.
 */
public class ProcessorDriverTest2 {

    /** Topic the KTable is sourced from. */
    private static final String INPUT_TOPIC = "alertInputTopic";

    /** Topic the KTable is written to and that the driver reads back. */
    private static final String OUTPUT_TOPIC = "alertOutputTopic";

    /** Name of the state store backing the KTable; also handed to the test driver. */
    private static final String STORE_NAME = "alertStore";

    /**
     * Builds the topology, sends a single key/value pair to the input topic and
     * prints the record read back from the output topic.
     *
     * @param args ignored
     * @throws IOException          kept from the original signature
     * @throws InterruptedException kept from the original signature
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        System.out.println("ProcessorDriverTest2");

        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "ProcessorDriverTest2");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        StreamsConfig streamsConfig = new StreamsConfig(props);

        // topology
        KStreamBuilder kstreamBuilder = new KStreamBuilder();
        StringSerde stringSerde = new StringSerde();
        KTable<String, String> table = kstreamBuilder.table(stringSerde,
                stringSerde, INPUT_TOPIC, STORE_NAME);
        // Without a sink the topology never writes to the output topic, so the
        // readOutput() call below would have nothing to return.
        table.to(stringSerde, stringSerde, OUTPUT_TOPIC);

        // create test driver
        ProcessorTopologyTestDriver testDriver = new ProcessorTopologyTestDriver(
                streamsConfig,
                kstreamBuilder,
                STORE_NAME);
        try {
            StringSerializer serializer = new StringSerializer();
            StringDeserializer deserializer = new StringDeserializer();

            // send data to input topic
            testDriver.process(INPUT_TOPIC,
                    "the Key", "the Value", serializer, serializer);

            // read data from output topic
            // NOTE(review): presumably rec is null when nothing was produced -- confirm
            // against the driver implementation; string concatenation handles null anyway.
            ProducerRecord<String, String> rec = testDriver.readOutput(OUTPUT_TOPIC,
                    deserializer, deserializer);
            System.out.println("rec: " + rec);
        } finally {
            // Release the driver's resources even if processing throws.
            testDriver.close();
        }
    }
}
Thanks,
Byron