ProcessorTopologyTestDriver not working with KTable


Byron N

Nov 1, 2016, 8:00:45 PM
to Confluent Platform
Hello,

I am using the ProcessorTopologyTestDriver to write unit tests for our topologies.  Everything works fine with KStreams, but as soon as I add a KTable to my test topology, I get the error below.  It seems to be trying to use RocksDB, which I would think defeats the point of using this test driver.  Looking at KTableImpl.java, when it materializes the table it creates a RocksDBKeyValueStoreSupplier.  Is there a way to make the driver use a different store, i.e., make it pluggable?

Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: task [0_0] Could not find partition info for topic: test
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:174)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
        at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
        at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
        at org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:174)

Thanks,
Byron

Byron N

Nov 1, 2016, 8:38:54 PM
to Confluent Platform
Sorry, should have mentioned I am using the 0.10.1.0 release.

Damian Guy

Nov 4, 2016, 1:28:15 PM
to Confluent Platform
Hi,

There is currently no way to use a different store. However, I think you can fix your current issue by constructing the ProcessorTopologyTestDriver with the store names provided, i.e.,

new ProcessorTopologyTestDriver(config, builder, storeName1, storeName2);

Thanks,
Damian


--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/0ea51bef-9760-48eb-bd6c-fc4882d9ea7b%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Byron N

Nov 4, 2016, 4:10:57 PM
to Confluent Platform
Hi,

I actually am providing the store name in the test driver constructor.  Here is my very simple topology: just a single KTable reading from one topic and writing to another.  Can you try it and see if you get the same thing?

package test;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.ProcessorTopologyTestDriver;

public class ProcessorDriverTest2 {
    public static void main(String[] args) throws IOException, InterruptedException {
        System.out.println("ProcessorDriverTest2");

        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "ProcessorDriverTest2");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        StreamsConfig streamsConfig = new StreamsConfig(props);

        // topology
        KStreamBuilder kstreamBuilder = new KStreamBuilder();
        StringSerde stringSerde = new StringSerde();
        KTable<String, String> table = kstreamBuilder.table(stringSerde, stringSerde,
                "alertInputTopic", "alertStore");
        table.to("alertOutputTopic");

        // create test driver
        ProcessorTopologyTestDriver testDriver = new ProcessorTopologyTestDriver(
                streamsConfig,
                kstreamBuilder,
                "alertStore");

        StringSerializer serializer = new StringSerializer();
        StringDeserializer deserializer = new StringDeserializer();

        // send data to input topic
        testDriver.process("alertInputTopic", "the Key", "the Value", serializer, serializer);

        // read data from output topic
        ProducerRecord<String, String> rec = testDriver.readOutput("alertOutputTopic",
                deserializer, deserializer);
        System.out.println("rec: " + rec);
    }
}

Thanks,
Byron


Marek Svitok

Nov 7, 2016, 3:24:06 AM
to Confluent Platform
Hi Byron.

The problem is that you are reading from an input topic that doesn't exist while the test is running. That is what the error message you're getting says.
Try calling the test driver's process() method like this:

ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig(), builder);
driver.process(TOPIC, "key", "value", keySerializer, valueSerializer);

ProducerRecord<String, String> aRecord = driver.readOutput("topicOut",
        Serdes.String().deserializer(),
        Serdes.String().deserializer());

It works for my tests.
Marek.


Byron N

Nov 8, 2016, 1:38:40 PM
to Confluent Platform
Hi Marek,

For me it fails before it even gets to process(); it fails in the ProcessorTopologyTestDriver constructor.  But your post gave me an idea, so I tested with Kafka 0.10.0.1, and my test works fine there too (with one minor change: adding key/value serializers to table.to()).  So the issue is that the test does not work with Kafka 0.10.1.0.  I will post a new topic with that specific title.  Thanks for answering.
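[Editor's note: for readers following along, the "minor change" described above would look roughly like this against the example topology posted earlier in the thread. This is a sketch, not verified against every 0.10.x release; it assumes the to(Serde, Serde, String) overload of KTable and reuses the stringSerde variable from that example.]

// Instead of relying on the default serdes from the config:
//     table.to("alertOutputTopic");
// pass the key/value serdes explicitly:
table.to(stringSerde, stringSerde, "alertOutputTopic");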

Byron