Kafka Streams - error with multiple instances when building an in-memory state store

Hari Thiruppathi

Sep 4, 2017, 6:26:54 AM
to Confluent Platform
Hi,
We are using Kafka Streams to process an event stream and build in-memory state stores that serve the processed data. The streams app reads from 2 topics with 4 partitions each. I am running it on my laptop with 2 Kafka brokers, and the replication factor for both the source topics and the internal topics is set to 1. We create 3 in-memory state stores with standby replicas set to 1, and we built an API on top of them to serve data from the stores. When we bring up the first instance of our streams app, everything seems to work fine. Once the second instance is started, the stream threads in both instances fail with the following error.

[ERROR] stream-thread [StreamThread-1] Streams application error during processing:
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:989)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:723)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:648)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
[error] (StreamThread-1) java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions

This doesn't happen when we configure the state store as persistent.

Regards
Hari

Damian Guy

Sep 4, 2017, 9:21:57 AM
to Confluent Platform
Hi,

Which version of streams are you using?
Can you provide some code?
Thanks,
Damian

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/682cdc63-70f9-4ee1-8e5a-6729cfc96012%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Hari Thiruppathi

Sep 4, 2017, 10:22:24 AM
to Confluent Platform
We are currently testing with 0.11.0.0, but we experienced the issue with 0.10.2.0 as well.

This is the config:
val p = new java.util.Properties() // holds the Streams settings below
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "testApp")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
p.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1")
p.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "1")
p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1")
p.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"$apiHost:$apiPort")

We create the state store as follows.

Stores.create("TestStateStore")
.withKeys(stringSerde)
.withValues(jsonFormatSerde[T])
.inMemory()
.build()
.asInstanceOf[StateStoreSupplier[KeyValueStore[_,_]]]
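
For comparison with the persistent case mentioned in the first message, here is a minimal sketch of the two store definitions side by side. It assumes the 0.11.0.0 Stores builder API used above. StoreSuppliers, inMemorySupplier and persistentSupplier are hypothetical names, and a plain string serde stands in for jsonFormatSerde[T], which is not shown in this thread.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.processor.StateStoreSupplier
import org.apache.kafka.streams.state.{KeyValueStore, Stores}

// Hypothetical holder object, for illustration only.
object StoreSuppliers {
  // Assumption: string values stand in for the JSON serde used in the real app.
  private val stringSerde = Serdes.String()

  // The in-memory variant, as in the snippet above (this is the failing case).
  val inMemorySupplier: StateStoreSupplier[KeyValueStore[_, _]] =
    Stores.create("TestStateStore")
      .withKeys(stringSerde)
      .withValues(stringSerde)
      .inMemory()
      .build()
      .asInstanceOf[StateStoreSupplier[KeyValueStore[_, _]]]

  // The persistent variant: same builder chain, with persistent() in place of inMemory().
  val persistentSupplier: StateStoreSupplier[KeyValueStore[_, _]] =
    Stores.create("TestStateStore")
      .withKeys(stringSerde)
      .withValues(stringSerde)
      .persistent()
      .build()
      .asInstanceOf[StateStoreSupplier[KeyValueStore[_, _]]]
}
```

In this sketch only the inMemory() / persistent() call differs between the two definitions; the config above is the same in both cases.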

Regards
Hari



Hari Thiruppathi

Sep 11, 2017, 4:30:00 AM
to Confluent Platform
Hi,
Are there any pointers on this, or is any additional info required?

Hari