[PATCH v1 0/2] Do not update topology on address change

13 views
Skip to first unread message

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 20, 2025, 3:33:08 AMJan 20
to scylladb-dev@googlegroups.com
Since now topology does not contain ip addresses there is no need to
create topology on an ip address change. Only peers table has to be
updated. The series factors out peers table update code from
sync_raft_topology_nodes() and calls it on topology and ip address
updates. As a side effect it fixes #22293 since now topology loading
does not require IP to be present, so the assert that is triggered in
this bug is removed.

Fixes: scylladb/scylladb#22293

CI: https://jenkins.scylladb.com/job/scylla-master/job/scylla-ci/14568/

Also in scylla-dev gleb/not-update-tm-on-addr-change

Gleb Natapov (2):
topology coordinator: split out the peer table update functionality
from raft state application
topology coordinator: do not update topology on address change

service/storage_service.hh | 3 +-
service/storage_service.cc | 256 ++++++++++++++++++++-----------------
2 files changed, 138 insertions(+), 121 deletions(-)

--
2.47.1

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 20, 2025, 3:33:08 AMJan 20
to scylladb-dev@googlegroups.com
Since now topology does not contain ip addresses there is no need to
create topology on an ip address change. Only peers table has to be
updated, so call a function that does peers table update only.
---
service/storage_service.hh | 2 +-
service/storage_service.cc | 68 ++++++++++++++------------------------
2 files changed, 26 insertions(+), 44 deletions(-)

diff --git a/service/storage_service.hh b/service/storage_service.hh
index 5b00efc955b..8ecf09f4a62 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -968,7 +968,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
// gossiper) to align it with the other raft topology nodes.
// Optional target_node can be provided to restrict the synchronization to the specified node.
// Returns a structure that describes which notifications to trigger after token metadata is updated.
- future<nodes_to_notify_after_sync> sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::optional<locator::host_id> target_node, std::unordered_set<raft::server_id> prev_normal);
+ future<nodes_to_notify_after_sync> sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::unordered_set<raft::server_id> prev_normal);
// Triggers notifications (on_joined, on_left) based on the recent changes to token metadata, as described by the passed in structure.
// This function should be called on the result of `sync_raft_topology_nodes`, after the global token metadata is updated.
future<> notify_nodes_after_sync(nodes_to_notify_after_sync&& nodes_to_notify);
diff --git a/service/storage_service.cc b/service/storage_service.cc
index b97ab91ebf7..359aa362902 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -507,7 +507,7 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet

// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
// gossiper) to align it with the other raft topology nodes.
-future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::optional<locator::host_id> target_node, std::unordered_set<raft::server_id> prev_normal) {
+future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::unordered_set<raft::server_id> prev_normal) {
nodes_to_notify_after_sync nodes_to_notify;

rtlogger.trace("Start sync_raft_topology_nodes");
@@ -620,44 +620,30 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t

sys_ks_futures.reserve(t.left_nodes.size() + t.normal_nodes.size() + t.transition_nodes.size());

- if (target_node) {
- raft::server_id raft_id{target_node->uuid()};
- auto ip = _address_map.get(*target_node);
- if (t.left_nodes.contains(raft_id)) {
- co_await process_left_node(raft_id, *target_node, ip);
- } else if (auto it = t.normal_nodes.find(raft_id); it != t.normal_nodes.end()) {
- co_await process_normal_node(raft_id, *target_node, ip, it->second);
- } else if ((it = t.transition_nodes.find(raft_id)) != t.transition_nodes.end()) {
- co_await process_transition_node(raft_id, *target_node, ip, it->second);
- }
- sys_ks_futures.push_back(raft_topology_update_ip(*target_node, ip, true));
- } else {
- for (const auto& id: t.left_nodes) {
- locator::host_id host_id{id.uuid()};
- auto ip = _address_map.find(host_id);
- co_await process_left_node(id, host_id, ip);
- if (ip) {
- sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, false));
- }
+ for (const auto& id: t.left_nodes) {
+ locator::host_id host_id{id.uuid()};
+ auto ip = _address_map.find(host_id);
+ co_await process_left_node(id, host_id, ip);
+ if (ip) {
+ sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, false));
}
- for (const auto& [id, rs]: t.normal_nodes) {
- locator::host_id host_id{id.uuid()};
- auto ip = _address_map.find(host_id);
- co_await process_normal_node(id, host_id, ip, rs);
- if (ip) {
- sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, !prev_normal.contains(id)));
- }
+ }
+ for (const auto& [id, rs]: t.normal_nodes) {
+ locator::host_id host_id{id.uuid()};
+ auto ip = _address_map.find(host_id);
+ co_await process_normal_node(id, host_id, ip, rs);
+ if (ip) {
+ sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, !prev_normal.contains(id)));
}
- for (const auto& [id, rs]: t.transition_nodes) {
- locator::host_id host_id{id.uuid()};
- auto ip = _address_map.find(host_id);
- co_await process_transition_node(id, host_id, ip, rs);
- if (ip) {
- sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, false));
- }
+ }
+ for (const auto& [id, rs]: t.transition_nodes) {
+ locator::host_id host_id{id.uuid()};
+ auto ip = _address_map.find(host_id);
+ co_await process_transition_node(id, host_id, ip, rs);
+ if (ip) {
+ sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, false));
}
}
-
for (auto id : t.get_excluded_nodes()) {
locator::node* n = tmptr->get_topology().find_node(locator::host_id(id.uuid()));
if (n) {
@@ -785,7 +771,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
}, _topology_state_machine._topology.tstate);
tmptr->set_read_new(read_new);

