[QUEUED scylla next] gossiper: Send generation number with shutdown message

1 view
Skip to first unread message

Commit Bot

<bot@cloudius-systems.com>
unread,
Sep 23, 2021, 6:50:41 PM (9/23/21)
to scylladb-dev@googlegroups.com, Asias He
From: Asias He <as...@scylladb.com>
Committer: Tomasz Grabiec <tgra...@scylladb.com>
Branch: next

gossiper: Send generation number with shutdown message

Consider:
- n1, n2 in the cluster
- n2 shutdown
- n2 sends gossip shutdown message to n1
- n1 delays running the shutdown message handler
- n2 restarts
- n1 learns the new gossip state of n2
- n1 resumes handling the stale shutdown message
- n1 incorrectly marks n2 as being in shutdown status until n2 restarts again

To prevent this, the gossip generation number is now sent along with the
shutdown message. If the received generation number does not match the
generation number n1 has recorded locally for the remote node, the
shutdown message is ignored.

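Since the generation number parameter is declared as an rpc::optional,
this works in a mixed-version cluster: a shutdown message from an older
node simply arrives without a generation number.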

Fixes #8597

Closes #9381

---
diff --git a/gms/gossiper.cc b/gms/gossiper.cc
--- a/gms/gossiper.cc
+++ b/gms/gossiper.cc
@@ -429,13 +429,30 @@ future<> gossiper::handle_echo_msg(gms::inet_address from, std::optional<int64_t
return make_ready_future<>();
}

-future<> gossiper::handle_shutdown_msg(inet_address from) {
+future<> gossiper::handle_shutdown_msg(inet_address from, std::optional<int64_t> generation_number_opt) {
if (!is_enabled()) {
logger.debug("Ignoring shutdown message from {} because gossip is disabled", from);
return make_ready_future<>();
}
- return seastar::async([this, from] {
+ return seastar::async([this, from, generation_number_opt] {
auto permit = this->lock_endpoint(from).get0();
+ if (generation_number_opt) {
+ auto es = this->get_endpoint_state_for_endpoint_ptr(from);
+ if (es) {
+ int local_generation = es->get_heart_beat_state().get_generation();
+ logger.info("Got shutdown message from {}, received_generation={}, local_generation={}",
+ from, generation_number_opt.value(), local_generation);
+ if (local_generation != generation_number_opt.value()) {
+ logger.warn("Ignoring shutdown message from {} because generation number does not match, received_generation={}, local_generation={}",
+ from, generation_number_opt.value(), local_generation);
+ return;
+ }
+ } else {
+ logger.warn("Ignoring shutdown message from {} because generation number does not match, received_generation={}, local_generation=not found",
+ from, generation_number_opt.value());
+ return;
+ }
+ }
this->mark_as_shutdown(from);
});
}
@@ -495,10 +512,10 @@ void gossiper::init_messaging_service_handler() {
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return handle_echo_msg(from, generation_number_opt);
});
- _messaging.register_gossip_shutdown([this] (inet_address from) {
+ _messaging.register_gossip_shutdown([this] (inet_address from, rpc::optional<int64_t> generation_number_opt) {
// In a new fiber.
- (void)container().invoke_on(0, [from] (gms::gossiper& gossiper) {
- return gossiper.handle_shutdown_msg(from);
+ (void)container().invoke_on(0, [from, generation_number_opt] (gms::gossiper& gossiper) {
+ return gossiper.handle_shutdown_msg(from, generation_number_opt);
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_SHUTDOWN: {}", ep);
});
@@ -2119,13 +2136,14 @@ future<> gossiper::do_stop_gossiping() {
logger.info("My status = {}", get_gossip_status(*my_ep_state));
}
if (my_ep_state && !is_silent_shutdown_state(*my_ep_state)) {
+ int local_generation = my_ep_state->get_heart_beat_state().get_generation();
logger.info("Announcing shutdown");
add_local_application_state(application_state::STATUS, versioned_value::shutdown(true)).get();
auto live_endpoints = _live_endpoints;
for (inet_address addr : live_endpoints) {
msg_addr id = get_msg_addr(addr);
- logger.trace("Sending a GossipShutdown to {}", id);
- _messaging.send_gossip_shutdown(id, get_broadcast_address()).then_wrapped([id] (auto&&f) {
+ logger.info("Sending a GossipShutdown to {} with generation {}", id.addr, local_generation);
+ _messaging.send_gossip_shutdown(id, get_broadcast_address(), local_generation).then_wrapped([id] (auto&&f) {
try {
f.get();
logger.trace("Got GossipShutdown Reply");
diff --git a/gms/gossiper.hh b/gms/gossiper.hh
--- a/gms/gossiper.hh
+++ b/gms/gossiper.hh
@@ -127,7 +127,7 @@ private:
future<> handle_ack_msg(msg_addr from, gossip_digest_ack ack_msg);
future<> handle_ack2_msg(msg_addr from, gossip_digest_ack2 msg);
future<> handle_echo_msg(inet_address from, std::optional<int64_t> generation_number_opt);
- future<> handle_shutdown_msg(inet_address from);
+ future<> handle_shutdown_msg(inet_address from, std::optional<int64_t> generation_number_opt);
future<> do_send_ack_msg(msg_addr from, gossip_digest_syn syn_msg);
future<> do_send_ack2_msg(msg_addr from, utils::chunked_vector<gossip_digest> ack_msg_digest);
future<gossip_get_endpoint_states_response> handle_get_endpoint_states_msg(gossip_get_endpoint_states_request request);
diff --git a/message/messaging_service.cc b/message/messaging_service.cc
--- a/message/messaging_service.cc
+++ b/message/messaging_service.cc
@@ -1099,14 +1099,14 @@ future<> messaging_service::send_gossip_echo(msg_addr id, int64_t generation_num
return send_message_timeout<void>(this, messaging_verb::GOSSIP_ECHO, std::move(id), timeout, generation_number);
}

-void messaging_service::register_gossip_shutdown(std::function<rpc::no_wait_type (inet_address from)>&& func) {
+void messaging_service::register_gossip_shutdown(std::function<rpc::no_wait_type (inet_address from, rpc::optional<int64_t> generation_number)>&& func) {
register_handler(this, messaging_verb::GOSSIP_SHUTDOWN, std::move(func));
}
future<> messaging_service::unregister_gossip_shutdown() {
return unregister_handler(netw::messaging_verb::GOSSIP_SHUTDOWN);
}
-future<> messaging_service::send_gossip_shutdown(msg_addr id, inet_address from) {
- return send_message_oneway(this, messaging_verb::GOSSIP_SHUTDOWN, std::move(id), std::move(from));
+future<> messaging_service::send_gossip_shutdown(msg_addr id, inet_address from, int64_t generation_number) {
+ return send_message_oneway(this, messaging_verb::GOSSIP_SHUTDOWN, std::move(id), std::move(from), generation_number);
}

// gossip syn
diff --git a/message/messaging_service.hh b/message/messaging_service.hh
--- a/message/messaging_service.hh
+++ b/message/messaging_service.hh
@@ -413,9 +413,9 @@ public:
future<> send_gossip_echo(msg_addr id, int64_t generation_number, std::chrono::milliseconds timeout);

// Wrapper for GOSSIP_SHUTDOWN
- void register_gossip_shutdown(std::function<rpc::no_wait_type (inet_address from)>&& func);
+ void register_gossip_shutdown(std::function<rpc::no_wait_type (inet_address from, rpc::optional<int64_t> generation_number)>&& func);
future<> unregister_gossip_shutdown();
- future<> send_gossip_shutdown(msg_addr id, inet_address from);
+ future<> send_gossip_shutdown(msg_addr id, inet_address from, int64_t generation_number);

// Wrapper for GOSSIP_DIGEST_SYN
void register_gossip_digest_syn(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, gms::gossip_digest_syn)>&& func);

Commit Bot

<bot@cloudius-systems.com>
unread,
Sep 27, 2021, 4:08:58 AM (9/27/21)
to scylladb-dev@googlegroups.com, Asias He
From: Asias He <as...@scylladb.com>
Committer: Avi Kivity <a...@scylladb.com>
diff --git a/test/manual/message.cc b/test/manual/message.cc
--- a/test/manual/message.cc
+++ b/test/manual/message.cc
@@ -108,7 +108,7 @@ class tester {
return messaging_service::no_wait();
});

- ms.register_gossip_shutdown([] (inet_address from) {
+ ms.register_gossip_shutdown([] (inet_address from, rpc::optional<int64_t> generation_number_opt) {
fmt::print("Server got shutdown msg = {}\n", from);
return messaging_service::no_wait();
});
@@ -142,7 +142,8 @@ class tester {
fmt::print("=== {} ===\n", __func__);
auto id = get_msg_addr();
inet_address from("127.0.0.1");
- return ms.send_gossip_shutdown(id, from).then([] () {
+ int64_t gen = 0x1;
+ return ms.send_gossip_shutdown(id, from, gen).then([] () {
fmt::print("Client sent gossip_shutdown got reply = void\n");
return make_ready_future<>();
});

Commit Bot

<bot@cloudius-systems.com>
unread,
Sep 27, 2021, 6:53:05 AM (9/27/21)
to scylladb-dev@googlegroups.com, Asias He
From: Asias He <as...@scylladb.com>
Committer: Avi Kivity <a...@scylladb.com>
Branch: master
Reply all
Reply to author
Forward
0 new messages