KTable unable fetch data from Materialized view

288 views
Skip to first unread message

jpava...@gmail.com

unread,
Apr 26, 2018, 9:03:27 AM4/26/18
to Confluent Platform

I am using Kafka Streams with Spring Boot. In my use case when I receive customer event I need to store it in customer-store materialized view and when I receive order event, I need to join customer and order then store the result in customer-order materialized view.

StoreBuilder customerStateStore = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("customer-store"),Serdes.String(), customerSerde)
                .withLoggingEnabled(new HashMap<>());
streamsBuilder.stream("customer", Consumed.with(Serdes.String(), customerSerde)).to("customer-to-ktable-topic",Produced.with(Serdes.String(), customerSerde));
KTable<String, Customer> customerKTable = streamsBuilder.table("customer-to-ktable-topic", Consumed.with(Serdes.String(), customerSerde),Materialized.as(customerStateStore.name()));

Here is the problem, when I receive Order event and my customerKTable returns null and join operation becomes useless. This is not how it supposed to work. I think this is a bug in new version. My code is similar to Kafka Music example, I created TestConsumer class to test this. Code uploaded to Github for reference.

Guozhang Wang

unread,
Apr 26, 2018, 6:47:33 PM4/26/18
to Confluent Platform
Hello,

When you call `Materialized.as(customerStateStore.name())` Streams will create a new store internally with the given name, not the customerStateStore you created. So you can simply specify

streamsBuilder.table(.., Materialized.as("customer-store"));


BTW the music example link you specified is from an old code repo, which may contain deprecated APIs, here is the new code repo for it: https://github.com/confluentinc/kafka-streams-examples/tree/4.1.0-post/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic

Going back to your question, I looked through your code and it looks reasonable; when you say "customerKTable returns null" did you mean this line fails?

```
waitUntilStoreIsQueryable("customer-store", QueryableStoreTypes.keyValueStore(),streams);
```


Guozhang

jpava...@gmail.com

unread,
Apr 27, 2018, 1:08:48 AM4/27/18
to Confluent Platform
Hello Guozhang,

First, I tried 
streamsBuilder.table(.., Materialized.as("customer-store"));

statement initially, it did not work. So added  Materialized.as(customerStateStore.name()).  It did not make much difference. Second, when I say customerKTable returns null , customerKTable should fetch data from materialized view when order event received. So that Customer and Order joined based on customer id in both the tables.

jpava...@gmail.com

unread,
Apr 27, 2018, 11:39:24 AM4/27/18
to Confluent Platform
This issue was created by KTable. The KTable syntax I was using was syntactically correct but not working. Refer this issue for more information. Changing KTable syntax worked for me. Now, customerKTable returns events or objects from materialized view when Order event arrived.

Guozhang Wang

unread,
Apr 27, 2018, 4:49:08 PM4/27/18
to Confluent Platform
Hello,

I read about your other thread's information, what's surprising is that since your did not specify the serde, the default serdes will be used from config and since you do not specify the serializers (only the deserializers) the default byte array ones will be used, causing runtime Cast Exceptions. But it seems you did not observe any exceptions, i.e. the Streams still runs but just skipping records?
Reply all
Reply to author
Forward
0 new messages