The gossiper manages the address map now, so load the peers table into
the gossiper on reboot to be able to map host IDs to IPs as early as
possible.
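
In rough terms (a simplified sketch of the diff below), join_cluster()
now branches on the topology mode:

    if (!raft_topology_change_enabled()) {
        if (_db.local().get_config().load_ring_state()) {
            // gossip topology: rebuild token_metadata from the saved
            // endpoints and feed them to the gossiper
        }
    } else {
        // raft topology: only feed the saved endpoints to the gossiper
        // so that it can map host IDs to IPs early; the topology itself
        // is managed by the topology coordinator
    }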
---
service/storage_service.hh | 2 +-
service/storage_service.cc | 92 ++++++++++++++++++++++----------------
2 files changed, 54 insertions(+), 40 deletions(-)
diff --git a/service/storage_service.hh b/service/storage_service.hh
index 2b657985285..ed5d102ff58 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -532,7 +532,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override {}
private:
- db::system_keyspace::peer_info get_peer_info_for_update(inet_address endpoint);
+ std::optional<db::system_keyspace::peer_info> get_peer_info_for_update(inet_address endpoint);
// return an engaged value iff app_state_map has changes to the peer info
std::optional<db::system_keyspace::peer_info> get_peer_info_for_update(inet_address endpoint, const gms::application_state_map& app_state_map);
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 7468d91b048..6725d21997e 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -511,18 +511,20 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
// add one.
const auto& host_id_to_ip_map = *(co_await get_host_id_to_ip_map());
- // Some state that is used to fill in 'peeers' table is still propagated over gossiper.
+ // Some state that is used to fill in 'peers' table is still propagated over gossiper.
// Populate the table with the state from the gossiper here since storage_service::on_change()
// (which is called each time gossiper state changes) may have skipped it because the tokens
// for the node were not in the 'normal' state yet
auto info = get_peer_info_for_update(*ip);
- // And then amend with the info from raft
- info.tokens = rs.ring.value().tokens;
- info.data_center = rs.datacenter;
- info.rack = rs.rack;
- info.release_version = rs.release_version;
- info.supported_features = fmt::to_string(fmt::join(rs.supported_features, ","));
- sys_ks_futures.push_back(_sys_ks.local().update_peer_info(*ip, host_id, info));
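+ // info is disengaged when gossip carries no peer info for the node;
+ // with raft topology that is expected (see get_peer_info_for_update()).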
+ if (info) {
+ // And then amend with the info from raft
+ info->tokens = rs.ring.value().tokens;
+ info->data_center = rs.datacenter;
+ info->rack = rs.rack;
+ info->release_version = rs.release_version;
+ info->supported_features = fmt::to_string(fmt::join(rs.supported_features, ","));
+ sys_ks_futures.push_back(_sys_ks.local().update_peer_info(*ip, host_id, *info));
+ }
if (!prev_normal.contains(id)) {
nodes_to_notify.joined.push_back(*ip);
}
@@ -2550,7 +2552,7 @@ future<> storage_service::handle_state_normal(inet_address endpoint, gms::permit
slogger.debug("handle_state_normal: endpoint={} is_normal_token_owner={} endpoint_to_remove={} owned_tokens={}", endpoint, is_normal_token_owner, endpoints_to_remove.contains(endpoint), owned_tokens);
if (!is_me(endpoint) && !owned_tokens.empty() && !endpoints_to_remove.count(endpoint)) {
try {
- auto info = get_peer_info_for_update(endpoint);
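+ // In the gossip-based path the peer info is expected to be present;
+ // if it is disengaged, value() throws and the enclosing catch handles it.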
+ auto info = get_peer_info_for_update(endpoint).value();
info.tokens = std::move(owned_tokens);
co_await _sys_ks.local().update_peer_info(endpoint, host_id, info);
} catch (...) {
@@ -2761,17 +2763,16 @@ future<> storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_s
return make_ready_future();
}
-db::system_keyspace::peer_info storage_service::get_peer_info_for_update(inet_address endpoint) {
+std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for_update(inet_address endpoint) {
auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint);
if (!ep_state) {
return db::system_keyspace::peer_info{};
}
auto info = get_peer_info_for_update(endpoint, ep_state->get_application_state_map());
- if (!info) {
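+ // With raft topology the peer info is amended from raft state, so a
+ // disengaged value is only an internal error in gossip-based topology.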
+ if (!info && !raft_topology_change_enabled()) {
on_internal_error_noexcept(slogger, seastar::format("get_peer_info_for_update({}): application state has no peer info: {}", endpoint, ep_state->get_application_state_map()));
- return db::system_keyspace::peer_info{};
}
- return *info;
+ return info;
}
std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for_update(inet_address endpoint, const gms::application_state_map& app_state_map) {
@@ -3007,35 +3008,48 @@ future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>&
// We must allow restarts of zero-token nodes in the gossip-based topology due to the recovery mode.
}
- if (_db.local().get_config().load_ring_state() && !raft_topology_change_enabled()) {
-
slogger.info("Loading persisted ring state");
-
- auto tmlock = co_await get_token_metadata_lock();
- auto tmptr = co_await get_mutable_token_metadata_ptr();
- for (auto& [host_id, st] : loaded_endpoints) {
- if (st.endpoint == get_broadcast_address()) {
- // entry has been mistakenly added, delete it
- slogger.warn("Loaded saved endpoint={}/{} has my broadcast address. Deleting it", host_id, st.endpoint);
- co_await _sys_ks.local().remove_endpoint(st.endpoint);
- } else {
- if (host_id == my_host_id()) {
- on_internal_error(slogger, format("Loaded saved endpoint {} with my host_id={}", st.endpoint, host_id));
- }
- if (!st.opt_dc_rack) {
- st.opt_dc_rack = locator::endpoint_dc_rack::default_location;
- slogger.warn("Loaded no dc/rack for saved endpoint={}/{}. Set to default={}/{}", host_id, st.endpoint, st.opt_dc_rack->dc, st.opt_dc_rack->rack);
+ if (!raft_topology_change_enabled()) {
+ if (_db.local().get_config().load_ring_state()) {
+
slogger.info("Loading persisted ring state");
+
+ auto tmlock = co_await get_token_metadata_lock();
+ auto tmptr = co_await get_mutable_token_metadata_ptr();
+ for (auto& [host_id, st] : loaded_endpoints) {
+ if (st.endpoint == get_broadcast_address()) {
+ // entry has been mistakenly added, delete it
+ slogger.warn("Loaded saved endpoint={}/{} has my broadcast address. Deleting it", host_id, st.endpoint);
+ co_await _sys_ks.local().remove_endpoint(st.endpoint);
+ } else {
+ if (host_id == my_host_id()) {
+ on_internal_error(slogger, format("Loaded saved endpoint {} with my host_id={}", st.endpoint, host_id));
+ }
+ if (!st.opt_dc_rack) {
+ st.opt_dc_rack = locator::endpoint_dc_rack::default_location;
+ slogger.warn("Loaded no dc/rack for saved endpoint={}/{}. Set to default={}/{}", host_id, st.endpoint, st.opt_dc_rack->dc, st.opt_dc_rack->rack);
+ }
+ const auto& dc_rack = *st.opt_dc_rack;
+ slogger.debug("Loaded tokens: endpoint={}/{} dc={} rack={} tokens={}", host_id, st.endpoint, dc_rack.dc, dc_rack.rack, st.tokens);
+ tmptr->update_topology(host_id, dc_rack, locator::node::state::normal);
+ co_await tmptr->update_normal_tokens(st.tokens, host_id);
+ tmptr->update_host_id(host_id, st.endpoint);
+ // gossiping hasn't started yet
+ // so no need to lock the endpoint
+ co_await _gossiper.add_saved_endpoint(host_id, st, gms::null_permit_id);
}
- const auto& dc_rack = *st.opt_dc_rack;
- slogger.debug("Loaded tokens: endpoint={}/{} dc={} rack={} tokens={}", host_id, st.endpoint, dc_rack.dc, dc_rack.rack, st.tokens);
- tmptr->update_topology(host_id, dc_rack, locator::node::state::normal);
- co_await tmptr->update_normal_tokens(st.tokens, host_id);
- tmptr->update_host_id(host_id, st.endpoint);
- // gossiping hasn't started yet
- // so no need to lock the endpoint
- co_await _gossiper.add_saved_endpoint(host_id, st, gms::null_permit_id);
}
+ co_await replicate_to_all_cores(std::move(tmptr));
}
- co_await replicate_to_all_cores(std::move(tmptr));
+ } else {
+
slogger.info("Loading persisted peers into the gossiper");
+ // If the topology coordinator is enabled, only load peers into the gossiper (since that is where the ID-to-IP mapping is managed).
+ // No need to update topology.
+ co_await coroutine::parallel_for_each(loaded_endpoints, [&] (auto& e) -> future<> {
+ auto& [host_id, st] = e;
+ if (host_id == my_host_id()) {
+ on_internal_error(slogger, format("Loaded saved endpoint {} with my host_id={}", st.endpoint, host_id));
+ }
+ co_await _gossiper.add_saved_endpoint(host_id, st, gms::null_permit_id);
+ });
}
auto loaded_peer_features = co_await _sys_ks.local().load_peer_features();
--
2.47.1