Raft topology state application does two things: it re-creates the token
metadata and updates the peers table if needed. The code for both tasks
is currently intermixed. This patch splits it into separate functions,
which will be needed by the next patch.
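
As a rough illustration of the intended split (a sketch only; the
signature is abridged and the bodies are summarized as comments, see
the diff below for the real code):

    // New helper: peers table / gossiper / IP bookkeeping only.
    future<> storage_service::raft_topology_update_ip(
            locator::host_id id, gms::inet_address ip, bool notify_join) {
        // Look the node up in the raft topology and, based on its state
        // (normal or bootstrapping), update system.peers and the
        // gossiper, dropping a stale IP mapping if the address changed.
    }

    // sync_raft_topology_nodes() keeps only the token metadata work and
    // delegates the IP/peers handling to the helper for each node, e.g.:
    //   co_await process_normal_node(id, host_id, ip, rs);
    //   if (ip) {
    //       sys_ks_futures.push_back(
    //           raft_topology_update_ip(host_id, *ip, !prev_normal.contains(id)));
    //   }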
---
service/storage_service.hh | 1 +
service/storage_service.cc | 238 +++++++++++++++++++++----------------
2 files changed, 137 insertions(+), 102 deletions(-)
diff --git a/service/storage_service.hh b/service/storage_service.hh
index a3c1f6ddd55..5b00efc955b 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -963,6 +963,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
std::vector<gms::inet_address> joined;
};
+ future<> raft_topology_update_ip(locator::host_id id, gms::inet_address ip, bool notify_joined);
// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
// gossiper) to align it with the other raft topology nodes.
// Optional target_node can be provided to restrict the synchronization to the specified node.
diff --git a/service/storage_service.cc b/service/storage_service.cc
index cf826ff3f96..b97ab91ebf7 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -410,20 +410,17 @@ static locator::node::state to_topology_node_state(node_state ns) {
on_internal_error(rtlogger, format("unhandled node state: {}", ns));
}
-// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
-// gossiper) to align it with the other raft topology nodes.
-future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::optional<locator::host_id> target_node, std::unordered_set<raft::server_id> prev_normal) {
- nodes_to_notify_after_sync nodes_to_notify;
+future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet_address ip, bool notify_join) {
+ const auto& t = _topology_state_machine._topology;
+ raft::server_id raft_id{id.uuid()};
- rtlogger.trace("Start sync_raft_topology_nodes target_node={}", target_node);
+ std::vector<future<>> sys_ks_futures;
- const auto& am =_address_map;
- const auto& t = _topology_state_machine._topology;
+ auto node = t.find(raft_id);
- auto update_topology = [&] (locator::host_id id, const replica_state& rs) {
- tmptr->update_topology(id, locator::endpoint_dc_rack{rs.datacenter, rs.rack},
-                to_topology_node_state(rs.state), rs.shard_count);
- };
+ if (!node) {
+ co_return;
+ }
using host_id_to_ip_map_t = std::unordered_map<locator::host_id, gms::inet_address>;
auto get_host_id_to_ip_map = [&, map = std::optional<host_id_to_ip_map_t>{}]() mutable -> future<const host_id_to_ip_map_t*> {
@@ -442,50 +439,13 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
co_return &*map;
};
- std::vector<future<>> sys_ks_futures;
-
- auto remove_ip = [&](inet_address ip, locator::host_id host_id, bool notify) -> future<> {
- sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(ip));
+ const auto& rs = node->second;
- if (const auto ep = _gossiper.get_endpoint_state_ptr(ip); ep && ep->get_host_id() == host_id) {
- co_await _gossiper.force_remove_endpoint(ip, gms::null_permit_id);
- if (notify) {
- nodes_to_notify.left.push_back({ip, host_id});
+ switch (rs.state) {
+ case node_state::normal: {
+ if (is_me(ip)) {
+ co_return;
}
- }
- };
-
- auto process_left_node = [&] (raft::server_id id) -> future<> {
- locator::host_id host_id{id.uuid()};
-
- if (const auto ip = am.find(host_id)) {
- co_await remove_ip(*ip, host_id, true);
- }
-
- if (t.left_nodes_rs.find(id) != t.left_nodes_rs.end()) {
-            update_topology(host_id, t.left_nodes_rs.at(id));
- }
-
- // However if we do that, we need to also implement unbanning a node and do it if `removenode` is aborted.
- co_await _messaging.local().ban_host(host_id);
- };
-
- auto process_normal_node = [&] (raft::server_id id, const replica_state& rs) -> future<> {
- locator::host_id host_id{id.uuid()};
- auto ip = am.find(host_id);
-
- rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={} shards={}",
- id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens, rs.shard_count, rs.cleanup);
- // Save tokens, not needed for raft topology management, but needed by legacy
- // Also ip -> id mapping is needed for address map recreation on reboot
- if (is_me(host_id)) {
- sys_ks_futures.push_back(_sys_ks.local().update_tokens(rs.ring.value().tokens));
- co_await _gossiper.add_local_application_state(
- std::pair(gms::application_state::TOKENS, gms::versioned_value::tokens(rs.ring.value().tokens)),
- std::pair(gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(_topology_state_machine._topology.committed_cdc_generations.back())),
- std::pair(gms::application_state::STATUS, gms::versioned_value::normal(rs.ring.value().tokens))
- );
- } else if (ip && !is_me(*ip)) {
// In replace-with-same-ip scenario the replaced node IP will be the same
// as ours, we shouldn't put it into system.peers.
@@ -498,7 +458,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
// Populate the table with the state from the gossiper here since storage_service::on_change()
// (which is called each time gossiper state changes) may have skipped it because the tokens
// for the node were not in the 'normal' state yet
- auto info = get_peer_info_for_update(*ip);
+ auto info = get_peer_info_for_update(ip);
if (info) {
// And then amend with the info from raft
info->tokens = rs.ring.value().tokens;
@@ -506,29 +466,97 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
info->rack = rs.rack;
info->release_version = rs.release_version;
info->supported_features = fmt::to_string(fmt::join(rs.supported_features, ","));
- sys_ks_futures.push_back(_sys_ks.local().update_peer_info(*ip, host_id, *info));
+ sys_ks_futures.push_back(_sys_ks.local().update_peer_info(ip, id, *info));
}
- if (!prev_normal.contains(id)) {
- nodes_to_notify.joined.push_back(*ip);
+
+ if (notify_join) {
+ co_await notify_joined(ip);
}
- if (const auto it = host_id_to_ip_map.find(host_id); it != host_id_to_ip_map.end() && it->second != *ip) {
+ if (const auto it = host_id_to_ip_map.find(id); it != host_id_to_ip_map.end() && it->second != ip) {
utils::get_local_injector().inject("crash-before-prev-ip-removed", [] {
slogger.info("crash-before-prev-ip-removed hit, killing the node");
_exit(1);
});
- // IP change is not expected to emit REMOVED_NODE notifications
- co_await remove_ip(it->second, host_id, false);
+
+ auto old_ip = it->second;
+ sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(old_ip));
+
+ if (const auto ep = _gossiper.get_endpoint_state_ptr(old_ip); ep && ep->get_host_id() == id) {
+ co_await _gossiper.force_remove_endpoint(old_ip, gms::null_permit_id);
+ }
}
}
+ break;
+ case node_state::bootstrapping:
+ if (!is_me(ip)) {
+ utils::get_local_injector().inject("crash-before-bootstrapping-node-added", [] {
+ rtlogger.error("crash-before-bootstrapping-node-added hit, killing the node");
+ _exit(1);
+ });
+
+ // Save ip -> id mapping in peers table because we need it on restart, but do not save tokens until owned
+ sys_ks_futures.push_back(_sys_ks.local().update_peer_info(ip, id, {}));
+ }
+ break;
+ default:
+ break;
+ }
+ co_await when_all_succeed(sys_ks_futures.begin(), sys_ks_futures.end()).discard_result();
+}
+
+// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
+// gossiper) to align it with the other raft topology nodes.
+future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::optional<locator::host_id> target_node, std::unordered_set<raft::server_id> prev_normal) {
+ nodes_to_notify_after_sync nodes_to_notify;
+
+ rtlogger.trace("Start sync_raft_topology_nodes");
+
+ const auto& t = _topology_state_machine._topology;
+
+ auto update_topology = [&] (locator::host_id id, const replica_state& rs) {
+ tmptr->update_topology(id, locator::endpoint_dc_rack{rs.datacenter, rs.rack},
+ to_topology_node_state(rs.state), rs.shard_count);
+ };
+
+ std::vector<future<>> sys_ks_futures;
+
+ auto process_left_node = [&] (raft::server_id id, locator::host_id host_id, std::optional<gms::inet_address> ip) -> future<> {
+ if (ip) {
+ sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(*ip));
+
+ if (const auto ep = _gossiper.get_endpoint_state_ptr(*ip); ep && ep->get_host_id() == host_id) {
+ co_await _gossiper.force_remove_endpoint(*ip, gms::null_permit_id);
+ nodes_to_notify.left.push_back({*ip, host_id});
+ }
+ }
+
+ if (t.left_nodes_rs.find(id) != t.left_nodes_rs.end()) {
+            update_topology(host_id, t.left_nodes_rs.at(id));
+ }
+
+ // However if we do that, we need to also implement unbanning a node and do it if `removenode` is aborted.
+ co_await _messaging.local().ban_host(host_id);
+ };
+
+ auto process_normal_node = [&] (raft::server_id id, locator::host_id host_id, std::optional<gms::inet_address> ip, const replica_state& rs) -> future<> {
+ rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={} shards={}",
+ id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens, rs.shard_count, rs.cleanup);
+ // Save tokens, not needed for raft topology management, but needed by legacy
+ // Also ip -> id mapping is needed for address map recreation on reboot
+ if (is_me(host_id)) {
+ sys_ks_futures.push_back(_sys_ks.local().update_tokens(rs.ring.value().tokens));
+ co_await _gossiper.add_local_application_state(
+ std::pair(gms::application_state::TOKENS, gms::versioned_value::tokens(rs.ring.value().tokens)),
+ std::pair(gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(_topology_state_machine._topology.committed_cdc_generations.back())),
+ std::pair(gms::application_state::STATUS, gms::versioned_value::normal(rs.ring.value().tokens))
+ );
+ }
update_topology(host_id, rs);
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
};
- auto process_transition_node = [&](raft::server_id id, const replica_state& rs) -> future<> {
- locator::host_id host_id{id.uuid()};
- auto ip = am.find(host_id);
-
+ auto process_transition_node = [&](raft::server_id id, locator::host_id host_id, std::optional<gms::inet_address> ip, const replica_state& rs) -> future<> {
rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={}",
id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate,
seastar::value_of([&] () -> sstring {
@@ -538,29 +566,16 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
switch (rs.state) {
case node_state::bootstrapping:
if (rs.ring.has_value()) {
- if (ip) {
- if (!is_me(*ip)) {
- utils::get_local_injector().inject("crash-before-bootstrapping-node-added", [] {
- rtlogger.error("crash-before-bootstrapping-node-added hit, killing the node");
- _exit(1);
- });
-
- // Save ip -> id mapping in peers table because we need it on restart, but do not save tokens until owned
- sys_ks_futures.push_back(_sys_ks.local().update_peer_info(*ip, host_id, {}));
- }
- update_topology(host_id, rs);
- if (_topology_state_machine._topology.normal_nodes.empty()) {
- // This is the first node in the cluster. Insert the tokens as normal to the token ring early
- // so we can perform writes to regular 'distributed' tables during the bootstrap procedure
- // (such as the CDC generation write).
- // It doesn't break anything to set the tokens to normal early in this single-node case.
- co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
- } else {
- tmptr->add_bootstrap_tokens(rs.ring.value().tokens, host_id);
- co_await update_topology_change_info(tmptr, ::format("bootstrapping node {}/{}", id, ip));
- }
- } else if (_topology_state_machine._topology.tstate == topology::transition_state::write_both_read_new) {
- on_internal_error(rtlogger, format("Bootstrapping node {} does not have IP mapping but the topology is in the write_both_read_new state", id));
+ update_topology(host_id, rs);
+ if (_topology_state_machine._topology.normal_nodes.empty()) {
+ // This is the first node in the cluster. Insert the tokens as normal to the token ring early
+ // so we can perform writes to regular 'distributed' tables during the bootstrap procedure
+ // (such as the CDC generation write).
+ // It doesn't break anything to set the tokens to normal early in this single-node case.
+ co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
+ } else {
+ tmptr->add_bootstrap_tokens(rs.ring.value().tokens, host_id);
+ co_await update_topology_change_info(tmptr, ::format("bootstrapping node {}/{}", id, ip));
}
}
break;
@@ -573,7 +588,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
case node_state::removing:
if (_topology_state_machine._topology.tstate == topology::transition_state::rollback_to_normal) {
// no need for double writes anymore since op failed
- co_await process_normal_node(id, rs);
+ co_await process_normal_node(id, host_id, ip, rs);
break;
}
update_topology(host_id, rs);
@@ -584,7 +599,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
case node_state::replacing: {
SCYLLA_ASSERT(_topology_state_machine._topology.req_param.contains(id));
auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[id]).replaced_id;
- auto existing_ip = am.find(locator::host_id{replaced_id.uuid()});
+ auto existing_ip = _address_map.find(locator::host_id{replaced_id.uuid()});
const auto replaced_host_id = locator::host_id(replaced_id.uuid());
tmptr->update_topology(replaced_host_id, std::nullopt, locator::node::state::being_replaced);
tmptr->add_replacing_endpoint(replaced_host_id, host_id);
@@ -596,41 +611,60 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
break;
case node_state::rebuilding:
// Rebuilding node is normal
- co_await process_normal_node(id, rs);
+ co_await process_normal_node(id, host_id, ip, rs);
break;
default:
on_fatal_internal_error(rtlogger, ::format("Unexpected state {} for node {}", rs.state, id));
}
};
+ sys_ks_futures.reserve(t.left_nodes.size() + t.normal_nodes.size() + t.transition_nodes.size());
+
if (target_node) {
raft::server_id raft_id{target_node->uuid()};
+ auto ip = _address_map.get(*target_node);
if (t.left_nodes.contains(raft_id)) {
- co_await process_left_node(raft_id);
+ co_await process_left_node(raft_id, *target_node, ip);
} else if (auto it = t.normal_nodes.find(raft_id); it != t.normal_nodes.end()) {
- co_await process_normal_node(raft_id, it->second);
+ co_await process_normal_node(raft_id, *target_node, ip, it->second);
} else if ((it = t.transition_nodes.find(raft_id)) != t.transition_nodes.end()) {
- co_await process_transition_node(raft_id, it->second);
+ co_await process_transition_node(raft_id, *target_node, ip, it->second);
}
+ sys_ks_futures.push_back(raft_topology_update_ip(*target_node, ip, true));
} else {
- sys_ks_futures.reserve(t.left_nodes.size() + t.normal_nodes.size() + t.transition_nodes.size());
for (const auto& id: t.left_nodes) {
- co_await process_left_node(id);
+ locator::host_id host_id{id.uuid()};
+ auto ip = _address_map.find(host_id);
+ co_await process_left_node(id, host_id, ip);
+ if (ip) {
+ sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, false));
+ }
}
for (const auto& [id, rs]: t.normal_nodes) {
- co_await process_normal_node(id, rs);
+ locator::host_id host_id{id.uuid()};
+ auto ip = _address_map.find(host_id);
+ co_await process_normal_node(id, host_id, ip, rs);
+ if (ip) {
+ sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, !prev_normal.contains(id)));
+ }
}
for (const auto& [id, rs]: t.transition_nodes) {
- co_await process_transition_node(id, rs);
- }
- for (auto id : t.get_excluded_nodes()) {
- locator::node* n = tmptr->get_topology().find_node(locator::host_id(id.uuid()));
- if (n) {
- n->set_excluded(true);
+ locator::host_id host_id{id.uuid()};
+ auto ip = _address_map.find(host_id);
+ co_await process_transition_node(id, host_id, ip, rs);
+ if (ip) {
+ sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, false));
}
}
}
+ for (auto id : t.get_excluded_nodes()) {
+ locator::node* n = tmptr->get_topology().find_node(locator::host_id(id.uuid()));
+ if (n) {
+ n->set_excluded(true);
+ }
+ }
+
co_await when_all_succeed(sys_ks_futures.begin(), sys_ks_futures.end()).discard_result();
rtlogger.trace("End sync_raft_topology_nodes");
--
2.47.1