I recently upgraded from Kafka 0.10.1 to 1.1.0.
My streams app consumes topics that Confluent Connect produces from changes in our database, does some calculations, and then publishes its own stream/topic.
When starting the app, I attempt to get each of the state stores the app publishes. The code simply tries to get the store using the KafkaStreams.store method inside a try/catch in a loop (I retry up to 300 times to give the stream time in case it is rebalancing or truly migrating). This all worked fine on Kafka 0.10.2.
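A minimal, standalone sketch of that kind of retry loop follows. It is not the exact code from the app: StoreRetry, StoreNotReadyException, getWithRetry and its parameters are made-up names for illustration, and the exception is only a stand-in for Kafka's InvalidStateStoreException so the snippet compiles without the Kafka jars.

```java
import java.util.function.Supplier;

final class StoreRetry {

    /** Hypothetical stand-in for org.apache.kafka.streams.errors.InvalidStateStoreException. */
    static final class StoreNotReadyException extends RuntimeException {
        StoreNotReadyException(String message) {
            super(message);
        }
    }

    /**
     * Calls the lookup until it succeeds or maxAttempts is reached,
     * sleeping sleepMillis between failed attempts.
     * Rethrows the last StoreNotReadyException if every attempt fails.
     */
    static <T> T getWithRetry(Supplier<T> lookup, int maxAttempts, long sleepMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        StoreNotReadyException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // In the real app this would be a call such as
                // streams.store(storeName, queryableStoreType).
                return lookup.get();
            } catch (StoreNotReadyException e) {
                // Store not queryable yet (rebalancing, restoring, or migrated).
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException ie) {
                    // Preserve the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for the store", ie);
                }
            }
        }
        throw last;
    }
}
```

In the app, the lookup passed in would wrap the KafkaStreams.store call and the catch would be on InvalidStateStoreException, with 300 as the attempt limit.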
After upgrading to Kafka 1.1.0, the app starts fine the first time. However, if I restart the app, any stream that consumes multiple topics from Connect always throws InvalidStateStoreException. This does not happen for streams that subscribe to a single Connect topic. To fix it, I must delete the logs and the store and then restart my streams app, after which it works fine. But I pretty much have to wipe the logs and the store every time I restart.
I debugged into the source a bit and found that the issue is in this call in org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider:
    public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
        if (streamThread.state() == StreamThread.State.DEAD) {
            return Collections.emptyList();
        }
        if (!streamThread.isRunningAndNotRebalancing()) {
            throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
        }
        final List<T> stores = new ArrayList<>();
        for (Task streamTask : streamThread.tasks().values()) {
            final StateStore store = streamTask.getStore(storeName);
            if (store != null && queryableStoreType.accepts(store)) {
                if (!store.isOpen()) {
                    throw new InvalidStateStoreException("the state store, " + storeName + ", may have migrated to another instance.");
                }
                stores.add((T) store);
            }
        }
        return stores;
    }
For streams that consume multiple Connect topics and produce a single stream/topic, when I restart the app, the above code does not find the store for the topic it is supposed to publish. The store has to exist, given that the app starts and works fine the first time I start it after clearing the logs and store (I'm manually deleting those folders for now). What is even stranger is that, despite not finding a store, the app still receives the Connect-produced topics and apparently produces the calculated stream just fine.
Anyone have any ideas on what might be happening here after the upgrade?