- auto nodes_to_notify = co_await sync_raft_topology_nodes(tmptr, std::nullopt, std::move(prev_normal));
+ auto nodes_to_notify = co_await sync_raft_topology_nodes(tmptr, std::move(prev_normal));

std::optional<locator::tablet_metadata> tablets;
if (hint.tablets_hint) {
@@ -974,15 +960,11 @@ class storage_service::ip_address_updater: public gms::i_endpoint_state_change_s
// If we call sync_raft_topology_nodes here directly, a gossiper lock and
// the _group0.read_apply_mutex could be taken in cross-order leading to a deadlock.
// To avoid this, we don't wait for sync_raft_topology_nodes to finish.
- (void)futurize_invoke(ensure_alive([this, id, h = _ss._async_gate.hold()]() -> future<> {
+ (void)futurize_invoke(ensure_alive([this, id, endpoint, h = _ss._async_gate.hold()]() -> future<> {
auto guard = co_await _ss._group0->client().hold_read_apply_mutex(_ss._abort_source);
co_await utils::get_local_injector().inject("ip-change-raft-sync-delay", std::chrono::milliseconds(500));
- storage_service::nodes_to_notify_after_sync nodes_to_notify;
- auto lock = co_await _ss.get_token_metadata_lock();
- co_await _ss.mutate_token_metadata([this, id, &nodes_to_notify](mutable_token_metadata_ptr t) -> future<> {
- nodes_to_notify = co_await _ss.sync_raft_topology_nodes(std::move(t), id, {});
- }, storage_service::acquire_merge_lock::no);
- co_await _ss.notify_nodes_after_sync(std::move(nodes_to_notify));
+ // Set notify_join to true since here we detected address change and drivers have to be notified
+ co_await _ss.raft_topology_update_ip(id, endpoint, true);
}));
}
}
--
2.47.1

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 20, 2025, 3:33:10 AMJan 20
to scylladb-dev@googlegroups.com
Raft topology state application does two things: re-creates token metadata
and updates peers table if needed. The code for both tasks is intermixed
now. The patch separates it into separate functions. Will be needed in
the next patch.
---
service/storage_service.hh | 1 +
service/storage_service.cc | 238 +++++++++++++++++++++----------------
2 files changed, 137 insertions(+), 102 deletions(-)

diff --git a/service/storage_service.hh b/service/storage_service.hh
index a3c1f6ddd55..5b00efc955b 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -963,6 +963,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
std::vector<gms::inet_address> joined;
};

+ future<> raft_topology_update_ip(locator::host_id id, gms::inet_address ip, bool notify_joined);
// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
// gossiper) to align it with the other raft topology nodes.
// Optional target_node can be provided to restrict the synchronization to the specified node.
diff --git a/service/storage_service.cc b/service/storage_service.cc
index cf826ff3f96..b97ab91ebf7 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -410,20 +410,17 @@ static locator::node::state to_topology_node_state(node_state ns) {
on_internal_error(rtlogger, format("unhandled node state: {}", ns));
}

-// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
-// gossiper) to align it with the other raft topology nodes.
-future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::optional<locator::host_id> target_node, std::unordered_set<raft::server_id> prev_normal) {
- nodes_to_notify_after_sync nodes_to_notify;
+future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet_address ip, bool notify_join) {
+ const auto& t = _topology_state_machine._topology;
+ raft::server_id raft_id{id.uuid()};

- rtlogger.trace("Start sync_raft_topology_nodes target_node={}", target_node);
+ std::vector<future<>> sys_ks_futures;

- const auto& am =_address_map;
- const auto& t = _topology_state_machine._topology;
+ auto node = t.find(raft_id);

- auto update_topology = [&] (locator::host_id id, const replica_state& rs) {
- tmptr->update_topology(id, locator::endpoint_dc_rack{rs.datacenter, rs.rack},
- to_topology_node_state(rs.state), rs.shard_count);
- };
+ if (!node) {
+ co_return;
+ }

using host_id_to_ip_map_t = std::unordered_map<locator::host_id, gms::inet_address>;
auto get_host_id_to_ip_map = [&, map = std::optional<host_id_to_ip_map_t>{}]() mutable -> future<const host_id_to_ip_map_t*> {
@@ -442,50 +439,13 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
co_return &*map;
};

- std::vector<future<>> sys_ks_futures;
-
- auto remove_ip = [&](inet_address ip, locator::host_id host_id, bool notify) -> future<> {
- sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(ip));
+ const auto& rs = node->second;

- if (const auto ep = _gossiper.get_endpoint_state_ptr(ip); ep && ep->get_host_id() == host_id) {
- co_await _gossiper.force_remove_endpoint(ip, gms::null_permit_id);
- if (notify) {
- nodes_to_notify.left.push_back({ip, host_id});
+ switch (rs.state) {
+ case node_state::normal: {
+ if (is_me(ip)) {
+ co_return;
}
- }
- };
-
- auto process_left_node = [&] (raft::server_id id) -> future<> {
- locator::host_id host_id{id.uuid()};
-
- if (const auto ip = am.find(host_id)) {
- co_await remove_ip(*ip, host_id, true);
- }
-
- if (t.left_nodes_rs.find(id) != t.left_nodes_rs.end()) {
- update_topology(host_id, t.left_nodes_rs.at(id));
- }
-
- // However if we do that, we need to also implement unbanning a node and do it if `removenode` is aborted.
- co_await _messaging.local().ban_host(host_id);
- };
-
- auto process_normal_node = [&] (raft::server_id id, const replica_state& rs) -> future<> {
- locator::host_id host_id{id.uuid()};
- auto ip = am.find(host_id);
-
- rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={} shards={}",
- id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens, rs.shard_count, rs.cleanup);
- // Save tokens, not needed for raft topology management, but needed by legacy
- // Also ip -> id mapping is needed for address map recreation on reboot
- if (is_me(host_id)) {
- sys_ks_futures.push_back(_sys_ks.local().update_tokens(rs.ring.value().tokens));
- co_await _gossiper.add_local_application_state(
- std::pair(gms::application_state::TOKENS, gms::versioned_value::tokens(rs.ring.value().tokens)),
- std::pair(gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(_topology_state_machine._topology.committed_cdc_generations.back())),
- std::pair(gms::application_state::STATUS, gms::versioned_value::normal(rs.ring.value().tokens))
- );
- } else if (ip && !is_me(*ip)) {
// In replace-with-same-ip scenario the replaced node IP will be the same
// as ours, we shouldn't put it into system.peers.

@@ -498,7 +458,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
// Populate the table with the state from the gossiper here since storage_service::on_change()
// (which is called each time gossiper state changes) may have skipped it because the tokens
// for the node were not in the 'normal' state yet
- auto info = get_peer_info_for_update(*ip);
+ auto info = get_peer_info_for_update(ip);
if (info) {
// And then amend with the info from raft
info->tokens = rs.ring.value().tokens;
@@ -506,29 +466,97 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
info->rack = rs.rack;
info->release_version = rs.release_version;
info->supported_features = fmt::to_string(fmt::join(rs.supported_features, ","));
- sys_ks_futures.push_back(_sys_ks.local().update_peer_info(*ip, host_id, *info));
+ sys_ks_futures.push_back(_sys_ks.local().update_peer_info(ip, id, *info));
}
- if (!prev_normal.contains(id)) {
- nodes_to_notify.joined.push_back(*ip);
+
+ if (notify_join) {
+ co_await notify_joined(ip);
}

- if (const auto it = host_id_to_ip_map.find(host_id); it != host_id_to_ip_map.end() && it->second != *ip) {
+ if (const auto it = host_id_to_ip_map.find(id); it != host_id_to_ip_map.end() && it->second != ip) {
utils::get_local_injector().inject("crash-before-prev-ip-removed", [] {
slogger.info("crash-before-prev-ip-removed hit, killing the node");
_exit(1);
});
- // IP change is not expected to emit REMOVED_NODE notifications
- co_await remove_ip(it->second, host_id, false);
+
+ auto old_ip = it->second;
+ sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(old_ip));
+
+ if (const auto ep = _gossiper.get_endpoint_state_ptr(old_ip); ep && ep->get_host_id() == id) {
+ co_await _gossiper.force_remove_endpoint(old_ip, gms::null_permit_id);
+ }
}
}
+ break;
+ case node_state::bootstrapping:
+ if (!is_me(ip)) {
+ utils::get_local_injector().inject("crash-before-bootstrapping-node-added", [] {
+ rtlogger.error("crash-before-bootstrapping-node-added hit, killing the node");
+ _exit(1);
+ });
+
+ // Save ip -> id mapping in peers table because we need it on restart, but do not save tokens until owned
+ sys_ks_futures.push_back(_sys_ks.local().update_peer_info(ip, id, {}));
+ }
+ break;
+ default:
+ break;
+ }
+ co_await when_all_succeed(sys_ks_futures.begin(), sys_ks_futures.end()).discard_result();
+}
+
+// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
+// gossiper) to align it with the other raft topology nodes.
+future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::optional<locator::host_id> target_node, std::unordered_set<raft::server_id> prev_normal) {
+ nodes_to_notify_after_sync nodes_to_notify;
+
+ rtlogger.trace("Start sync_raft_topology_nodes");
+
+ const auto& t = _topology_state_machine._topology;
+
+ auto update_topology = [&] (locator::host_id id, const replica_state& rs) {
+ tmptr->update_topology(id, locator::endpoint_dc_rack{rs.datacenter, rs.rack},
+ to_topology_node_state(rs.state), rs.shard_count);
+ };
+
+ std::vector<future<>> sys_ks_futures;
+
+ auto process_left_node = [&] (raft::server_id id, locator::host_id host_id, std::optional<gms::inet_address> ip) -> future<> {
+ if (ip) {
+ sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(*ip));
+
+ if (const auto ep = _gossiper.get_endpoint_state_ptr(*ip); ep && ep->get_host_id() == host_id) {
+ co_await _gossiper.force_remove_endpoint(*ip, gms::null_permit_id);
+ nodes_to_notify.left.push_back({*ip, host_id});
+ }
+ }
+
+ if (t.left_nodes_rs.find(id) != t.left_nodes_rs.end()) {
+ update_topology(host_id, t.left_nodes_rs.at(id));
+ }
+
+ // However if we do that, we need to also implement unbanning a node and do it if `removenode` is aborted.
+ co_await _messaging.local().ban_host(host_id);
+ };
+
+ auto process_normal_node = [&] (raft::server_id id, locator::host_id host_id, std::optional<gms::inet_address> ip, const replica_state& rs) -> future<> {
+ rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={} shards={}",
+ id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens, rs.shard_count, rs.cleanup);
+ // Save tokens, not needed for raft topology management, but needed by legacy
+ // Also ip -> id mapping is needed for address map recreation on reboot
+ if (is_me(host_id)) {
+ sys_ks_futures.push_back(_sys_ks.local().update_tokens(rs.ring.value().tokens));
+ co_await _gossiper.add_local_application_state(
+ std::pair(gms::application_state::TOKENS, gms::versioned_value::tokens(rs.ring.value().tokens)),
+ std::pair(gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(_topology_state_machine._topology.committed_cdc_generations.back())),
+ std::pair(gms::application_state::STATUS, gms::versioned_value::normal(rs.ring.value().tokens))
+ );
+ }
update_topology(host_id, rs);
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
};

