Tearing down multiple brokers during testing


Tianxiang Xiong

Apr 18, 2017, 7:44:23 PM
to Confluent Platform
In some of our Kafka 0.10.2.0 tests, we spin up multiple brokers (`KafkaServerStartable`s). However, when shutting them down (in reverse order of starting them up), we run into the following error in the teardown:

kafka.common.StateChangeFailedException: encountered error while electing leader for partition [_schemas,0] due to: No other replicas in ISR 0 for [_schemas,0] besides shutting down brokers 0.
    at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:362) [kafka_2.11-0.10.2.0-cp1.jar:na]
    at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:202) [kafka_2.11-0.10.2.0-cp1.jar:na]
    at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:141) [kafka_2.11-0.10.2.0-cp1.jar:na]
    at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:140) [kafka_2.11-0.10.2.0-cp1.jar:na]
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:94) [scala-library-2.11.8.jar:na]
    at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:140) [kafka_2.11-0.10.2.0-cp1.jar:na]
    at kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply(KafkaController.scala:268) [kafka_2.11-0.10.2.0-cp1.jar:na]

So our tests pass, but an error is thrown in the teardown code, so CI fails.

My understanding is that because `controlled.shutdown.enable` defaults to `true`, every time we shut down a broker Kafka tries to move partition leaders to remaining brokers. When it's on the last broker, there's nowhere else to move, so it throws an error.

We're able to get around this by setting `controlled.shutdown.enable` to `false`, which should be fine for testing. However, we then end up getting a lot of warnings such as:

16:20:18.844 [Controller-0-to-broker-2-send-thread] WARN  kafka.controller.RequestSendThread - [Controller-0-to-broker-2-send-thread], Controller 0's connection to broker localhost:9094 (id: 2 rack: null) was unsuccessful
java.io.IOException: Connection to localhost:9094 (id: 2 rack: null) failed
    at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84) ~[kafka_2.11-0.10.2.0-cp1.jar:na]
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94) ~[kafka_2.11-0.10.2.0-cp1.jar:na]
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232) [kafka_2.11-0.10.2.0-cp1.jar:na]
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185) [kafka_2.11-0.10.2.0-cp1.jar:na]
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184) [kafka_2.11-0.10.2.0-cp1.jar:na]
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) [kafka_2.11-0.10.2.0-cp1.jar:na]

Is setting `controlled.shutdown.enable` to `false` the right thing to do for testing? If so, is there a way to suppress warnings like the ones above?


Tianxiang Xiong

Apr 24, 2017, 12:56:25 PM
to Confluent Platform
From Ryan Pridgeon at Confluent support:

I suspect there may be some additional details here which are not obvious from what has been posted in the mailing group. As a quick sanity check, I set up a 2-node cluster with a few singly-replicated topics spread across the nodes. I then performed broker shutdowns in various orders to try to reproduce the error. This did not yield similar results, which leads me to believe there are some additional details here.

When you get a chance could you provide the following information about your experience:
  1. Are other topics affected, or just the _schemas topic?
  2. How many brokers are there?
  3. What is the replication factor for the offending topic(s)?
  4. What is the running broker config? If it's the default, you can just say "default".

Tianxiang Xiong

Apr 24, 2017, 1:12:31 PM
to Confluent Platform
  1. The only topic that seems to be affected is the _schemas topic 
  2. Number of brokers: 3 
  3. Replication factor: 1 (default) 
  4. Broker configs:
// Broker 1
{"zookeeper.connect" "localhost:2181",
"broker.id" "0",
"advertised.host.name" "localhost",
"auto.create.topics.enable" "true",
"offsets.topic.num.partitions" "1",
"log.dirs" "/.../embedded-kafka/kafka-log-0",
"listeners" "PLAINTEXT://localhost:9092",
"advertised.listeners" "PLAINTEXT://localhost:9092"}

// Broker 2
{"zookeeper.connect" "localhost:2181",
"broker.id" "1",
"advertised.host.name" "localhost",
"auto.create.topics.enable" "true",
"offsets.topic.num.partitions" "1",
"log.dirs" "/.../embedded-kafka/kafka-log-1",
"listeners" "PLAINTEXT://localhost:9093",
"advertised.listeners" "PLAINTEXT://localhost:9093"}

// Broker 3
{"zookeeper.connect" "localhost:2181",
"broker.id" "2",
"advertised.host.name" "localhost",
"auto.create.topics.enable" "true",
"offsets.topic.num.partitions" "1",
"log.dirs" "/.../embedded-kafka/kafka-log-2" ,
"listeners" "PLAINTEXT://localhost:9094",
"advertised.listeners" "PLAINTEXT://localhost:9094"}

Schema registry config:

{"listeners" "http://127.0.0.1:8081",
"kafkastore.connection.url" "localhost:2181",
"kafkastore.topic" "_schemas"}

Basic flow:
  • Bring up ZooKeeper (`ZookeeperServer`)
  • Bring up 3 brokers (`KafkaServerStartable`)
  • Bring up Schema Registry (`SchemaRegistryRestApplication`)
  • Test Schema Registry functionality by creating schemas, listing subjects, etc. via HTTP API
  • Shut down things previously brought up
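
The "shut down in reverse order" step can be sketched roughly as follows. This is only an illustration: `TeardownStack` is a hypothetical helper, not part of Kafka or the Schema Registry, and the lambdas in `main` are stand-ins for the real ZooKeeper, broker, and Schema Registry shutdown calls.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical test helper: closes registered components in reverse start order. */
final class TeardownStack implements AutoCloseable {
    private final Deque<AutoCloseable> started = new ArrayDeque<>();

    /** Register a component right after it has started successfully. */
    void register(AutoCloseable component) {
        started.push(component); // the most recently started component is closed first
    }

    /** Close everything, newest first, and keep going even if one close fails. */
    @Override
    public void close() throws Exception {
        List<Exception> failures = new ArrayList<>();
        while (!started.isEmpty()) {
            try {
                started.pop().close();
            } catch (Exception e) {
                failures.add(e);
            }
        }
        if (!failures.isEmpty()) {
            // Report the first failure and attach the rest as suppressed exceptions.
            Exception first = failures.get(0);
            for (Exception other : failures.subList(1, failures.size())) {
                first.addSuppressed(other);
            }
            throw first;
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> closed = new ArrayList<>();
        try (TeardownStack stack = new TeardownStack()) {
            // Stand-ins for ZooKeeper, the three brokers, and Schema Registry.
            for (String name : Arrays.asList(
                    "zookeeper", "broker-0", "broker-1", "broker-2", "schema-registry")) {
                stack.register(() -> closed.add(name));
            }
        }
        System.out.println(closed); // Schema Registry first, ZooKeeper last
    }
}
```

Collecting failures instead of stopping at the first one means a shutdown error in one broker does not leave the other components running.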

Tianxiang Xiong

Apr 24, 2017, 1:13:33 PM
to Confluent Platform
From Ryan:

If you configure the Schema Registry with kafkastore.topic.replication.factor=1 and leave controlled shutdown enabled on the brokers (controlled.shutdown.enable=true), you should be able to run your tests without triggering either error.

The problem here is that the Schema Registry creates its topic with replication.factor=3. This increased replication factor requires that the controller reassign partition leadership upon shutdown. Once you are down to the last node there is nowhere else to transfer leadership so an exception is thrown.

If the replication factor is 1 this reassignment is unnecessary so the controller can continue without raising the exception.
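
A toy model of that explanation, for illustration only. This is not Kafka's controller code; the class and method names are made up, and the logic is a simplification of what is described above: partitions with a single replica are skipped, and otherwise a new leader must come from the in-sync replicas that are not shutting down.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;

/** Toy model of leader selection during controlled shutdown; not Kafka's actual code. */
final class ControlledShutdownModel {

    /**
     * Picks a new leader for a partition whose current leader is shutting down.
     * Returns empty when nothing needs to move (replication factor 1) and throws
     * when a move is required but no other in-sync replica is left.
     */
    static Optional<Integer> electOnShutdown(Set<Integer> assignedReplicas,
                                             Set<Integer> isr,
                                             Set<Integer> shuttingDown) {
        if (assignedReplicas.size() <= 1) {
            return Optional.empty(); // single replica: nothing to transfer
        }
        Set<Integer> candidates = new LinkedHashSet<>(isr);
        candidates.removeAll(shuttingDown);
        if (candidates.isEmpty()) {
            throw new IllegalStateException("No other replicas in ISR " + isr
                    + " besides shutting down brokers " + shuttingDown);
        }
        return Optional.of(candidates.iterator().next());
    }

    /** Small helper to build an ordered set of broker ids. */
    static Set<Integer> setOf(Integer... ids) {
        return new LinkedHashSet<>(Arrays.asList(ids));
    }

    public static void main(String[] args) {
        // Replication factor 3; brokers 2 and 1 are already down, broker 0 goes last.
        try {
            electOnShutdown(setOf(0, 1, 2), setOf(0), setOf(0));
        } catch (IllegalStateException e) {
            System.out.println("RF=3, last broker: " + e.getMessage());
        }
        // Replication factor 1: no transfer is attempted, so no error.
        System.out.println("RF=1, last broker: "
                + electOnShutdown(setOf(0), setOf(0), setOf(0)));
    }
}
```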


Setting controlled.shutdown.enable=false allows the broker to terminate its socket server without contacting the controller. This leaves a relatively brief window in which the controller may, and does, send requests to brokers which are no longer accepting connections. This results in the second set of exceptions you mentioned.


Since this is strictly for testing I'd recommend lowering the Schema Registry's replication factor to 1 with kafkastore.topic.replication.factor=1. This in conjunction with the broker's controlled.shutdown.enable=true should allow you to bypass these messages in the logs.
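
As a rough sketch, the recommended test settings could be assembled like this. `TestConfigs` is a hypothetical helper; the property keys and values are the ones posted in this thread, and `log.dirs` is left out because the posted path is elided. How the resulting `Properties` are handed to the brokers and the Schema Registry depends on your test harness.

```java
import java.util.Properties;

/** Hypothetical helper that builds test configs from the settings discussed in this thread. */
final class TestConfigs {

    /** Broker settings for one test broker; set log.dirs separately for your environment. */
    static Properties broker(int brokerId, int port, boolean controlledShutdown) {
        Properties p = new Properties();
        p.setProperty("zookeeper.connect", "localhost:2181");
        p.setProperty("broker.id", Integer.toString(brokerId));
        p.setProperty("listeners", "PLAINTEXT://localhost:" + port);
        p.setProperty("advertised.listeners", "PLAINTEXT://localhost:" + port);
        p.setProperty("controlled.shutdown.enable", Boolean.toString(controlledShutdown));
        return p;
    }

    /** Schema Registry settings with an explicit replication factor for the _schemas topic. */
    static Properties schemaRegistry(int replicationFactor) {
        Properties p = new Properties();
        p.setProperty("listeners", "http://127.0.0.1:8081");
        p.setProperty("kafkastore.connection.url", "localhost:2181");
        p.setProperty("kafkastore.topic", "_schemas");
        p.setProperty("kafkastore.topic.replication.factor", Integer.toString(replicationFactor));
        return p;
    }

    public static void main(String[] args) {
        // Recommended combination for tests: controlled shutdown on, replication factor 1.
        for (int id = 0; id < 3; id++) {
            System.out.println(broker(id, 9092 + id, true));
        }
        System.out.println(schemaRegistry(1));
    }
}
```

Passing `false` for `controlledShutdown` and `3` for `replicationFactor` gives the alternative setup from earlier in the thread, with the connection warnings that come with it.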

Tianxiang Xiong

Apr 24, 2017, 1:14:41 PM
to Confluent Platform
If I'm running 3 brokers, why is it necessary to bring the replication factor of the _schemas topic down to 1?

Why is an error thrown when there is 1 partition vs. 3 partitions? I'd think that in either case, one partition is the leader, and the two scenarios should behave identically with respect to broker shutdown. Is it because the exception is thrown as part of the leader election process, and with only 1 partition it is never necessary to elect another leader?

Regardless, after setting `kafkastore.topic.replication.factor=1`, the `kafka.common.StateChangeFailedException` has disappeared, though we're still getting:

16:44:08.282 [ZkClient-EventThread-558-localhost:2181] ERROR state.change.logger - Controller 2 epoch 2 initiated state change for partition [_schemas,0] from OfflinePartition to OnlinePartition failed
kafka.common.NoReplicaOnlineException: No replica for partition [_schemas,0] is alive. Live brokers are: [Set(2)], Assigned replicas are: [List(0)]

which I'm guessing is expected?

Big picture: is shutting down all brokers something that is unusual when using Kafka? It seems strange that an error is produced when all brokers are shut down "properly" via `shutdown`/`awaitShutdown`.

Tianxiang Xiong

Apr 24, 2017, 1:16:13 PM
to Confluent Platform
From Ryan:

> If I'm running 3 brokers, why is it necessary to bring the replication factor of the _schemas topic down to 1?

The only reason this is necessary is that you wanted to avoid seeing the error thrown when shutting down all of the brokers.

> Why is an error thrown when there is 1 partition vs. 3 partitions?

The error is not thrown with a replication factor of 1 because there are no other replicas to transfer leadership to in the first place. That means there is nothing for the controller to do with that partition.

This is also covered in the graceful shutdown documentation, which is worth reviewing.

> Regardless, after setting kafkastore.topic.replication.factor=1, the kafka.common.StateChangeFailedException has disappeared, though we're still getting:
>
> 16:44:08.282 [ZkClient-EventThread-558-localhost:2181] ERROR state.change.logger - Controller 2 epoch 2 initiated state change for partition [_schemas,0] from OfflinePartition to OnlinePartition failed
> kafka.common.NoReplicaOnlineException: No replica for partition [_schemas,0] is alive. Live brokers are: [Set(2)], Assigned replicas are: [List(0)]


Yes, once all of the brokers which hold replicas for a partition are down, you will not be able to bring that partition back online. This is expected and normal behavior.
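
To make the log message concrete, here is a toy check using the values from the error above (assigned replicas `List(0)`, live brokers `Set(2)`). It is a simplified illustration with made-up names, not Kafka's controller code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the offline-to-online check; not Kafka's actual code. */
final class OfflinePartitionModel {

    /** Assigned replicas hosted on a broker that is currently up, in assignment order. */
    static List<Integer> liveAssignedReplicas(List<Integer> assignedReplicas,
                                              Set<Integer> liveBrokers) {
        List<Integer> live = new ArrayList<>();
        for (Integer replica : assignedReplicas) {
            if (liveBrokers.contains(replica)) {
                live.add(replica);
            }
        }
        return live;
    }

    /** A partition can only come back online if at least one assigned replica is alive. */
    static boolean canComeOnline(List<Integer> assignedReplicas, Set<Integer> liveBrokers) {
        return !liveAssignedReplicas(assignedReplicas, liveBrokers).isEmpty();
    }

    public static void main(String[] args) {
        List<Integer> assigned = Arrays.asList(0);                 // Assigned replicas: List(0)
        Set<Integer> live = new HashSet<>(Arrays.asList(2));       // Live brokers: Set(2)
        System.out.println("can come online: " + canComeOnline(assigned, live));
    }
}
```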

Shutting down all brokers at the same time is relatively uncommon, although not unheard of. The presence of these errors is nothing to be concerned about, however. The controller has no way to tell whether your intent is to shut down the entire system or a single node. As such, it must enforce all of the contracts it would normally be bound to during normal operations. This will result in errors such as the ones you have described here, but this is actually an indication that everything is working as intended.

Tianxiang Xiong

Apr 24, 2017, 1:17:14 PM
to Confluent Platform
It seems that we can make the error go away by another means: setting `controlled.shutdown.enable` to `false`, while leaving `_schemas` replication factor the default (3).

What are the implications of not using controlled shutdown? From what I can see, if we're not restarting the brokers in the same test, then there's no difference. There may be an effect in crash tests where we purposefully cause brokers to crash/restart, though.

The way I see it, we have a choice between two "unrealistic" scenarios in testing:

1. Make _schemas replication factor 1 (instead of 3) 
2. Avoid using controlled shutdown

I suppose the question is, which choice is closer to "realistic"? I think it's (1); just want to make sure. Bottom line is that we want the tests to match production conditions as much as possible.

Tianxiang Xiong

Apr 24, 2017, 5:13:47 PM
to Confluent Platform
From Ryan:

If you wish to more closely mimic production then having replication.factor=3 is your best bet.

Graceful shutdown gives the broker two optimizations that smooth leadership transitions and shorten restart times:
  1. It syncs all its logs to disk, so no log recovery is needed when it restarts.
  2. It transfers leadership before shutting down, which speeds up the transition and thus reduces partition 'offline time'.
Since you are shutting down all the servers anyway, I'm not sure you require such a luxury.