- auto process_transition_node = [&](raft::server_id id, const replica_state& rs) -> future<> {
- locator::host_id host_id{id.uuid()};
- auto ip = am.find(host_id);
-
+ auto process_transition_node = [&](raft::server_id id, locator::host_id host_id, std::optional<gms::inet_address> ip, const replica_state& rs) -> future<> {
rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={}",
id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate,
seastar::value_of([&] () -> sstring {
@@ -538,29 +566,16 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
switch (rs.state) {
case node_state::bootstrapping:
if (rs.ring.has_value()) {
- if (ip) {
- if (!is_me(*ip)) {
- utils::get_local_injector().inject("crash-before-bootstrapping-node-added", [] {
- rtlogger.error("crash-before-bootstrapping-node-added hit, killing the node");
- _exit(1);
- });
-
- // Save ip -> id mapping in peers table because we need it on restart, but do not save tokens until owned
- sys_ks_futures.push_back(_sys_ks.local().update_peer_info(*ip, host_id, {}));
- }
- update_topology(host_id, rs);
- if (_topology_state_machine._topology.normal_nodes.empty()) {
- // This is the first node in the cluster. Insert the tokens as normal to the token ring early
- // so we can perform writes to regular 'distributed' tables during the bootstrap procedure
- // (such as the CDC generation write).
- // It doesn't break anything to set the tokens to normal early in this single-node case.
- co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
- } else {
- tmptr->add_bootstrap_tokens(rs.ring.value().tokens, host_id);
- co_await update_topology_change_info(tmptr, ::format("bootstrapping node {}/{}", id, ip));
- }
- } else if (_topology_state_machine._topology.tstate == topology::transition_state::write_both_read_new) {
- on_internal_error(rtlogger, format("Bootstrapping node {} does not have IP mapping but the topology is in the write_both_read_new state", id));
+ update_topology(host_id, rs);
+ if (_topology_state_machine._topology.normal_nodes.empty()) {
+ // This is the first node in the cluster. Insert the tokens as normal to the token ring early
+ // so we can perform writes to regular 'distributed' tables during the bootstrap procedure
+ // (such as the CDC generation write).
+ // It doesn't break anything to set the tokens to normal early in this single-node case.
+ co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
+ } else {
+ tmptr->add_bootstrap_tokens(rs.ring.value().tokens, host_id);
+ co_await update_topology_change_info(tmptr, ::format("bootstrapping node {}/{}", id, ip));
}
}
break;
@@ -573,7 +588,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
case node_state::removing:
if (_topology_state_machine._topology.tstate == topology::transition_state::rollback_to_normal) {
// no need for double writes anymore since op failed
- co_await process_normal_node(id, rs);
+ co_await process_normal_node(id, host_id, ip, rs);
break;
}
update_topology(host_id, rs);
@@ -584,7 +599,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
case node_state::replacing: {
SCYLLA_ASSERT(_topology_state_machine._topology.req_param.contains(id));
auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[id]).replaced_id;
- auto existing_ip = am.find(locator::host_id{replaced_id.uuid()});
+ auto existing_ip = _address_map.find(locator::host_id{replaced_id.uuid()});
const auto replaced_host_id = locator::host_id(replaced_id.uuid());
tmptr->update_topology(replaced_host_id, std::nullopt, locator::node::state::being_replaced);
tmptr->add_replacing_endpoint(replaced_host_id, host_id);
@@ -596,41 +611,60 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
break;
case node_state::rebuilding:
// Rebuilding node is normal
- co_await process_normal_node(id, rs);
+ co_await process_normal_node(id, host_id, ip, rs);
break;
default:
on_fatal_internal_error(rtlogger, ::format("Unexpected state {} for node {}", rs.state, id));
}
};

+ sys_ks_futures.reserve(t.left_nodes.size() + t.normal_nodes.size() + t.transition_nodes.size());
+
if (target_node) {
raft::server_id raft_id{target_node->uuid()};
+ auto ip = _address_map.get(*target_node);
if (t.left_nodes.contains(raft_id)) {
- co_await process_left_node(raft_id);
+ co_await process_left_node(raft_id, *target_node, ip);
} else if (auto it = t.normal_nodes.find(raft_id); it != t.normal_nodes.end()) {
- co_await process_normal_node(raft_id, it->second);
+ co_await process_normal_node(raft_id, *target_node, ip, it->second);
} else if ((it = t.transition_nodes.find(raft_id)) != t.transition_nodes.end()) {
- co_await process_transition_node(raft_id, it->second);
+ co_await process_transition_node(raft_id, *target_node, ip, it->second);
}
+ sys_ks_futures.push_back(raft_topology_update_ip(*target_node, ip, true));
} else {
- sys_ks_futures.reserve(t.left_nodes.size() + t.normal_nodes.size() + t.transition_nodes.size());
for (const auto& id: t.left_nodes) {
- co_await process_left_node(id);
+ locator::host_id host_id{id.uuid()};
+ auto ip = _address_map.find(host_id);
+ co_await process_left_node(id, host_id, ip);
+ if (ip) {
+ sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, false));
+ }
}
for (const auto& [id, rs]: t.normal_nodes) {
- co_await process_normal_node(id, rs);
+ locator::host_id host_id{id.uuid()};
+ auto ip = _address_map.find(host_id);
+ co_await process_normal_node(id, host_id, ip, rs);
+ if (ip) {
+ sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, !prev_normal.contains(id)));
+ }
}
for (const auto& [id, rs]: t.transition_nodes) {
- co_await process_transition_node(id, rs);
- }
- for (auto id : t.get_excluded_nodes()) {
- locator::node* n = tmptr->get_topology().find_node(locator::host_id(id.uuid()));
- if (n) {
- n->set_excluded(true);
+ locator::host_id host_id{id.uuid()};
+ auto ip = _address_map.find(host_id);
+ co_await process_transition_node(id, host_id, ip, rs);
+ if (ip) {
+ sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, false));
}
}
}

+ for (auto id : t.get_excluded_nodes()) {
+ locator::node* n = tmptr->get_topology().find_node(locator::host_id(id.uuid()));
+ if (n) {
+ n->set_excluded(true);
+ }
+ }
+
co_await when_all_succeed(sys_ks_futures.begin(), sys_ks_futures.end()).discard_result();

rtlogger.trace("End sync_raft_topology_nodes");
--
2.47.1

Kamil Braun

<kbraun@scylladb.com>
unread,
Jan 21, 2025, 8:30:26 AMJan 21
to Gleb Natapov, scylladb-dev@googlegroups.com
This patch is extremely hard to review... multiple things combined into
one, and it produces a weird diff. Could be easily split into multiple
steps making it much easier to understand. It also seems to be
performing some logic changes and changing the timing of things; it doesn't
just split things into functions as the commit message claims.

I found the diff is pretty useless, I had to jump between the two
commits and remember what's happening in each to actually be able to
review it.

Anyway what I found:
- inlining remove_ip lambda: could be done in separate patch.

- notifications (notify_joined etc.): previously they would be collected
in a struct and fired at the end after everything else is done. Now they
are being fired directly. Why? Was there a reason that notifications
were being postponed before, and that reason no longer holds so we can
fire them directly?

- in previous version of sync_raft_topology_nodes, in
process_transition_node, some calls to update_topology and
update_normal_tokens were done inside `if (ip)` branch (nested in `if
(rs.ring.has_value())`), so they happened only if IP mapping was
present. Now they are done directly in `if (rs.ring.has_value())` whether
or not we have IP. Why? It looks like previously having IP was
considered a different state from not having it, now they were merged
into the same state.

- the previous `else if` branch in `if (rs.ring.has_value())` asserting
that we're not in `write_both_read_new` if IP is not present is gone.
IIUC this was the assertion we were hitting in #22293. So this patch
already fixes that issue?

Anyway each of these questions should probably correspond to a separate
commit, with commit message explaining everything, but since I already
spent 2 hours trying to understand this patch, and I think I more or
less understand now, I guess it would be a waste to spend time splitting
it now, so I only need the answers to these questions.

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 21, 2025, 9:00:46 AMJan 21
to Kamil Braun, scylladb-dev@googlegroups.com
On Tue, Jan 21, 2025 at 02:30:21PM +0100, Kamil Braun wrote:
> This patch is extremely hard to review... multiple things combined into one,
> and it produces a weird diff. Could be easily split into multiple steps
> making it much easier to understand. Also it seems it's also performing some
> logic changes and changes timing of things, it doesn't just split things
> into functions as the commit message claims.
>
The patch splits two completely different tasks that were done in a
single code path. I do not see a simple way to untangle the logic.

> I found the diff is pretty useless, I had to jump between the two commits
> and remember what's happening in each to actually be able to review it.
>
> Anyway what I found:
> - inlining remove_ip lambda: could be done in separate patch.
>
Could be, but will it make the patch much simpler?

> - notifications (notify_joined etc.): previously they would be collected in
> a struct and fired at the end after everything else is done. Now they are
> being fired directly. Why? Was there a reason that notifications were being
> postponed before, and that reason no longer holds so we can fire them
> directly?
According to the git history it was important to run leaving notifications
after token_metadata is updated, so it was done for both. This patch
does not change leaving notification handling. It is probably possible to
preserve the property for joining nodes as well by collecting them in a
list in raft_topology_update_ip, but on ip address change path it is not
needed any longer since we do not re-create token metadata at all there,
so it is just unneeded complication unless the property is really
important.

>
> - in previous version of sync_raft_topology_nodes, in
> process_transition_node, some calls to update_topology and
> update_normal_tokens were done inside `if (ip)` branch (nested in `if
> (rs.ring.has_value())` so they only happened only if IP mapping was present.
> Now they are done directly in `if (rs.ring.has_value()` whether or not we
> have IP. Why? It looks like previously having IP was considered a different
> state from not having it, now they were merged into the same state.
>
IP mapping now has nothing to do with topology. The topology can be
re-created without ip at all. That is the whole point of the series.
Previously some other code tried to access the mapping in token metadata
in this state and failed. Now there is no such code.


> - the previous `else if` branch in `if (rs.ring.has_value())` asserting that
> we're not in `write_both_read_new` if IP is not present is gone. IIUC this
> was the assertion we were hitting in #22293. So this patch already fixes
> that issue?
Yes.

>
> Anyway each of these questions should probably correspond to a separate
> commit, with commit message explaining everything, but since I already spent
> 2 hours trying to understand this patch, and I think I more or less
> understand now, I guess it would be a waste to spend time splitting it now,
> so I only need the answers to these questions.

I really do not see how it can be done except "inlining remove_ip" of
course.
--
Gleb.

Kamil Braun

<kbraun@scylladb.com>
unread,
Jan 22, 2025, 5:52:39 AMJan 22
to Gleb Natapov, scylladb-dev@googlegroups.com
Before this series notifications would be sent only after everything is persisted in local/peers:

    co_await when_all_succeed(sys_ks_futures.begin(), sys_ks_futures.end()).discard_result();

    rtlogger.trace("End sync_raft_topology_nodes");

    co_return nodes_to_notify;

after your series it's no longer the case, what happens if we crash after sending notification but before persisting? Looks like these things are happening concurrently now:
                sys_ks_futures.push_back(_sys_ks.local().update_peer_info(ip, id, *info));
            }

            if (notify_join) {
                co_await notify_joined(ip);
            }

looks fragile

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 26, 2025, 3:19:22 AMJan 26
to Kamil Braun, scylladb-dev@googlegroups.com
After restart the state will be processed again on boot and will be persisted.
The state is already persisted in the raft topology state.
--
Gleb.

Kamil Braun

<kbraun@scylladb.com>
unread,
Jan 27, 2025, 3:09:26 AMJan 27
to Gleb Natapov, scylladb-dev@googlegroups.com
Won't this confuse the driver though and make it behave weird? I imagine
something like:
- driver gets notification that node X joined
- this node crashes
- this node restarts
- before system.peers is updated to the correct state, driver reads
system.peers from this node (which IIRC happens when driver connects to
a node), which doesn't reflect that X joined

so the driver sees things happening backwards

Kamil Braun

<kbraun@scylladb.com>
unread,
Jan 27, 2025, 3:28:12 AMJan 27
to Gleb Natapov, scylladb-dev@googlegroups.com
Regarding "unneeded complication"...

the code may now be simpler in terms of lines of code or some other
shallow metric, but to me the logic is more complex now:
- joining node notifications are handled differently than leaving nodes,
so whoever wants to understand this code will ask themselves the
question, why is there a difference, spending precious brain cycles
- and then there's the question of ordering, when we send notification
about some event before persisting the effect of that event, which
spawns all the questions about fault tolerance etc. even if we're
somehow lucky and the code is fault tolerant, answering these questions
requires deep understanding and investigation, when in the previous
version, we didn't need to ask them in the first place because the
strategy was simple: first persist, then notify.

So, to me, the changes done by this patch are the actual complication

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 27, 2025, 4:05:06 AMJan 27
to Kamil Braun, scylladb-dev@googlegroups.com
Raft state is loaded early on boot before a driver has a chance to read
anything. But never mind. Running join notification as before is a trivial change. I
will send v2.
--
Gleb.

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 27, 2025, 4:22:15 AMJan 27
to scylladb-dev@googlegroups.com
Since now topology does not contain ip addresses there is no need to
create topology on an ip address change. Only peers table has to be
updated. The series factors out peers table update code from
sync_raft_topology_nodes() and calls it on topology and ip address
updates. As a side effect it fixes #22293 since now topology loading
does not require IP to be present, so the assert that is triggered in
this bug is removed.

Fixes: scylladb/scylladb#22293

CI: https://jenkins.scylladb.com/job/scylla-master/job/scylla-ci/14742/

Also in scylla-dev gleb/not-update-tm-on-addr-change-v2

v1->v2:
- send out join_node notifications after token metadata is updated

Gleb Natapov (2):
topology coordinator: split out the peer table update functionality
from raft state application
topology coordinator: do not update topology on address change

service/storage_service.hh | 3 +-
service/storage_service.cc | 256 ++++++++++++++++++++-----------------
2 files changed, 139 insertions(+), 120 deletions(-)

--
2.47.1

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 27, 2025, 4:22:17 AMJan 27
to scylladb-dev@googlegroups.com
Raft topology state application does two things: re-creates token metadata
and updates peers table if needed. The code for both tasks is intermixed
now. The patch separates it into separate functions. This will be needed in
the next patch.
---
service/storage_service.hh | 1 +
service/storage_service.cc | 238 +++++++++++++++++++++----------------
2 files changed, 137 insertions(+), 102 deletions(-)

diff --git a/service/storage_service.hh b/service/storage_service.hh
index d9c78c506de..cfef7174661 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -964,6 +964,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
std::vector<gms::inet_address> joined;
};

+ future<> raft_topology_update_ip(locator::host_id id, gms::inet_address ip, nodes_to_notify_after_sync* nodes_to_notify);
// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
// gossiper) to align it with the other raft topology nodes.
// Optional target_node can be provided to restrict the synchronization to the specified node.
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 668bf414c74..508685c073f 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -410,20 +410,17 @@ static locator::node::state to_topology_node_state(node_state ns) {
on_internal_error(rtlogger, format("unhandled node state: {}", ns));
}

-// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
-// gossiper) to align it with the other raft topology nodes.
-future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::optional<locator::host_id> target_node, std::unordered_set<raft::server_id> prev_normal) {
- nodes_to_notify_after_sync nodes_to_notify;
+future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet_address ip, nodes_to_notify_after_sync* nodes_to_notify) {
+ if (nodes_to_notify) {
+ nodes_to_notify->joined.emplace_back(ip);
+ sys_ks_futures.push_back(raft_topology_update_ip(*target_node, ip, &nodes_to_notify));
} else {
- sys_ks_futures.reserve(t.left_nodes.size() + t.normal_nodes.size() + t.transition_nodes.size());
for (const auto& id: t.left_nodes) {
- co_await process_left_node(id);
+ locator::host_id host_id{id.uuid()};
+ auto ip = _address_map.find(host_id);
+ co_await process_left_node(id, host_id, ip);
+ if (ip) {
+ sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, nullptr));
+ }
}
for (const auto& [id, rs]: t.normal_nodes) {
- co_await process_normal_node(id, rs);
+ locator::host_id host_id{id.uuid()};
+ auto ip = _address_map.find(host_id);
+ co_await process_normal_node(id, host_id, ip, rs);
+ if (ip) {
+ sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, prev_normal.contains(id) ? nullptr : &nodes_to_notify));
+ }
}
for (const auto& [id, rs]: t.transition_nodes) {
- co_await process_transition_node(id, rs);
- }
- for (auto id : t.get_excluded_nodes()) {
- locator::node* n = tmptr->get_topology().find_node(locator::host_id(id.uuid()));
- if (n) {
- n->set_excluded(true);
+ locator::host_id host_id{id.uuid()};
+ auto ip = _address_map.find(host_id);
+ co_await process_transition_node(id, host_id, ip, rs);
+ if (ip) {
+ sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, nullptr));
}
}
}

+ for (auto id : t.get_excluded_nodes()) {
+ locator::node* n = tmptr->get_topology().find_node(locator::host_id(id.uuid()));
+ if (n) {
+ n->set_excluded(true);
+ }
+ }
+
co_await when_all_succeed(sys_ks_futures.begin(), sys_ks_futures.end()).discard_result();

rtlogger.trace("End sync_raft_topology_nodes");
--
2.47.1

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 27, 2025, 4:22:17 AMJan 27
to scylladb-dev@googlegroups.com
Since topology no longer contains IP addresses, there is no need to
create topology on an IP address change. Only the peers table has to be
updated, so call a function that does the peers table update only.
---
service/storage_service.hh | 2 +-
service/storage_service.cc | 68 +++++++++++++++-----------------------
2 files changed, 27 insertions(+), 43 deletions(-)

diff --git a/service/storage_service.hh b/service/storage_service.hh
index cfef7174661..909f4abe4ee 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -969,7 +969,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
// gossiper) to align it with the other raft topology nodes.
// Optional target_node can be provided to restrict the synchronization to the specified node.
// Returns a structure that describes which notifications to trigger after token metadata is updated.
- future<nodes_to_notify_after_sync> sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::optional<locator::host_id> target_node, std::unordered_set<raft::server_id> prev_normal);
+ future<nodes_to_notify_after_sync> sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::unordered_set<raft::server_id> prev_normal);
// Triggers notifications (on_joined, on_left) based on the recent changes to token metadata, as described by the passed in structure.
// This function should be called on the result of `sync_raft_topology_nodes`, after the global token metadata is updated.
future<> notify_nodes_after_sync(nodes_to_notify_after_sync&& nodes_to_notify);
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 508685c073f..366bf7d5361 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -507,7 +507,7 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet

// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
// gossiper) to align it with the other raft topology nodes.
-future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::optional<locator::host_id> target_node, std::unordered_set<raft::server_id> prev_normal) {
+future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::unordered_set<raft::server_id> prev_normal) {
nodes_to_notify_after_sync nodes_to_notify;

rtlogger.trace("Start sync_raft_topology_nodes");
@@ -620,44 +620,30 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t

sys_ks_futures.reserve(t.left_nodes.size() + t.normal_nodes.size() + t.transition_nodes.size());

- if (target_node) {
- raft::server_id raft_id{target_node->uuid()};
- auto ip = _address_map.get(*target_node);
- if (t.left_nodes.contains(raft_id)) {
- co_await process_left_node(raft_id, *target_node, ip);
- } else if (auto it = t.normal_nodes.find(raft_id); it != t.normal_nodes.end()) {
- co_await process_normal_node(raft_id, *target_node, ip, it->second);
- } else if ((it = t.transition_nodes.find(raft_id)) != t.transition_nodes.end()) {
- co_await process_transition_node(raft_id, *target_node, ip, it->second);
- }
- sys_ks_futures.push_back(raft_topology_update_ip(*target_node, ip, &nodes_to_notify));
- } else {
- for (const auto& id: t.left_nodes) {
- locator::host_id host_id{id.uuid()};
- auto ip = _address_map.find(host_id);
- co_await process_left_node(id, host_id, ip);
- if (ip) {
- sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, nullptr));
- }
+ for (const auto& id: t.left_nodes) {
+ locator::host_id host_id{id.uuid()};
+ auto ip = _address_map.find(host_id);
+ co_await process_left_node(id, host_id, ip);
+ if (ip) {
+ sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, nullptr));
}
- for (const auto& [id, rs]: t.normal_nodes) {
- locator::host_id host_id{id.uuid()};
- auto ip = _address_map.find(host_id);
- co_await process_normal_node(id, host_id, ip, rs);
- if (ip) {
- sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, prev_normal.contains(id) ? nullptr : &nodes_to_notify));
- }
+ }
+ for (const auto& [id, rs]: t.normal_nodes) {
+ locator::host_id host_id{id.uuid()};
+ auto ip = _address_map.find(host_id);
+ co_await process_normal_node(id, host_id, ip, rs);
+ if (ip) {
+ sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, prev_normal.contains(id) ? nullptr : &nodes_to_notify));
}
- for (const auto& [id, rs]: t.transition_nodes) {
- locator::host_id host_id{id.uuid()};
- auto ip = _address_map.find(host_id);
- co_await process_transition_node(id, host_id, ip, rs);
- if (ip) {
- sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, nullptr));
- }
+ }
+ for (const auto& [id, rs]: t.transition_nodes) {
+ locator::host_id host_id{id.uuid()};
+ auto ip = _address_map.find(host_id);
+ co_await process_transition_node(id, host_id, ip, rs);
+ if (ip) {
+ sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, nullptr));
}
}
-
for (auto id : t.get_excluded_nodes()) {
locator::node* n = tmptr->get_topology().find_node(locator::host_id(id.uuid()));
if (n) {
@@ -785,7 +771,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
}, _topology_state_machine._topology.tstate);
tmptr->set_read_new(read_new);

- auto nodes_to_notify = co_await sync_raft_topology_nodes(tmptr, std::nullopt, std::move(prev_normal));
+ auto nodes_to_notify = co_await sync_raft_topology_nodes(tmptr, std::move(prev_normal));

std::optional<locator::tablet_metadata> tablets;
if (hint.tablets_hint) {
@@ -974,14 +960,12 @@ class storage_service::ip_address_updater: public gms::i_endpoint_state_change_s
// If we call sync_raft_topology_nodes here directly, a gossiper lock and
// the _group0.read_apply_mutex could be taken in cross-order leading to a deadlock.
// To avoid this, we don't wait for sync_raft_topology_nodes to finish.
- (void)futurize_invoke(ensure_alive([this, id, h = _ss._async_gate.hold()]() -> future<> {
+ (void)futurize_invoke(ensure_alive([this, id, endpoint, h = _ss._async_gate.hold()]() -> future<> {
auto guard = co_await _ss._group0->client().hold_read_apply_mutex(_ss._abort_source);
co_await utils::get_local_injector().inject("ip-change-raft-sync-delay", std::chrono::milliseconds(500));
- storage_service::nodes_to_notify_after_sync nodes_to_notify;
- auto lock = co_await _ss.get_token_metadata_lock();
- co_await _ss.mutate_token_metadata([this, id, &nodes_to_notify](mutable_token_metadata_ptr t) -> future<> {
- nodes_to_notify = co_await _ss.sync_raft_topology_nodes(std::move(t), id, {});
- }, storage_service::acquire_merge_lock::no);
+ // Set notify_join to true since here we detected address change and drivers have to be notified
+ nodes_to_notify_after_sync nodes_to_notify;
+ co_await _ss.raft_topology_update_ip(id, endpoint, &nodes_to_notify);
co_await _ss.notify_nodes_after_sync(std::move(nodes_to_notify));
}));
}
--
2.47.1

Kamil Braun

<kbraun@scylladb.com>
unread,
Jan 27, 2025, 9:11:30 AMJan 27
to Gleb Natapov, scylladb-dev@googlegroups.com
Please send as a GitHub PR; 2025.1 is branched and we'll need to backport this.
Reply all
Reply to author
Forward
0 new messages