This patch changes the gossiper to index nodes by host ID instead of
IP address. The main data structure that changes is
_endpoint_state_map, but the change ripples widely, since everything
that uses the map directly or indirectly has to be adapted. Outside of
the gossiper itself, the biggest victim is the topology-over-gossiper
code: it works on IPs and assumes the gossiper does the same, so both
need to be changed together. Changes to other subsystems are much
smaller, since they already mostly work on host IDs anyway.
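
A rough sketch of the central change (simplified; the `gossiper` object
and `ip` variable names here are illustrative, the types are the ones
used in the hunks below):

    // before: endpoint states keyed by IP address
    std::unordered_map<inet_address, endpoint_state_ptr> _endpoint_state_map;

    // after: endpoint states keyed by host id; the IP is kept inside endpoint_state
    std::unordered_map<locator::host_id, endpoint_state_ptr> _endpoint_state_map;

    // callers that only have an IP resolve the host id first, e.g.
    locator::host_id id = gossiper.get_host_id(ip);
    endpoint_state_ptr state = gossiper.get_endpoint_state_ptr(id);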
---
 scylla-gdb.py                   |   2 +-
 gms/gossiper.hh                 |  74 +++---
 service/storage_service.hh      |  10 +-
 alternator/server.cc            |   7 +-
 api/failure_detector.cc         |  10 +-
 api/gossiper.cc                 |   4 +-
 api/storage_service.cc          |   2 +-
 cdc/generation.cc               |  17 +-
 db/virtual_tables.cc            |   8 +-
 gms/gossiper.cc                 | 386 +++++++++++++++-----------------
 locator/util.cc                 |   2 +-
 repair/repair.cc                |   5 +-
 service/migration_manager.cc    |   2 +-
 service/storage_service.cc      | 162 +++++++-------
 service/topology_coordinator.cc |   4 +-
 transport/event_notifier.cc     |   2 +-
 16 files changed, 332 insertions(+), 365 deletions(-)
diff --git a/scylla-gdb.py b/scylla-gdb.py
index c28f4006d5b..a59783bb3c9 100755
--- a/scylla-gdb.py
+++ b/scylla-gdb.py
@@ -4396,7 +4396,7 @@ class scylla_gms(gdb.Command):
state = state_ptr.get().dereference()
except Exception:
pass
- ip = ip_to_str(int(get_ip(endpoint)), byteorder=sys.byteorder)
+ ip = ip_to_str(int(get_ip(state['_ip'])), byteorder=sys.byteorder)
gdb.write('%s: (gms::endpoint_state*) %s (%s)\n' % (ip, state.address, state['_heart_beat_state']))
try:
app_states_map = std_unordered_map(state['_application_state'])
diff --git a/gms/gossiper.hh b/gms/gossiper.hh
index 7773000afd3..deccd54eab2 100644
--- a/gms/gossiper.hh
+++ b/gms/gossiper.hh
@@ -106,7 +106,7 @@ class gossiper : public seastar::async_sharded_service<gossiper>, public seastar
future<> handle_ack_msg(locator::host_id from, gossip_digest_ack ack_msg);
future<> handle_ack2_msg(locator::host_id from, gossip_digest_ack2 msg);
future<> handle_echo_msg(locator::host_id id, seastar::rpc::opt_time_point, std::optional<int64_t> generation_number_opt, bool notify_up);
- future<> handle_shutdown_msg(inet_address from, std::optional<int64_t> generation_number_opt);
+ future<> handle_shutdown_msg(locator::host_id from, std::optional<int64_t> generation_number_opt);
future<> do_send_ack_msg(locator::host_id from, gossip_digest_syn syn_msg);
future<> do_send_ack2_msg(locator::host_id from, utils::chunked_vector<gossip_digest> ack_msg_digest);
future<gossip_get_endpoint_states_response> handle_get_endpoint_states_msg(gossip_get_endpoint_states_request request);
@@ -162,32 +162,32 @@ class gossiper : public seastar::async_sharded_service<gossiper>, public seastar
endpoint_lock_entry() noexcept;
};
- using endpoint_locks_map = utils::loading_shared_values<inet_address, endpoint_lock_entry>;
+ using endpoint_locks_map = utils::loading_shared_values<locator::host_id, endpoint_lock_entry>;
class endpoint_permit {
endpoint_locks_map::entry_ptr _ptr;
permit_id _permit_id;
- inet_address _addr;
+ locator::host_id _addr;
seastar::compat::source_location _caller;
public:
- endpoint_permit(endpoint_locks_map::entry_ptr&& ptr, inet_address addr, seastar::compat::source_location caller) noexcept;
+ endpoint_permit(endpoint_locks_map::entry_ptr&& ptr, locator::host_id addr, seastar::compat::source_location caller) noexcept;
endpoint_permit(endpoint_permit&&) noexcept;
~endpoint_permit();
bool release() noexcept;
const permit_id& id() const noexcept { return _permit_id; }
};
// Must be called on shard 0
- future<endpoint_permit> lock_endpoint(inet_address, permit_id pid, seastar::compat::source_location l = seastar::compat::source_location::current());
+ future<endpoint_permit> lock_endpoint(locator::host_id, permit_id pid, seastar::compat::source_location l = seastar::compat::source_location::current());
private:
- void permit_internal_error(const inet_address& addr, permit_id pid);
- void verify_permit(const inet_address& addr, permit_id pid) {
+ void permit_internal_error(const locator::host_id& addr, permit_id pid);
+ void verify_permit(const locator::host_id& addr, permit_id pid) {
if (!pid) {
permit_internal_error(addr, pid);
}
}
/* map where key is the endpoint and value is the state associated with the endpoint */
- std::unordered_map<inet_address, endpoint_state_ptr> _endpoint_state_map;
+ std::unordered_map<locator::host_id, endpoint_state_ptr> _endpoint_state_map;
// Used for serializing changes to _endpoint_state_map and running of associated change listeners.
endpoint_locks_map _endpoint_locks;
@@ -331,7 +331,7 @@ class gossiper : public seastar::async_sharded_service<gossiper>, public seastar
/**
* @param endpoint end point that is convicted.
*/
- future<> convict(inet_address endpoint);
+ future<> convict(locator::host_id endpoint);
/**
* Removes the endpoint from gossip completely
@@ -340,12 +340,12 @@ class gossiper : public seastar::async_sharded_service<gossiper>, public seastar
*
* Must be called under lock_endpoint.
*/
- future<> evict_from_membership(inet_address endpoint, permit_id);
+ future<> evict_from_membership(locator::host_id endpoint, permit_id);
public:
/**
* Removes the endpoint from Gossip but retains endpoint state
*/
- future<> remove_endpoint(inet_address endpoint, permit_id);
+ future<> remove_endpoint(locator::host_id endpoint, permit_id);
// Returns true if an endpoint was removed
future<bool> force_remove_endpoint(inet_address endpoint, locator::host_id id, permit_id);
private:
@@ -380,7 +380,7 @@ class gossiper : public seastar::async_sharded_service<gossiper>, public seastar
* @param endpoint
* @param host_id
*/
- future<> advertise_token_removed(inet_address endpoint, locator::host_id host_id, permit_id);
+ future<> advertise_token_removed(locator::host_id host_id, permit_id);
/**
* Do not call this method unless you know what you are doing.
@@ -393,12 +393,12 @@ class gossiper : public seastar::async_sharded_service<gossiper>, public seastar
future<> assassinate_endpoint(sstring address);
public:
- future<generation_type> get_current_generation_number(inet_address endpoint) const;
- future<version_type> get_current_heart_beat_version(inet_address endpoint) const;
+ future<generation_type> get_current_generation_number(locator::host_id endpoint) const;
+ future<version_type> get_current_heart_beat_version(locator::host_id endpoint) const;
bool is_gossip_only_member(locator::host_id endpoint) const;
bool is_safe_for_bootstrap(inet_address endpoint) const;
- bool is_safe_for_restart(inet_address endpoint, locator::host_id host_id) const;
+ bool is_safe_for_restart(locator::host_id host_id) const;
private:
/**
* Returns true if the chosen target was also a seed. False otherwise
@@ -426,23 +426,21 @@ class gossiper : public seastar::async_sharded_service<gossiper>, public seastar
// Otherwise, returns a null ptr.
// The endpoint_state is immutable (except for its update_timestamp), guaranteed not to change while
// the endpoint_state_ptr is held.
- endpoint_state_ptr get_endpoint_state_ptr(inet_address ep) const noexcept;
endpoint_state_ptr get_endpoint_state_ptr(locator::host_id ep) const noexcept;
// Return this node's endpoint_state_ptr
endpoint_state_ptr get_this_endpoint_state_ptr() const noexcept {
- return get_endpoint_state_ptr(get_broadcast_address());
+ return get_endpoint_state_ptr(my_host_id());
}
- const versioned_value* get_application_state_ptr(inet_address endpoint, application_state appstate) const noexcept;
const versioned_value* get_application_state_ptr(locator::host_id id, application_state appstate) const noexcept;
- sstring get_application_state_value(inet_address endpoint, application_state appstate) const;
+ sstring get_application_state_value(locator::host_id endpoint, application_state appstate) const;
// removes ALL endpoint states; should only be called after shadow gossip.
// Must be called on shard 0
future<> reset_endpoint_state_map();
- std::vector<inet_address> get_endpoints() const;
+ std::vector<locator::host_id> get_endpoints() const;
size_t num_endpoints() const noexcept {
return _endpoint_state_map.size();
@@ -450,8 +448,8 @@ class gossiper : public seastar::async_sharded_service<gossiper>, public seastar
// Calls func for each endpoint_state.
// Called function must not yield
- void for_each_endpoint_state(std::function<void(const inet_address&, const endpoint_state&)> func) const {
- for_each_endpoint_state_until([func = std::move(func)] (const inet_address& node, const endpoint_state& eps) {
+ void for_each_endpoint_state(std::function<void(const locator::host_id&, const endpoint_state&)> func) const {
+ for_each_endpoint_state_until([func = std::move(func)] (const locator::host_id& node, const endpoint_state& eps) {
func(node, eps);
return stop_iteration::no;
});
@@ -460,45 +458,45 @@ class gossiper : public seastar::async_sharded_service<gossiper>, public seastar
// Calls func for each endpoint_state until it returns stop_iteration::yes
// Returns stop_iteration::yes iff `func` returns stop_iteration::yes.
// Called function must not yield
- stop_iteration for_each_endpoint_state_until(std::function<stop_iteration(const inet_address&, const endpoint_state&)>) const;
+ stop_iteration for_each_endpoint_state_until(std::function<stop_iteration(const locator::host_id&, const endpoint_state&)>) const;
locator::host_id get_host_id(inet_address endpoint) const;
std::optional<locator::host_id> get_host_id_opt(inet_address endpoint) const;
std::set<gms::inet_address> get_nodes_with_host_id(locator::host_id host_id) const;
- std::optional<endpoint_state> get_state_for_version_bigger_than(inet_address for_endpoint, version_type version) const;
+ std::optional<endpoint_state> get_state_for_version_bigger_than(locator::host_id for_endpoint, version_type version) const;
/**
* determine which endpoint started up earlier
*/
- std::strong_ordering compare_endpoint_startup(inet_address addr1, inet_address addr2) const;
+ std::strong_ordering compare_endpoint_startup(locator::host_id addr1, locator::host_id addr2) const;
/**
* Return the rpc address associated with an endpoint as a string.
* @param endpoint The endpoint to get rpc address for
* @return the rpc address
*/
- sstring get_rpc_address(const inet_address& endpoint) const;
+ sstring get_rpc_address(const locator::host_id& endpoint) const;
- future<> real_mark_alive(inet_address addr);
+ future<> real_mark_alive(locator::host_id addr);
private:
endpoint_state& my_endpoint_state();
// Use with care, as the endpoint_state_ptr in the endpoint_state_map is considered
// immutable, with one exception - the update_timestamp.
void update_timestamp(const endpoint_state_ptr& eps) noexcept;
- const endpoint_state& get_endpoint_state(inet_address ep) const;
+ const endpoint_state& get_endpoint_state(locator::host_id ep) const;
void update_timestamp_for_nodes(const std::map<inet_address, endpoint_state>& map);
- void mark_alive(inet_address addr);
+ void mark_alive(endpoint_state_ptr node);
// Must be called under lock_endpoint.
- future<> mark_dead(inet_address addr, endpoint_state_ptr local_state, permit_id);
+ future<> mark_dead(locator::host_id addr, endpoint_state_ptr local_state, permit_id);
// Must be called under lock_endpoint.
- future<> mark_as_shutdown(const inet_address& endpoint, permit_id);
+ future<> mark_as_shutdown(const locator::host_id& endpoint, permit_id);
/**
* This method is called whenever there is a "big" change in ep state (a generation change for a known node).
@@ -532,7 +530,7 @@ class gossiper : public seastar::async_sharded_service<gossiper>, public seastar
future<> apply_state_locally(std::map<inet_address, endpoint_state> map);
private:
- future<> do_apply_state_locally(gms::inet_address node, endpoint_state remote_state, bool shadow_round);
+ future<> do_apply_state_locally(locator::host_id node, endpoint_state remote_state, bool shadow_round);
future<> apply_state_locally_in_shadow_round(std::unordered_map<inet_address, endpoint_state> map);
// Must be called under lock_endpoint.
@@ -645,20 +643,20 @@ class gossiper : public seastar::async_sharded_service<gossiper>, public seastar
static clk::time_point compute_expire_time();
public:
bool is_seed(const inet_address& endpoint) const;
- bool is_shutdown(const inet_address& endpoint) const;
+ bool is_shutdown(const locator::host_id& endpoint) const;
bool is_shutdown(const endpoint_state& eps) const;
- bool is_normal(const inet_address& endpoint) const;
- bool is_left(const inet_address& endpoint) const;
+ bool is_normal(const locator::host_id& endpoint) const;
+ bool is_left(const locator::host_id& endpoint) const;
// Check if a node is in NORMAL or SHUTDOWN status which means the node is
// part of the token ring from the gossip point of view and operates in
// normal status or was in normal status but is shutdown.
- bool is_normal_ring_member(const inet_address& endpoint) const;
- bool is_cql_ready(const inet_address& endpoint) const;
+ bool is_normal_ring_member(const locator::host_id& endpoint) const;
+ bool is_cql_ready(const locator::host_id& endpoint) const;
bool is_silent_shutdown_state(const endpoint_state& ep_state) const;
void force_newer_generation();
public:
std::string_view get_gossip_status(const endpoint_state& ep_state) const noexcept;
- std::string_view get_gossip_status(const inet_address& endpoint) const noexcept;
+ std::string_view get_gossip_status(const locator::host_id& endpoint) const noexcept;
public:
future<> wait_for_gossip_to_settle() const;
future<> wait_for_range_setup() const;
diff --git a/service/storage_service.hh b/service/storage_service.hh
index 75837838886..e348f82ac32 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -529,11 +529,11 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override {}
private:
- std::optional<db::system_keyspace::peer_info> get_peer_info_for_update(inet_address endpoint);
+ std::optional<db::system_keyspace::peer_info> get_peer_info_for_update(locator::host_id endpoint);
// return an engaged value iff app_state_map has changes to the peer info
- std::optional<db::system_keyspace::peer_info> get_peer_info_for_update(inet_address endpoint, const gms::application_state_map& app_state_map);
+ std::optional<db::system_keyspace::peer_info> get_peer_info_for_update(locator::host_id endpoint, const gms::application_state_map& app_state_map);
- std::unordered_set<token> get_tokens_for(inet_address endpoint);
+ std::unordered_set<token> get_tokens_for(locator::host_id endpoint);
std::optional<locator::endpoint_dc_rack> get_dc_rack_for(const gms::endpoint_state& ep_state);
std::optional<locator::endpoint_dc_rack> get_dc_rack_for(locator::host_id endpoint);
private:
@@ -792,8 +792,8 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
future<> update_fence_version(token_metadata::version_t version);
private:
- std::unordered_set<gms::inet_address> _normal_state_handled_on_boot;
- bool is_normal_state_handled_on_boot(gms::inet_address);
+ std::unordered_set<locator::host_id> _normal_state_handled_on_boot;
+ bool is_normal_state_handled_on_boot(locator::host_id);
future<> wait_for_normal_state_handled_on_boot();
friend class group0_state_machine;
diff --git a/alternator/server.cc b/alternator/server.cc
index a9eb1f72494..3de72f61697 100644
--- a/alternator/server.cc
+++ b/alternator/server.cc
@@ -228,9 +228,8 @@ class local_nodelist_handler : public gated_handler {
// If the rack does not exist, we return an empty list - not an error.
sstring query_rack = req->get_query_param("rack");
for (auto& id : local_dc_nodes) {
- auto ip = _gossiper.get_address_map().get(id);
if (!query_rack.empty()) {
- auto rack = _gossiper.get_application_state_value(ip, gms::application_state::RACK);
+ auto rack = _gossiper.get_application_state_value(id, gms::application_state::RACK);
if (rack != query_rack) {
continue;
}
@@ -238,10 +237,10 @@ class local_nodelist_handler : public gated_handler {
// Note that it's not enough for the node to be is_alive() - a
// node joining the cluster is also "alive" but not responsive to
        // requests. We want alive *and* normal. See #19694, #21538.
- if (_gossiper.is_alive(id) && _gossiper.is_normal(ip)) {
+ if (_gossiper.is_alive(id) && _gossiper.is_normal(id)) {
// Use the gossiped broadcast_rpc_address if available instead
// of the internal IP address "ip". See discussion in #18711.
- rjson::push_back(results, rjson::from_string(_gossiper.get_rpc_address(ip)));
+ rjson::push_back(results, rjson::from_string(_gossiper.get_rpc_address(id)));
}
}
rep->set_status(reply::status_type::ok);
diff --git a/api/failure_detector.cc b/api/failure_detector.cc
index 00b4d4030a5..2db89003dc7 100644
--- a/api/failure_detector.cc
+++ b/api/failure_detector.cc
@@ -22,9 +22,9 @@ void set_failure_detector(http_context& ctx, routes& r, gms::gossiper& g) {
return g.container().invoke_on(0, [] (gms::gossiper& g) {
std::vector<fd::endpoint_state> res;
res.reserve(g.num_endpoints());
- g.for_each_endpoint_state([&] (const gms::inet_address& addr, const gms::endpoint_state& eps) {
+ g.for_each_endpoint_state([&] (const locator::host_id& addr, const gms::endpoint_state& eps) {
fd::endpoint_state val;
- val.addrs = fmt::to_string(addr);
+ val.addrs = fmt::to_string(eps.get_ip());
val.is_alive = g.is_alive(eps.get_host_id());
val.generation = eps.get_heart_beat_state().get_generation().value();
val.version = eps.get_heart_beat_state().get_heart_beat_version().value();
@@ -65,8 +65,8 @@ void set_failure_detector(http_context& ctx, routes& r, gms::gossiper& g) {
fd::get_simple_states.set(r, [&g] (std::unique_ptr<request> req) {
return g.container().invoke_on(0, [] (gms::gossiper& g) {
std::map<sstring, sstring> nodes_status;
- g.for_each_endpoint_state([&] (const gms::inet_address& node, const gms::endpoint_state& es) {
- nodes_status.emplace(fmt::to_string(node), g.is_alive(es.get_host_id()) ? "UP" : "DOWN");
+ g.for_each_endpoint_state([&] (const locator::host_id& node, const gms::endpoint_state& es) {
+ nodes_status.emplace(fmt::to_string(es.get_ip()), g.is_alive(es.get_host_id()) ? "UP" : "DOWN");
});
return make_ready_future<json::json_return_type>(map_to_key_value<fd::mapper>(nodes_status));
});
@@ -81,7 +81,7 @@ void set_failure_detector(http_context& ctx, routes& r, gms::gossiper& g) {
fd::get_endpoint_state.set(r, [&g] (std::unique_ptr<request> req) {
return g.container().invoke_on(0, [req = std::move(req)] (gms::gossiper& g) {
- auto state = g.get_endpoint_state_ptr(gms::inet_address(req->get_path_param("addr")));
+ auto state = g.get_endpoint_state_ptr(g.get_host_id(gms::inet_address(req->get_path_param("addr"))));
if (!state) {
return make_ready_future<json::json_return_type>(format("unknown endpoint {}", req->get_path_param("addr")));
}
diff --git a/api/gossiper.cc b/api/gossiper.cc
index 2df0e05cf45..5f5d03066cb 100644
--- a/api/gossiper.cc
+++ b/api/gossiper.cc
@@ -40,14 +40,14 @@ void set_gossiper(http_context& ctx, routes& r, gms::gossiper& g) {
httpd::gossiper_json::get_current_generation_number.set(r, [&g] (std::unique_ptr<http::request> req) {
gms::inet_address ep(req->get_path_param("addr"));
- return g.get_current_generation_number(ep).then([] (gms::generation_type res) {
+ return g.get_current_generation_number(g.get_host_id(ep)).then([] (gms::generation_type res) {
return make_ready_future<json::json_return_type>(res.value());
});
});
httpd::gossiper_json::get_current_heart_beat_version.set(r, [&g] (std::unique_ptr<http::request> req) {
gms::inet_address ep(req->get_path_param("addr"));
- return g.get_current_heart_beat_version(ep).then([] (gms::version_type res) {
+ return g.get_current_heart_beat_version(g.get_host_id(ep)).then([] (gms::version_type res) {
return make_ready_future<json::json_return_type>(res.value());
});
});
diff --git a/api/storage_service.cc b/api/storage_service.cc
index ff1532ca9cb..3c9bb7acf48 100644
--- a/api/storage_service.cc
+++ b/api/storage_service.cc
@@ -699,7 +699,7 @@ rest_get_load(http_context& ctx, std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_get_current_generation_number(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
- auto ep = ss.local().get_token_metadata().get_topology().my_address();
+ auto ep = ss.local().get_token_metadata().get_topology().my_host_id();
return ss.local().gossiper().get_current_generation_number(ep).then([](gms::generation_type res) {
return make_ready_future<json::json_return_type>(res.value());
});
diff --git a/cdc/generation.cc b/cdc/generation.cc
index 518da60e7c4..ffcdccdd78c 100644
--- a/cdc/generation.cc
+++ b/cdc/generation.cc
@@ -39,12 +39,12 @@
extern logging::logger cdc_log;
-static int get_shard_count(const gms::inet_address& endpoint, const gms::gossiper& g) {
+static int get_shard_count(const locator::host_id& endpoint, const gms::gossiper& g) {
auto ep_state = g.get_application_state_ptr(endpoint, gms::application_state::SHARD_COUNT);
return ep_state ? std::stoi(ep_state->value()) : -1;
}
-static unsigned get_sharding_ignore_msb(const gms::inet_address& endpoint, const gms::gossiper& g) {
+static unsigned get_sharding_ignore_msb(const locator::host_id& endpoint, const gms::gossiper& g) {
auto ep_state = g.get_application_state_ptr(endpoint, gms::application_state::IGNORE_MSB_BITS);
return ep_state ? std::stoi(ep_state->value()) : 0;
}
@@ -198,7 +198,7 @@ static std::vector<stream_id> create_stream_ids(
}
bool should_propose_first_generation(const locator::host_id& my_host_id, const gms::gossiper& g) {
- return g.for_each_endpoint_state_until([&] (const gms::inet_address&, const gms::endpoint_state& eps) {
+ return g.for_each_endpoint_state_until([&] (const locator::host_id&, const gms::endpoint_state& eps) {
return stop_iteration(my_host_id < eps.get_host_id());
}) == stop_iteration::no;
}
@@ -402,9 +402,8 @@ future<cdc::generation_id> generation_service::legacy_make_new_generation(const
throw std::runtime_error(
format("Can't find endpoint for token {}", end));
}
- const auto ep = _gossiper.get_address_map().get(*endpoint);
- auto sc = get_shard_count(ep, _gossiper);
- return {sc > 0 ? sc : 1, get_sharding_ignore_msb(ep, _gossiper)};
+ auto sc = get_shard_count(*endpoint, _gossiper);
+ return {sc > 0 ? sc : 1, get_sharding_ignore_msb(*endpoint, _gossiper)};
}
};
@@ -463,7 +462,7 @@ future<cdc::generation_id> generation_service::legacy_make_new_generation(const
* but if the cluster already supports CDC, then every newly joining node will propose a new CDC generation,
* which means it will gossip the generation's timestamp.
*/
-static std::optional<cdc::generation_id> get_generation_id_for(const gms::inet_address& endpoint, const gms::endpoint_state& eps) {
+static std::optional<cdc::generation_id> get_generation_id_for(const locator::host_id& endpoint, const gms::endpoint_state& eps) {
const auto* gen_id_ptr = eps.get_application_state_ptr(gms::application_state::CDC_GENERATION_ID);
if (!gen_id_ptr) {
return std::nullopt;
@@ -867,7 +866,7 @@ future<> generation_service::check_and_repair_cdc_streams() {
}
std::optional<cdc::generation_id> latest = _gen_id;
- _gossiper.for_each_endpoint_state([&] (const gms::inet_address& addr, const gms::endpoint_state& state) {
+ _gossiper.for_each_endpoint_state([&] (const locator::host_id& addr, const gms::endpoint_state& state) {
if (_gossiper.is_left(addr)) {
cdc_log.info("check_and_repair_cdc_streams ignored node {} because it is in LEFT state", addr);
return;
@@ -1066,7 +1065,7 @@ future<> generation_service::legacy_scan_cdc_generations() {
assert_shard_zero(__PRETTY_FUNCTION__);
std::optional<cdc::generation_id> latest;
- _gossiper.for_each_endpoint_state([&] (const gms::inet_address& node, const gms::endpoint_state& eps) {
+ _gossiper.for_each_endpoint_state([&] (const locator::host_id& node, const gms::endpoint_state& eps) {
auto gen_id = get_generation_id_for(node, eps);
if (!latest || (gen_id && get_ts(*gen_id) > get_ts(*latest))) {
latest = gen_id;
diff --git a/db/virtual_tables.cc b/db/virtual_tables.cc
index d5c8a0224c7..c91976184e0 100644
--- a/db/virtual_tables.cc
+++ b/db/virtual_tables.cc
@@ -75,9 +75,9 @@ class cluster_status_table : public memtable_filling_virtual_table {
std::vector<frozen_mutation> muts;
muts.reserve(gossiper.num_endpoints());
- gossiper.for_each_endpoint_state([&] (const gms::inet_address& endpoint, const gms::endpoint_state& eps) {
+ gossiper.for_each_endpoint_state([&] (const locator::host_id& endpoint, const gms::endpoint_state& eps) {
static thread_local auto s = build_schema();
- mutation m(s, partition_key::from_single_value(*s, data_value(endpoint).serialize_nonnull()));
+ mutation m(s, partition_key::from_single_value(*s, data_value(eps.get_ip()).serialize_nonnull()));
row& cr = m.partition().clustered_row(*schema(), clustering_key::make_empty()).cells();
auto hostid = eps.get_host_id();
@@ -99,8 +99,8 @@ class cluster_status_table : public memtable_filling_virtual_table {
set_cell(cr, "dc", dc);
}
- if (ownership.contains(endpoint)) {
- set_cell(cr, "owns", ownership[endpoint]);
+ if (ownership.contains(eps.get_ip())) {
+ set_cell(cr, "owns", ownership[eps.get_ip()]);
}
set_cell(cr, "tokens", int32_t(tm.get_tokens(hostid).size()));
diff --git a/gms/gossiper.cc b/gms/gossiper.cc
index 87926ece195..f1fe42409fe 100644
--- a/gms/gossiper.cc
+++ b/gms/gossiper.cc
@@ -22,6 +22,7 @@
#include "message/messaging_service.hh"
#include "utils/log.hh"
#include "db/system_keyspace.hh"
+#include <algorithm>
#include <fmt/chrono.h>
#include <fmt/ranges.h>
#include <ranges>
@@ -101,7 +102,7 @@ gossiper::gossiper(abort_source& as, const locator::shared_token_metadata& stm,
fat_client_timeout = quarantine_delay() / 2;
// Register this instance with JMX
namespace sm = seastar::metrics;
- auto ep = get_broadcast_address();
+ auto ep = my_host_id();
_metrics.add_group("gossip", {
sm::make_counter("heart_beat",
[ep, this] {
@@ -148,7 +149,9 @@ void gossiper::do_sort(utils::chunked_vector<gossip_digest>& g_digest_list) cons
utils::chunked_vector<gossip_digest> diff_digests;
for (auto g_digest : g_digest_list) {
auto ep = g_digest.get_endpoint();
- auto ep_state = get_endpoint_state_ptr(ep);
+ locator::host_id id = get_host_id_opt(ep).value_or(locator::host_id{});
+
+ auto ep_state = get_endpoint_state_ptr(id);
version_type version = ep_state ? get_max_endpoint_state_version(*ep_state) : version_type();
int32_t diff_version = ::abs((version - g_digest.get_max_version()).value());
diff_digests.emplace_back(gossip_digest(ep, g_digest.get_generation(), version_type(diff_version)));
@@ -367,7 +370,8 @@ future<> gossiper::do_send_ack2_msg(locator::host_id from, utils::chunked_vector
std::map<inet_address, endpoint_state> delta_ep_state_map;
for (auto g_digest : ack_msg_digest) {
inet_address addr = g_digest.get_endpoint();
- const auto es = get_endpoint_state_ptr(addr);
+ locator::host_id id = get_host_id_opt(addr).value_or(locator::host_id{});
+ const auto es = get_endpoint_state_ptr(id);
if (!es || es->get_heart_beat_state().get_generation() < g_digest.get_generation()) {
continue;
}
@@ -378,7 +382,7 @@ future<> gossiper::do_send_ack2_msg(locator::host_id from, utils::chunked_vector
const auto version = es->get_heart_beat_state().get_generation() > g_digest.get_generation()
? version_type(0)
: g_digest.get_max_version();
- auto local_ep_state_ptr = get_state_for_version_bigger_than(addr, version);
+ auto local_ep_state_ptr = get_state_for_version_bigger_than(id, version);
if (local_ep_state_ptr) {
delta_ep_state_map.emplace(addr, *local_ep_state_ptr);
}
@@ -462,7 +466,7 @@ future<> gossiper::handle_echo_msg(locator::host_id from_hid, seastar::rpc::opt_
}
}
-future<> gossiper::handle_shutdown_msg(inet_address from, std::optional<int64_t> generation_number_opt) {
+future<> gossiper::handle_shutdown_msg(locator::host_id from, std::optional<int64_t> generation_number_opt) {
if (!is_enabled()) {
logger.debug("Ignoring shutdown message from {} because gossip is disabled", from);
co_return;
@@ -503,7 +507,7 @@ gossiper::handle_get_endpoint_states_msg(gossip_get_endpoint_states_request requ
state_wanted.get_application_state_map().emplace(app);
}
}
- map.emplace(node, std::move(state_wanted));
+ map.emplace(state->get_ip(), std::move(state_wanted));
}
return make_ready_future<gossip_get_endpoint_states_response>(gossip_get_endpoint_states_response{std::move(map)});
}
@@ -541,8 +545,9 @@ void gossiper::init_messaging_service_handler() {
return handle_echo_msg(from_hid, timeout, generation_number_opt, notify_up_opt.value_or(false));
});
ser::gossip_rpc_verbs::register_gossip_shutdown(&_messaging, [this] (const rpc::client_info& cinfo, inet_address from, rpc::optional<int64_t> generation_number_opt) {
- return background_msg("GOSSIP_SHUTDOWN", [from, generation_number_opt] (gms::gossiper& gossiper) {
- return gossiper.handle_shutdown_msg(from, generation_number_opt);
+ auto from_hid = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
+ return background_msg("GOSSIP_SHUTDOWN", [from_hid, generation_number_opt] (gms::gossiper& gossiper) {
+ return gossiper.handle_shutdown_msg(from_hid, generation_number_opt);
});
});
ser::gossip_rpc_verbs::register_gossip_get_endpoint_states(&_messaging, [this] (const rpc::client_info& cinfo, rpc::opt_time_point, gossip_get_endpoint_states_request request) {
@@ -578,22 +583,11 @@ future<> gossiper::send_gossip(gossip_digest_syn message, std::set<T> epset) {
}
-future<> gossiper::do_apply_state_locally(gms::inet_address node, endpoint_state remote_state, bool shadow_round) {
+future<> gossiper::do_apply_state_locally(locator::host_id node, endpoint_state remote_state, bool shadow_round) {
// If state does not exist just add it. If it does then add it if the remote generation is greater.
// If there is a generation tie, attempt to break it by heartbeat version.
auto permit = co_await lock_endpoint(node, null_permit_id);
auto es = get_endpoint_state_ptr(node);
- if (!es && _topo_sm) {
- // Even if there is no endpoint for the given IP the message can still belong to existing endpoint that
- // was restarted with different IP, so lets try to locate the endpoint by host id as well. Do it in raft
- // topology mode only to not have impact on gossiper mode.
- auto hid = remote_state.get_host_id();
- for (auto&& s : _endpoint_state_map) {
- if (s.second->get_host_id() == hid) {
- es = s.second;
- }
- }
- }
if (es) {
endpoint_state local_state = *es;
auto local_generation = local_state.get_heart_beat_state().get_generation();
@@ -617,8 +611,10 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, endpoint_state
} else {
logger.debug("Ignoring remote version {} <= {} for {}", remote_max_version, local_max_version, node);
}
- if (!is_alive(es->get_host_id()) && !is_dead_state(get_endpoint_state(node)) && !shadow_round) { // unless of course, it was dead
- mark_alive(node);
+        // Re-take after apply_new_states
+ es = get_endpoint_state_ptr(node);
+ if (!is_alive(es->get_host_id()) && !is_dead_state(*es) && !shadow_round) { // unless of course, it was dead
+ mark_alive(es);
}
} else {
logger.debug("Ignoring remote generation {} < {}", remote_generation, local_generation);
@@ -632,7 +628,8 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, endpoint_state
future<> gossiper::apply_state_locally_in_shadow_round(std::unordered_map<inet_address, endpoint_state> map) {
for (auto& [node, remote_state] : map) {
remote_state.set_ip(node);
- co_await do_apply_state_locally(node, std::move(remote_state), true);
+ auto id = remote_state.get_host_id();
+ co_await do_apply_state_locally(id, std::move(remote_state), true);
}
}
@@ -670,8 +667,8 @@ future<> gossiper::apply_state_locally(std::map<inet_address, endpoint_state> ma
return make_ready_future<>();
}
}
- return seastar::with_semaphore(_apply_state_locally_semaphore, 1, [this, ep, state = std::move(it->second)] () mutable {
- return do_apply_state_locally(ep, std::move(state), false);
+ return seastar::with_semaphore(_apply_state_locally_semaphore, 1, [this, hid, state = std::move(it->second)] () mutable {
+ return do_apply_state_locally(hid, std::move(state), false);
});
});
@@ -681,7 +678,7 @@ future<> gossiper::apply_state_locally(std::map<inet_address, endpoint_state> ma
future<bool> gossiper::force_remove_endpoint(inet_address endpoint, locator::host_id id, permit_id pid) {
return container().invoke_on(0, [this, endpoint, pid, id] (auto& gossiper) mutable -> future<bool> {
- auto permit = co_await gossiper.lock_endpoint(endpoint, pid);
+ auto permit = co_await gossiper.lock_endpoint(id, pid);
        pid = permit.id();
try {
if (gossiper.get_host_id(endpoint) != id) {
@@ -690,8 +687,8 @@ future<bool> gossiper::force_remove_endpoint(inet_address endpoint, locator::hos
if (endpoint == get_broadcast_address()) {
throw std::runtime_error(format("Can not force remove node {} itself", endpoint));
}
- co_await gossiper.remove_endpoint(endpoint, pid);
- co_await gossiper.evict_from_membership(endpoint, pid);
+ co_await gossiper.remove_endpoint(id, pid);
+ co_await gossiper.evict_from_membership(id, pid);
logger.info("Finished to force remove node {}", endpoint);
co_return true;
} catch (...) {
@@ -701,53 +698,50 @@ future<bool> gossiper::force_remove_endpoint(inet_address endpoint, locator::hos
});
}
-future<> gossiper::remove_endpoint(inet_address endpoint, permit_id pid) {
+future<> gossiper::remove_endpoint(locator::host_id endpoint, permit_id pid) {
auto permit = co_await lock_endpoint(endpoint, pid);
    pid = permit.id();
auto state = get_endpoint_state_ptr(endpoint);
+ auto ip = state ? state->get_ip() : inet_address{};
// do subscribers first so anything in the subscriber that depends on gossiper state won't get confused
try {
- co_await _subscribers.for_each([endpoint, state, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
- return subscriber->on_remove(endpoint, state ? state->get_host_id() : locator::host_id{}, pid);
+ co_await _subscribers.for_each([endpoint, ip, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
+ return subscriber->on_remove(ip, endpoint, pid);
});
} catch (...) {
logger.warn("Fail to call on_remove callback: {}", std::current_exception());
}
- if(_seeds.contains(endpoint)) {
- build_seeds_list();
- _seeds.erase(endpoint);
-        logger.info("removed {} from _seeds, updated _seeds list = {}", endpoint, _seeds);
- }
-
if (!state) {
logger.warn("There is no state for the removed IP {}", endpoint);
co_return;
}
+ if(_seeds.contains(ip)) {
+ build_seeds_list();
+ _seeds.erase(ip);
+        logger.info("removed {} from _seeds, updated _seeds list = {}", endpoint, _seeds);
+ }
+
auto host_id = state->get_host_id();
bool was_alive = false;
- if (_address_map.find(host_id) == endpoint) {
- // During IP address change we may have a situation where we work on old address
- // but there is a new address for the same host id, so no need to mark host id as down
- co_await mutate_live_and_unreachable_endpoints([host_id, &was_alive] (live_and_unreachable_endpoints& data) {
- was_alive = data.live.erase(host_id);
- data.unreachable.erase(host_id);
- });
- _syn_handlers.erase(host_id);
- _ack_handlers.erase(host_id);
- }
+ co_await mutate_live_and_unreachable_endpoints([host_id, &was_alive] (live_and_unreachable_endpoints& data) {
+ was_alive = data.live.erase(host_id);
+ data.unreachable.erase(host_id);
+ });
+ _syn_handlers.erase(host_id);
+ _ack_handlers.erase(host_id);
quarantine_endpoint(host_id);
logger.info("Removed endpoint {}", endpoint);
if (was_alive) {
try {
logger.info("InetAddress {}/{} is now DOWN, status = {}", state->get_host_id(), endpoint, get_gossip_status(*state));
- co_await do_on_dead_notifications(endpoint, std::move(state), pid);
+ co_await do_on_dead_notifications(ip, std::move(state), pid);
} catch (...) {
logger.warn("Fail to call on_dead callback: {}", std::current_exception());
}
@@ -759,20 +753,22 @@ future<> gossiper::do_status_check() {
auto now = this->now();
- for (const auto& endpoint : get_endpoints()) {
- if (endpoint == get_broadcast_address()) {
+ for (const auto& host_id : get_endpoints()) {
+ if (host_id == my_host_id()) {
continue;
}
- auto permit = co_await lock_endpoint(endpoint, null_permit_id);
+ auto permit = co_await lock_endpoint(host_id, null_permit_id);
        const auto& pid = permit.id();
- auto eps = get_endpoint_state_ptr(endpoint);
+ auto eps = get_endpoint_state_ptr(host_id);
if (!eps) {
continue;
}
auto& ep_state = *eps;
- auto host_id = ep_state.get_host_id();
+ if (host_id != ep_state.get_host_id()) {
+ on_internal_error(logger, fmt::format("Gossiper entry with id {} has state with id {}", host_id, ep_state.get_host_id()));
+ }
bool is_alive = this->is_alive(host_id);
auto update_timestamp = ep_state.get_update_timestamp();
@@ -781,9 +777,9 @@ future<> gossiper::do_status_check() {
if (is_gossip_only_member(host_id)
&& !_just_removed_endpoints.contains(host_id)
&& ((now - update_timestamp) > fat_client_timeout)) {
-            logger.info("FatClient {} has been silent for {}ms, removing from gossip", endpoint, fat_client_timeout.count());
- co_await remove_endpoint(endpoint, pid); // will put it in _just_removed_endpoints to respect quarantine delay
- co_await evict_from_membership(endpoint, pid); // can get rid of the state immediately
+            logger.info("FatClient {} has been silent for {}ms, removing from gossip", host_id, fat_client_timeout.count());
+ co_await remove_endpoint(host_id, pid); // will put it in _just_removed_endpoints to respect quarantine delay
+ co_await evict_from_membership(host_id, pid); // can get rid of the state immediately
continue;
}
@@ -792,8 +788,8 @@ future<> gossiper::do_status_check() {
if (!is_alive && (now > expire_time)) {
const auto* node = get_token_metadata_ptr()->get_topology().find_node(host_id);
if (!host_id || !node || !node->is_member()) {
- logger.debug("time is expiring for endpoint : {} ({})", endpoint, expire_time.time_since_epoch().count());
- co_await evict_from_membership(endpoint, pid);
+ logger.debug("time is expiring for endpoint : {} ({})", host_id, expire_time.time_since_epoch().count());
+ co_await evict_from_membership(host_id, pid);
}
}
}
@@ -809,7 +805,7 @@ future<> gossiper::do_status_check() {
}
}
-gossiper::endpoint_permit::endpoint_permit(endpoint_locks_map::entry_ptr&& ptr, inet_address addr, seastar::compat::source_location caller) noexcept
+gossiper::endpoint_permit::endpoint_permit(endpoint_locks_map::entry_ptr&& ptr, locator::host_id addr, seastar::compat::source_location caller) noexcept
: _ptr(std::move(ptr))
, _permit_id(_ptr->pid)
, _addr(std::move(addr))
@@ -826,7 +822,7 @@ gossiper::endpoint_permit::endpoint_permit(endpoint_locks_map::entry_ptr&& ptr,
gossiper::endpoint_permit::endpoint_permit(endpoint_permit&& o) noexcept
: _ptr(std::exchange(o._ptr, nullptr))
, _permit_id(std::exchange(o._permit_id, null_permit_id))
- , _addr(std::exchange(o._addr, inet_address{}))
+ , _addr(std::exchange(o._addr, locator::host_id{}))
, _caller(std::move(o._caller))
{}
@@ -855,11 +851,11 @@ gossiper::endpoint_lock_entry::endpoint_lock_entry() noexcept
, pid(permit_id::create_null_id())
{}
-future<gossiper::endpoint_permit> gossiper::lock_endpoint(inet_address ep, permit_id pid, seastar::compat::source_location l) {
+future<gossiper::endpoint_permit> gossiper::lock_endpoint(locator::host_id ep, permit_id pid, seastar::compat::source_location l) {
if (this_shard_id() != 0) {
on_internal_error(logger, "lock_endpoint must be called on shard 0");
}
- auto eptr = co_await _endpoint_locks.get_or_load(ep, [] (const inet_address& ep) { return endpoint_lock_entry(); });
+ auto eptr = co_await _endpoint_locks.get_or_load(ep, [] (const locator::host_id& ep) { return endpoint_lock_entry(); });
if (pid) {
if (eptr->pid == pid) {
// Already locked with the same permit
@@ -913,7 +909,7 @@ future<gossiper::endpoint_permit> gossiper::lock_endpoint(inet_address ep, permi
co_return endpoint_permit(std::move(eptr), std::move(ep), std::move(l));
}
-void gossiper::permit_internal_error(const inet_address& addr, permit_id pid) {
+void gossiper::permit_internal_error(const locator::host_id& addr, permit_id pid) {
on_internal_error(logger, fmt::format("Must be called under lock_endpoint for node {}", addr));
}
@@ -976,8 +972,8 @@ future<> gossiper::failure_detector_loop_for_node(locator::host_id host_id, gene
}
if (diff > max_duration) {
logger.info("failure_detector_loop: Mark node {}/{} as DOWN", host_id, node);
- co_await container().invoke_on(0, [node] (gms::gossiper& g) {
- return g.convict(node);
+ co_await container().invoke_on(0, [host_id] (gms::gossiper& g) {
+ return g.convict(host_id);
});
co_return;
}
@@ -1086,7 +1082,7 @@ void gossiper::run() {
// MessagingService.instance().waitUntilListening();
{
- auto permit = lock_endpoint(get_broadcast_address(), null_permit_id).get();
+ auto permit = lock_endpoint(my_host_id(), null_permit_id).get();
/* Update the local heartbeat counter. */
heart_beat_state& hbs = my_endpoint_state().get_heart_beat_state();
hbs.update_heart_beat();
@@ -1230,7 +1226,7 @@ int64_t gossiper::get_endpoint_downtime(locator::host_id ep) const noexcept {
// Depends on
// - on_dead callbacks
// It is called from failure_detector
-future<> gossiper::convict(inet_address endpoint) {
+future<> gossiper::convict(locator::host_id endpoint) {
auto permit = co_await lock_endpoint(endpoint, null_permit_id);
auto state = get_endpoint_state_ptr(endpoint);
if (!state || !is_alive(state->get_host_id())) {
@@ -1256,31 +1252,23 @@ version_type gossiper::get_max_endpoint_state_version(const endpoint_state& stat
return max_version;
}
-future<> gossiper::evict_from_membership(inet_address endpoint, permit_id pid) {
- verify_permit(endpoint, pid);
- auto hid = get_host_id(endpoint);
- if (_address_map.find(hid) == endpoint) {
- // During IP address change we may have a situation where we work on old address
- // but there is a new address for the same host id, so no need to mark host id as down
- co_await mutate_live_and_unreachable_endpoints([hid] (live_and_unreachable_endpoints& data) {
- data.unreachable.erase(hid);
- data.live.erase(hid);
- });
- }
+future<> gossiper::evict_from_membership(locator::host_id hid, permit_id pid) {
+ verify_permit(hid, pid);
- co_await container().invoke_on_all([endpoint, hid] (auto& g) {
+ co_await mutate_live_and_unreachable_endpoints([hid] (live_and_unreachable_endpoints& data) {
+ data.unreachable.erase(hid);
+ data.live.erase(hid);
+ });
+
+ co_await container().invoke_on_all([hid] (auto& g) {
if (this_shard_id() == 0) {
- if (g._address_map.find(hid) == endpoint) {
- // During IP address change we may have a situation where we remove old gossiper state
- // but there is a new address for the same host id, so no need to make it expiring
- g._address_map.set_expiring(g.get_endpoint_state_ptr(endpoint)->get_host_id());
- }
+ g._address_map.set_expiring(hid);
}
- g._endpoint_state_map.erase(endpoint);
+ g._endpoint_state_map.erase(hid);
});
_expire_time_endpoint_map.erase(hid);
quarantine_endpoint(hid);
- logger.debug("evicting {} from gossip", endpoint);
+ logger.debug("evicting {} from gossip", hid);
}
void gossiper::quarantine_endpoint(locator::host_id id) {
@@ -1299,7 +1287,7 @@ void gossiper::make_random_gossip_digest(utils::chunked_vector<gossip_digest>& g
version_type max_version;
// local epstate will be part of _endpoint_state_map
- utils::chunked_vector<inet_address> endpoints;
+ utils::chunked_vector<locator::host_id> endpoints;
for (auto&& x : _endpoint_state_map) {
endpoints.push_back(x.first);
}
@@ -1311,13 +1299,12 @@ void gossiper::make_random_gossip_digest(utils::chunked_vector<gossip_digest>& g
generation = eps.get_heart_beat_state().get_generation();
max_version = get_max_endpoint_state_version(eps);
}
- g_digests.push_back(gossip_digest(endpoint, generation, max_version));
+ g_digests.push_back(gossip_digest(es->get_ip(), generation, max_version));
}
}
future<> gossiper::replicate(endpoint_state es, permit_id pid) {
- auto ep = es.get_ip();
- verify_permit(ep, pid);
+ verify_permit(es.get_host_id(), pid);
// First pass: replicate the new endpoint_state on all shards.
// Use foreign_ptr<std::unique_ptr> to ensure destroy on remote shards on exception
@@ -1341,27 +1328,27 @@ future<> gossiper::replicate(endpoint_state es, permit_id pid) {
try {
co_return co_await container().invoke_on_all([&] (gossiper& g) {
auto eps = ep_states[this_shard_id()].release();
+ auto hid = eps->get_host_id();
if (this_shard_id() == 0) {
- auto hid = eps->get_host_id();
- g._address_map.add_or_update_entry(hid, ep, eps->get_heart_beat_state().get_generation());
+ g._address_map.add_or_update_entry(hid, eps->get_ip(), eps->get_heart_beat_state().get_generation());
g._address_map.set_nonexpiring(hid);
}
- g._endpoint_state_map[ep] = std::move(eps);
+ g._endpoint_state_map[hid] = std::move(eps);
});
} catch (...) {
on_fatal_internal_error(logger, fmt::format("Failed to replicate endpoint_state: {}", std::current_exception()));
}
}
-future<> gossiper::advertise_token_removed(inet_address endpoint, locator::host_id host_id, permit_id pid) {
- auto permit = co_await lock_endpoint(endpoint, pid);
+future<> gossiper::advertise_token_removed(locator::host_id host_id, permit_id pid) {
+ auto permit = co_await lock_endpoint(host_id, pid);
    pid = permit.id();
- auto eps = get_endpoint_state(endpoint);
+ auto eps = get_endpoint_state(host_id);
eps.update_timestamp(); // make sure we don't evict it too soon
eps.get_heart_beat_state().force_newer_generation_unsafe();
auto expire_time = compute_expire_time();
eps.add_application_state(application_state::STATUS, versioned_value::removed_nonlocal(host_id, expire_time.time_since_epoch().count()));
-    logger.info("Completing removal of {}", endpoint);
+    logger.info("Completing removal of {}", host_id);
add_expire_time_for_endpoint(host_id, expire_time);
co_await replicate(std::move(eps), pid);
// ensure at least one gossip round occurs before returning
@@ -1373,7 +1360,12 @@ future<> gossiper::assassinate_endpoint(sstring address) {
throw std::runtime_error("Assassinating endpoint is not supported in topology over raft mode");
}
co_await container().invoke_on(0, [&] (auto&& gossiper) -> future<> {
- inet_address endpoint(address);
+ auto endpoint_opt = gossiper.get_host_id_opt(inet_address(address));
+ if (!endpoint_opt) {
+ logger.warn("There is no endpoint {} to assassinate", address);
+ throw std::runtime_error(format("There is no endpoint {} to assassinate", address));
+ }
+ auto endpoint = *endpoint_opt;
auto permit = co_await gossiper.lock_endpoint(endpoint, null_permit_id);
auto es = gossiper.get_endpoint_state_ptr(endpoint);
auto now = gossiper.now();
@@ -1387,8 +1379,7 @@ future<> gossiper::assassinate_endpoint(sstring address) {
std::vector<dht::token> tokens;
logger.warn("Assassinating {} via gossip", endpoint);
- const auto host_id = gossiper.get_host_id(endpoint);
- tokens = gossiper.get_token_metadata_ptr()->get_tokens(host_id);
+ tokens = gossiper.get_token_metadata_ptr()->get_tokens(endpoint);
if (tokens.empty()) {
logger.warn("Unable to calculate tokens for {}. Will use a random one", address);
throw std::runtime_error(format("Unable to calculate tokens for {}", endpoint));
@@ -1425,14 +1416,14 @@ future<> gossiper::assassinate_endpoint(sstring address) {
});
}
-future<generation_type> gossiper::get_current_generation_number(inet_address endpoint) const {
+future<generation_type> gossiper::get_current_generation_number(locator::host_id endpoint) const {
// FIXME: const container() has no const invoke_on variant
return const_cast<gossiper*>(this)->container().invoke_on(0, [endpoint] (const gossiper& gossiper) {
return gossiper.get_endpoint_state(endpoint).get_heart_beat_state().get_generation();
});
}
-future<version_type> gossiper::get_current_heart_beat_version(inet_address endpoint) const {
+future<version_type> gossiper::get_current_heart_beat_version(locator::host_id endpoint) const {
// FIXME: const container() has no const invoke_on variant
return const_cast<gossiper*>(this)->container().invoke_on(0, [endpoint] (const gossiper& gossiper) {
return gossiper.get_endpoint_state(endpoint).get_heart_beat_state().get_heart_beat_version();
@@ -1488,7 +1479,7 @@ clk::time_point gossiper::get_expire_time_for_endpoint(locator::host_id id) cons
}
}
-endpoint_state_ptr gossiper::get_endpoint_state_ptr(inet_address ep) const noexcept {
+endpoint_state_ptr gossiper::get_endpoint_state_ptr(locator::host_id ep) const noexcept {
auto it = _endpoint_state_map.find(ep);
if (it == _endpoint_state_map.end()) {
return nullptr;
@@ -1497,19 +1488,11 @@ endpoint_state_ptr gossiper::get_endpoint_state_ptr(inet_address ep) const noexc
}
}
-endpoint_state_ptr gossiper::get_endpoint_state_ptr(locator::host_id id) const noexcept {
- auto ip = _address_map.find(id);
- if (!ip) {
- return nullptr;
- }
- return get_endpoint_state_ptr(*ip);
-}
-
void gossiper::update_timestamp(const endpoint_state_ptr& eps) noexcept {
const_cast<endpoint_state&>(*eps).update_timestamp();
}
-const endpoint_state& gossiper::get_endpoint_state(inet_address ep) const {
+const endpoint_state& gossiper::get_endpoint_state(locator::host_id ep) const {
auto it = _endpoint_state_map.find(ep);
if (it == _endpoint_state_map.end()) {
throw std::out_of_range(format("ep={}", ep));
@@ -1518,10 +1501,11 @@ const endpoint_state& gossiper::get_endpoint_state(inet_address ep) const {
}
endpoint_state& gossiper::my_endpoint_state() {
+ auto id = my_host_id();
auto ep = get_broadcast_address();
- auto it = _endpoint_state_map.find(ep);
+ auto it = _endpoint_state_map.find(id);
if (it == _endpoint_state_map.end()) {
- it = _endpoint_state_map.emplace(ep, make_endpoint_state_ptr({ep})).first;
+ it = _endpoint_state_map.emplace(id, make_endpoint_state_ptr({ep})).first;
}
return const_cast<endpoint_state&>(*it->second);
}
@@ -1543,11 +1527,11 @@ future<> gossiper::reset_endpoint_state_map() {
});
}
-std::vector<inet_address> gossiper::get_endpoints() const {
+std::vector<locator::host_id> gossiper::get_endpoints() const {
return _endpoint_state_map | std::views::keys | std::ranges::to<std::vector>();
}
-stop_iteration gossiper::for_each_endpoint_state_until(std::function<stop_iteration(const inet_address&, const endpoint_state&)> func) const {
+stop_iteration gossiper::for_each_endpoint_state_until(std::function<stop_iteration(const locator::host_id&, const endpoint_state&)> func) const {
for (const auto& [node, eps] : _endpoint_state_map) {
if (func(node, *eps) == stop_iteration::yes) {
return stop_iteration::yes;
@@ -1556,7 +1540,7 @@ stop_iteration gossiper::for_each_endpoint_state_until(std::function<stop_iterat
return stop_iteration::no;
}
-bool gossiper::is_cql_ready(const inet_address& endpoint) const {
+bool gossiper::is_cql_ready(const locator::host_id& endpoint) const {
// Note:
// - New scylla node always send application_state::RPC_READY = false when
// the node boots and send application_state::RPC_READY = true when cql
@@ -1576,11 +1560,16 @@ bool gossiper::is_cql_ready(const inet_address& endpoint) const {
}
locator::host_id gossiper::get_host_id(inet_address endpoint) const {
- auto eps = get_endpoint_state_ptr(endpoint);
- if (!eps) {
+ auto ids = _endpoint_state_map | std::views::values | std::views::filter([endpoint] (const auto& es) { return es->get_ip() == endpoint; });
+
+ if (std::ranges::distance(ids) == 0) {
throw std::runtime_error(format("Could not get host_id for endpoint {}: endpoint state not found", endpoint));
}
- auto host_id = eps->get_host_id();
+
+ // Find an entry with largest generation
+ const auto& es = std::ranges::max(ids, [](const auto& ep1, const auto& ep2) { return ep1->get_heart_beat_state().get_generation() < ep2->get_heart_beat_state().get_generation(); });
+
+ auto host_id = es->get_host_id();
if (!host_id) {
throw std::runtime_error(format("Host {} does not have HOST_ID application_state", endpoint));
}
@@ -1597,17 +1586,14 @@ std::optional<locator::host_id> gossiper::get_host_id_opt(inet_address endpoint)
std::set<gms::inet_address> gossiper::get_nodes_with_host_id(locator::host_id host_id) const {
- std::set<gms::inet_address> nodes;
- for (const auto& [node, eps] : _endpoint_state_map) {
- auto app_state = eps->get_application_state_ptr(application_state::HOST_ID);
- if (app_state && host_id == locator::host_id(utils::UUID(app_state->value()))) {
- nodes.insert(node);
- }
+ if (auto it = _endpoint_state_map.find(host_id); it != _endpoint_state_map.end()) {
+ return {it->second->get_ip()};
+ } else {
+ return {};
}
- return nodes;
}
-std::optional<endpoint_state> gossiper::get_state_for_version_bigger_than(inet_address for_endpoint, version_type version) const {
+std::optional<endpoint_state> gossiper::get_state_for_version_bigger_than(locator::host_id for_endpoint, version_type version) const {
std::optional<endpoint_state> reqd_endpoint_state;
auto es = get_endpoint_state_ptr(for_endpoint);
if (es) {
@@ -1641,7 +1627,7 @@ std::optional<endpoint_state> gossiper::get_state_for_version_bigger_than(inet_a
return reqd_endpoint_state;
}
-std::strong_ordering gossiper::compare_endpoint_startup(inet_address addr1, inet_address addr2) const {
+std::strong_ordering gossiper::compare_endpoint_startup(locator::host_id addr1, locator::host_id addr2) const {
auto ep1 = get_endpoint_state_ptr(addr1);
auto ep2 = get_endpoint_state_ptr(addr2);
if (!ep1 || !ep2) {
@@ -1652,7 +1638,7 @@ std::strong_ordering gossiper::compare_endpoint_startup(inet_address addr1, inet
return ep1->get_heart_beat_state().get_generation() <=> ep2->get_heart_beat_state().get_generation();
}
-sstring gossiper::get_rpc_address(const inet_address& endpoint) const {
+sstring gossiper::get_rpc_address(const locator::host_id& endpoint) const {
auto* v = get_application_state_ptr(endpoint, gms::application_state::RPC_ADDRESS);
if (v) {
return v->value();
@@ -1662,7 +1648,7 @@ sstring gossiper::get_rpc_address(const inet_address& endpoint) const {
void gossiper::update_timestamp_for_nodes(const std::map<inet_address, endpoint_state>& map) {
for (const auto& x : map) {
- const gms::inet_address& endpoint = x.first;
+ const locator::host_id& endpoint = get_host_id_opt(x.first).value_or(locator::host_id{});
const endpoint_state& remote_endpoint_state = x.second;
auto local_endpoint_state = get_endpoint_state_ptr(endpoint);
if (local_endpoint_state) {
@@ -1699,14 +1685,9 @@ future<> gossiper::notify_nodes_on_up(std::unordered_set<locator::host_id> dsts)
});
}
-void gossiper::mark_alive(inet_address addr) {
- auto id = get_host_id(addr);
- if (id == my_host_id()) {
- // We are here because this node changed address and now tries to
- // ping an old gossip entry.
- return;
- }
-
+void gossiper::mark_alive(endpoint_state_ptr node) {
+ auto id = node->get_host_id();
+ auto addr = node->get_ip();
// Enter the _background_msg gate so stop() would wait on it
auto inserted = _pending_mark_alive_endpoints.insert(id).second;
if (inserted) {
@@ -1732,40 +1713,39 @@ void gossiper::mark_alive(inet_address addr) {
// Enter the _background_msg gate so stop() would wait on it
auto gh = _background_msg.hold();
logger.debug("Sending a EchoMessage to {}/{}, with generation_number={}", id, addr, generation);
- (void) ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, get_host_id(addr), netw::messaging_service::clock_type::now() + std::chrono::seconds(15), generation.value(), false).then([this, addr] {
+ (void) ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, id, netw::messaging_service::clock_type::now() + std::chrono::seconds(15), generation.value(), false).then([this, id] {
logger.trace("Got EchoMessage Reply");
- return real_mark_alive(addr);
+ return real_mark_alive(id);
}).handle_exception([addr, gh = std::move(gh), unmark_pending = std::move(unmark_pending), id] (auto ep) {
logger.warn("Fail to send EchoMessage to {}/{}: {}", id, addr, ep);
});
}
-future<> gossiper::real_mark_alive(inet_address addr) {
- auto permit = co_await lock_endpoint(addr, null_permit_id);
+future<> gossiper::real_mark_alive(locator::host_id host_id) {
+ auto permit = co_await lock_endpoint(host_id, null_permit_id);
// After sending echo message, the Node might not be in the
// _endpoint_state_map anymore, use the reference of local_state
// might cause user-after-free
- auto es = get_endpoint_state_ptr(addr);
+ auto es = get_endpoint_state_ptr(host_id);
if (!es) {
-        logger.info("Node {} is not in endpoint_state_map anymore", addr);
+        logger.info("Node {} is not in endpoint_state_map anymore", host_id);
co_return;
}
// Do not mark a node with status shutdown as UP.
auto status = sstring(get_gossip_status(*es));
if (status == sstring(versioned_value::SHUTDOWN)) {
- logger.warn("Skip marking node {} with status = {} as UP", addr, status);
+ logger.warn("Skip marking node {} with status = {} as UP", host_id, status);
co_return;
}
- logger.debug("Mark Node {} alive after EchoMessage", addr);
+ logger.debug("Mark Node {} alive after EchoMessage", host_id);
// prevents do_status_check from racing us and evicting if it was down > A_VERY_LONG_TIME
update_timestamp(es);
- auto host_id = es->get_host_id();
- logger.debug("removing expire time for endpoint : {}", addr);
+ logger.debug("removing expire time for endpoint : {}", host_id);
bool was_live = false;
co_await mutate_live_and_unreachable_endpoints([addr = host_id, &was_live] (live_and_unreachable_endpoints& data) {
data.unreachable.erase(addr);
@@ -1783,6 +1763,8 @@ future<> gossiper::real_mark_alive(inet_address addr) {
_endpoints_to_talk_with.front().push_back(host_id);
}
+ auto addr = es->get_ip();
+
logger.info("InetAddress {}/{} is now UP, status = {}", host_id, addr, status);
    co_await _subscribers.for_each([addr, host_id, es, pid = permit.id()] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
@@ -1791,24 +1773,20 @@ future<> gossiper::real_mark_alive(inet_address addr) {
});
}
-future<> gossiper::mark_dead(inet_address addr, endpoint_state_ptr state, permit_id pid) {
+future<> gossiper::mark_dead(locator::host_id addr, endpoint_state_ptr state, permit_id pid) {
logger.trace("marking as down {}", addr);
verify_permit(addr, pid);
- if (_address_map.find(state->get_host_id()) == addr) {
- // During IP address change we may have a situation where we work on old address
- // but there is a new address for the same host id, so no need to mark host id as down
- co_await mutate_live_and_unreachable_endpoints([addr = state->get_host_id()] (live_and_unreachable_endpoints& data) {
- data.live.erase(addr);
- data.unreachable[addr] = now();
- });
- }
-    logger.info("InetAddress {}/{} is now DOWN, status = {}", state->get_host_id(), addr, get_gossip_status(*state));
- co_await do_on_dead_notifications(addr, std::move(state), pid);
+ co_await mutate_live_and_unreachable_endpoints([addr = state->get_host_id()] (live_and_unreachable_endpoints& data) {
+ data.live.erase(addr);
+ data.unreachable[addr] = now();
+ });
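+    // With states keyed by host id there is no old-address/new-address ambiguity left,
+    // so the node is unconditionally moved from the live set to the unreachable set.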
+    logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(*state));
+ co_await do_on_dead_notifications(state->get_ip(), std::move(state), pid);
}
future<> gossiper::handle_major_state_change(endpoint_state eps, permit_id pid, bool shadow_round) {
- auto ep = eps.get_ip();
+ auto ep = eps.get_host_id();
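+    // 'ep' is now the node's host id; the subscriber callbacks further down still receive
+    // both the IP (taken from the endpoint state) and the host id.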
verify_permit(ep, pid);
endpoint_state_ptr eps_old = get_endpoint_state_ptr(ep);
@@ -1830,7 +1808,7 @@ future<> gossiper::handle_major_state_change(endpoint_state eps, permit_id pid,
if (eps_old) {
// the node restarted: it is up to the subscriber to take whatever action is necessary
co_await _subscribers.for_each([ep, eps_old, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
- return subscriber->on_restart(ep, eps_old->get_host_id(), eps_old, pid);
+ return subscriber->on_restart(eps_old->get_ip(), ep, eps_old, pid);
});
}
@@ -1839,14 +1817,14 @@ future<> gossiper::handle_major_state_change(endpoint_state eps, permit_id pid,
throw std::out_of_range(format("ep={}", ep));
}
if (!is_dead_state(*ep_state)) {
- mark_alive(ep);
+ mark_alive(ep_state);
} else {
logger.debug("Not marking {} alive due to dead state {}", ep, get_gossip_status(eps));
co_await mark_dead(ep, ep_state, pid);
}
co_await _subscribers.for_each([ep, ep_state, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
- return subscriber->on_join(ep, ep_state->get_host_id(), ep_state, pid);
+ return subscriber->on_join(ep_state->get_ip(), ep, ep_state, pid);
});
// check this at the end so nodes will learn about the endpoint
@@ -1859,7 +1837,7 @@ bool gossiper::is_dead_state(const endpoint_state& eps) const {
return std::ranges::any_of(DEAD_STATES, [state = get_gossip_status(eps)](const auto& deadstate) { return state == deadstate; });
}
-bool gossiper::is_shutdown(const inet_address& endpoint) const {
+bool gossiper::is_shutdown(const locator::host_id& endpoint) const {
return get_gossip_status(endpoint) == versioned_value::SHUTDOWN;
}
@@ -1867,16 +1845,16 @@ bool gossiper::is_shutdown(const endpoint_state& eps) const {
return get_gossip_status(eps) == versioned_value::SHUTDOWN;
}
-bool gossiper::is_normal(const inet_address& endpoint) const {
+bool gossiper::is_normal(const locator::host_id& endpoint) const {
return get_gossip_status(endpoint) == versioned_value::STATUS_NORMAL;
}
-bool gossiper::is_left(const inet_address& endpoint) const {
+bool gossiper::is_left(const locator::host_id& endpoint) const {
auto status = get_gossip_status(endpoint);
return status == versioned_value::STATUS_LEFT || status == versioned_value::REMOVED_TOKEN;
}
-bool gossiper::is_normal_ring_member(const inet_address& endpoint) const {
+bool gossiper::is_normal_ring_member(const locator::host_id& endpoint) const {
auto status = get_gossip_status(endpoint);
return status == versioned_value::STATUS_NORMAL || status == versioned_value::SHUTDOWN;
}
@@ -1888,9 +1866,9 @@ bool gossiper::is_silent_shutdown_state(const endpoint_state& ep_state) const{
future<> gossiper::apply_new_states(endpoint_state local_state, const endpoint_state& remote_state, permit_id pid, bool shadow_round) {
// don't SCYLLA_ASSERT here, since if the node restarts the version will go back to zero
//int oldVersion = local_state.get_heart_beat_state().get_heart_beat_version();
- auto addr = local_state.get_ip();
+ auto host_id = local_state.get_host_id();
- verify_permit(addr, pid);
+ verify_permit(host_id, pid);
if (!shadow_round) {
local_state.set_heart_beat_state_and_update_timestamp(remote_state.get_heart_beat_state());
@@ -1928,8 +1906,7 @@ future<> gossiper::apply_new_states(endpoint_state local_state, const endpoint_s
ep = std::current_exception();
}
- auto host_id = local_state.get_host_id();
-
+ auto addr = local_state.get_ip();
// We must replicate endpoint states before listeners run.
// Exceptions during replication will cause abort because node's state
// would be inconsistent across shards. Changes listeners depend on state
@@ -1984,10 +1961,13 @@ void gossiper::send_all(gossip_digest& g_digest,
std::map<inet_address, endpoint_state>& delta_ep_state_map,
version_type max_remote_version) const {
auto ep = g_digest.get_endpoint();
- logger.trace("send_all(): ep={}, version > {}", ep, max_remote_version);
- auto local_ep_state_ptr = get_state_for_version_bigger_than(ep, max_remote_version);
- if (local_ep_state_ptr) {
- delta_ep_state_map.emplace(ep, *local_ep_state_ptr);
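+    // The digest still identifies the peer by IP on the wire; translate it to a host id
+    // first and skip the reply if the IP cannot be resolved to a known node yet.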
+ auto id = get_host_id_opt(ep);
+ logger.trace("send_all(): ep={}/{}, version > {}", id, ep, max_remote_version);
+ if (id) {
+ auto local_ep_state_ptr = get_state_for_version_bigger_than(*id, max_remote_version);
+ if (local_ep_state_ptr) {
+ delta_ep_state_map.emplace(ep, *local_ep_state_ptr);
+ }
}
}
@@ -1999,7 +1979,8 @@ void gossiper::examine_gossiper(utils::chunked_vector<gossip_digest>& g_digest_l
auto max_remote_version = g_digest.get_max_version();
/* Get state associated with the end point in digest */
auto&& ep = g_digest.get_endpoint();
- auto es = get_endpoint_state_ptr(ep);
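+    // An IP with no known host id resolves to a null id, so the lookup below comes back
+    // empty and the digest is handled as if we had absolutely nothing for this endpoint.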
+ auto id = get_host_id_opt(ep);
+ auto es = get_endpoint_state_ptr(id.value_or(locator::host_id{}));
/* Here we need to fire a GossipDigestAckMessage. If we have some
* data associated with this endpoint locally then we follow the
* "if" path of the logic. If we have absolutely nothing for this
@@ -2049,7 +2030,7 @@ void gossiper::examine_gossiper(utils::chunked_vector<gossip_digest>& g_digest_l
}
future<> gossiper::start_gossiping(gms::generation_type generation_nbr, application_state_map preload_local_states) {
- auto permit = co_await lock_endpoint(get_broadcast_address(), null_permit_id);
+ auto permit = co_await lock_endpoint(my_host_id(), null_permit_id);
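+    // Endpoint locks are keyed by host id now, so the local node serializes on its own
+    // host id rather than on its broadcast address.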
build_seeds_list();
if (_gcfg.force_gossip_generation() > 0) {
@@ -2085,7 +2066,7 @@ future<gossiper::generation_for_nodes>
gossiper::get_generation_for_nodes(std::unordered_set<locator::host_id> nodes) const {
generation_for_nodes ret;
for (const auto& node : nodes) {
- auto es = get_endpoint_state_ptr(_address_map.find(node).value());
+ auto es = get_endpoint_state_ptr(node);
if (es) {
auto current_generation_number = es->get_heart_beat_state().get_generation();
ret.emplace(node, current_generation_number);
@@ -2201,11 +2182,11 @@ future<> gossiper::add_saved_endpoint(locator::host_id host_id, gms::loaded_endp
on_internal_error(logger, format("Attempt to add {} with broadcast_address {} as saved endpoint", host_id, ep));
}
- auto permit = co_await lock_endpoint(ep, pid);
+ auto permit = co_await lock_endpoint(host_id, pid);
//preserve any previously known, in-memory data about the endpoint (such as DC, RACK, and so on)
auto ep_state = endpoint_state(ep);
- auto es = get_endpoint_state_ptr(ep);
+ auto es = get_endpoint_state_ptr(host_id);
if (es) {
if (es->get_heart_beat_state().get_generation()) {
auto msg = fmt::format("Attempted to add saved endpoint {} after endpoint_state was already established with gossip: {}, at {}", ep, es->get_heart_beat_state(), current_backtrace());
@@ -2260,9 +2241,10 @@ future<> gossiper::add_local_application_state(application_state_map states) {
try {
co_await container().invoke_on(0, [&] (gossiper& gossiper) mutable -> future<> {
inet_address ep_addr = gossiper.get_broadcast_address();
+ auto ep_id = gossiper.my_host_id();
// for symmetry with other apply, use endpoint lock for our own address.
- auto permit = co_await gossiper.lock_endpoint(ep_addr, null_permit_id);
- auto ep_state_before = gossiper.get_endpoint_state_ptr(ep_addr);
+ auto permit = co_await gossiper.lock_endpoint(ep_id, null_permit_id);
+ auto ep_state_before = gossiper.get_endpoint_state_ptr(ep_id);
if (!ep_state_before) {
auto err = fmt::format("endpoint_state_map does not contain endpoint = {}, application_states = {}",
ep_addr, states);
@@ -2467,7 +2449,7 @@ future<> gossiper::wait_for_live_nodes_to_show_up(size_t n) {
logger.info("Live nodes seen in gossip: {}", get_live_members());
}
-const versioned_value* gossiper::get_application_state_ptr(inet_address endpoint, application_state appstate) const noexcept {
+const versioned_value* gossiper::get_application_state_ptr(locator::host_id endpoint, application_state appstate) const noexcept {
auto eps = get_endpoint_state_ptr(std::move(endpoint));
if (!eps) {
return nullptr;
@@ -2475,19 +2457,7 @@ const versioned_value* gossiper::get_application_state_ptr(inet_address endpoint
return eps->get_application_state_ptr(appstate);
}
-const versioned_value* gossiper::get_application_state_ptr(locator::host_id id, application_state appstate) const noexcept {
- auto endpoint = _address_map.find(id);
- if (!endpoint) {
- return nullptr;
- }
- auto eps = get_endpoint_state_ptr(std::move(*endpoint));
- if (!eps) {
- return nullptr;
- }
- return eps->get_application_state_ptr(appstate);
-}
-
-sstring gossiper::get_application_state_value(inet_address endpoint, application_state appstate) const {
+sstring gossiper::get_application_state_value(locator::host_id endpoint, application_state appstate) const {
auto v = get_application_state_ptr(endpoint, appstate);
if (!v) {
return {};
@@ -2499,7 +2469,7 @@ sstring gossiper::get_application_state_value(inet_address endpoint, application
* This method is used to mark a node as shutdown; that is it gracefully exited on its own and told us about it
* @param endpoint endpoint that has shut itself down
*/
-future<> gossiper::mark_as_shutdown(const inet_address& endpoint, permit_id pid) {
+future<> gossiper::mark_as_shutdown(const locator::host_id& endpoint, permit_id pid) {
verify_permit(endpoint, pid);
auto es = get_endpoint_state_ptr(endpoint);
if (es) {
@@ -2533,7 +2503,7 @@ std::string_view gossiper::get_gossip_status(const endpoint_state& ep_state) con
return do_get_gossip_status(ep_state.get_application_state_ptr(application_state::STATUS));
}
-std::string_view gossiper::get_gossip_status(const inet_address& endpoint) const noexcept {
+std::string_view gossiper::get_gossip_status(const locator::host_id& endpoint) const noexcept {
return do_get_gossip_status(get_application_state_ptr(endpoint, application_state::STATUS));
}
@@ -2598,8 +2568,13 @@ bool gossiper::is_safe_for_bootstrap(inet_address endpoint) const {
// 1) The node is a completely new node and no state in gossip at all
// 2) The node has state in gossip and it is already removed from the
// cluster either by nodetool decommission or nodetool removenode
- auto eps = get_endpoint_state_ptr(endpoint);
bool allowed = true;
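+    // An IP that does not resolve to a host id is treated the same as an IP with no
+    // gossip state at all: nothing is known about the node, so it may bootstrap.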
+ auto host_id = get_host_id_opt(endpoint);
+ if (!host_id) {
+ logger.debug("is_safe_for_bootstrap: node={}, status=no state in gossip, allowed_to_bootstrap={}", endpoint, allowed);
+ return allowed;
+ }
+ auto eps = get_endpoint_state_ptr(*host_id);
if (!eps) {
logger.debug("is_safe_for_bootstrap: node={}, status=no state in gossip, allowed_to_bootstrap={}", endpoint, allowed);
return allowed;
@@ -2614,7 +2589,7 @@ bool gossiper::is_safe_for_bootstrap(inet_address endpoint) const {
return allowed;
}
-bool gossiper::is_safe_for_restart(inet_address endpoint, locator::host_id host_id) const {
+bool gossiper::is_safe_for_restart(locator::host_id host_id) const {
// Reject to restart a node in case:
// *) if the node has been removed from the cluster by nodetool decommission or
// nodetool removenode
@@ -2627,13 +2602,10 @@ bool gossiper::is_safe_for_restart(inet_address endpoint, locator::host_id host_
auto node = x.first;
try {
auto status = get_gossip_status(node);
- auto id = get_host_id(node);
- logger.debug("is_safe_for_restart: node={}, host_id={}, status={}, my_ip={}, my_host_id={}",
- node, id, status, endpoint, host_id);
- if (host_id == id && not_allowed_statuses.contains(status)) {
+ logger.debug("is_safe_for_restart: node with host_id={}, status={}", node, status);
+ if (host_id == node && not_allowed_statuses.contains(status)) {
allowed = false;
- logger.error("is_safe_for_restart: node={}, host_id={}, status={}, my_ip={}, my_host_id={}",
- node, id, status, endpoint, host_id);
+ logger.error("is_safe_for_restart: node with host_id={}, status={}", node, status);
break;
}
} catch (...) {
diff --git a/locator/util.cc b/locator/util.cc
index 55c0e0d4911..dce81b71900 100644
--- a/locator/util.cc
+++ b/locator/util.cc
@@ -122,7 +122,7 @@ describe_ring(const replica::database& db, const gms::gossiper& gossiper, const
details._host = gossiper.get_address_map().get(endpoint);
details._datacenter = topology.get_datacenter(endpoint);
details._rack = topology.get_rack(endpoint);
- tr._rpc_endpoints.push_back(gossiper.get_rpc_address(details._host));
+ tr._rpc_endpoints.push_back(gossiper.get_rpc_address(endpoint));
tr._endpoints.push_back(fmt::to_string(details._host));
tr._endpoint_details.push_back(details);
}
diff --git a/repair/repair.cc b/repair/repair.cc
index 974c13dff22..9f28621576e 100644
--- a/repair/repair.cc
+++ b/repair/repair.cc
@@ -1240,14 +1240,14 @@ future<int> repair_service::do_repair_start(gms::gossip_address_map& addr_map, s
auto germs = make_lw_shared(co_await locator::make_global_effective_replication_map(sharded_db, keyspace));
auto& erm = germs->get();
auto& topology = erm.get_token_metadata().get_topology();
- auto my_address = erm.get_topology().my_address();
+ auto my_host_id = erm.get_topology().my_host_id();
if (erm.get_replication_strategy().get_type() == locator::replication_strategy_type::local) {
rlogger.info("repair[{}]: completed successfully: nothing to repair for keyspace {} with local replication strategy", id.uuid(), keyspace);
        co_return id.id;
}
- if (!_gossiper.local().is_normal(my_address)) {
+ if (!_gossiper.local().is_normal(my_host_id)) {
throw std::runtime_error("Node is not in NORMAL status yet!");
}
@@ -1256,7 +1256,6 @@ future<int> repair_service::do_repair_start(gms::gossip_address_map& addr_map, s
// Each of these ranges may have a different set of replicas, so the
// repair of each range is performed separately with repair_range().
dht::token_range_vector ranges;
- auto my_host_id = erm.get_topology().my_host_id();
if (options.ranges.size()) {
ranges = options.ranges;
} else if (options.primary_range) {
diff --git a/service/migration_manager.cc b/service/migration_manager.cc
index d2e4cb12f57..79464d8d4e4 100644
--- a/service/migration_manager.cc
+++ b/service/migration_manager.cc
@@ -237,7 +237,7 @@ bool migration_manager::have_schema_agreement() {
auto our_version = _storage_proxy.get_db().local().get_version();
bool match = false;
static thread_local logger::rate_limit rate_limit{std::chrono::seconds{5}};
- _gossiper.for_each_endpoint_state_until([&, my_address = _messaging.broadcast_address()] (const gms::inet_address& endpoint, const gms::endpoint_state& eps) {
+ _gossiper.for_each_endpoint_state_until([&, my_address = _gossiper.my_host_id()] (const locator::host_id& endpoint, const gms::endpoint_state& eps) {
if (endpoint == my_address || !_gossiper.is_alive(eps.get_host_id())) {
return stop_iteration::no;
}
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 056b6b2c848..1cc09ffe6c5 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -452,7 +452,7 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet
// Populate the table with the state from the gossiper here since storage_service::on_change()
// (which is called each time gossiper state changes) may have skipped it because the tokens
// for the node were not in the 'normal' state yet
- auto info = get_peer_info_for_update(ip);
+ auto info = get_peer_info_for_update(id);
if (info) {
// And then amend with the info from raft
info->tokens = rs.ring.value().tokens;
@@ -1572,7 +1572,7 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
// Check if the node is already removed from the cluster
auto local_host_id = get_token_metadata().get_my_id();
auto my_ip = get_broadcast_address();
- if (!_gossiper.is_safe_for_restart(my_ip, local_host_id)) {
+ if (!_gossiper.is_safe_for_restart(local_host_id)) {
throw std::runtime_error(::format("The node {} with host_id {} is removed from the cluster. Can not restart the removed node to join the cluster again!",
my_ip, local_host_id));
}
@@ -2067,7 +2067,7 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
})(*this, proxy);
std::unordered_set<locator::host_id> ids;
- _gossiper.for_each_endpoint_state([this, &ids] (const inet_address& addr, const gms::endpoint_state& ep) {
+ _gossiper.for_each_endpoint_state([this, &ids] (const locator::host_id& addr, const gms::endpoint_state& ep) {
if (_gossiper.is_normal(addr)) {
ids.insert(ep.get_host_id());
}
@@ -2267,7 +2267,7 @@ storage_service::get_range_to_address_map(locator::effective_replication_map_ptr
future<> storage_service::handle_state_bootstrap(inet_address endpoint, locator::host_id host_id, gms::permit_id pid) {
slogger.debug("endpoint={}/{} handle_state_bootstrap: permit_id={}", endpoint, host_id, pid);
// explicitly check for TOKENS, because a bootstrapping node might be bootstrapping in legacy mode; that is, not using vnodes and no token specified
- auto tokens = get_tokens_for(endpoint);
+ auto tokens = get_tokens_for(host_id);
slogger.debug("Node {}/{} state bootstrapping, token {}", endpoint, host_id, tokens);
@@ -2300,26 +2300,30 @@ future<> storage_service::handle_state_normal(inet_address endpoint, locator::ho
slogger.debug("endpoint={}/{} handle_state_normal: permit_id={}", endpoint, host_id, pid);
- auto tokens = get_tokens_for(endpoint);
+ auto tokens = get_tokens_for(host_id);
slogger.info("Node {}/{} is in normal state, tokens: {}", endpoint, host_id, tokens);
auto tmlock = std::make_unique<token_metadata_lock>(co_await get_token_metadata_lock());
auto tmptr = co_await get_mutable_token_metadata_ptr();
- std::unordered_set<inet_address> endpoints_to_remove;
-
- auto do_remove_node = [&] (gms::inet_address node) {
- // this lambda is called in three cases:
- // 1. old endpoint for the given host_id is ours, we remove the new endpoint;
- // 2. new endpoint for the given host_id has bigger generation, we remove the old endpoint;
- // 3. old endpoint for the given host_id has bigger generation, we remove the new endpoint.
- // In all of these cases host_id is retained, only the IP addresses are changed.
- // We don't need to call remove_endpoint on tmptr, since it will be called
- // indirectly through the chain endpoints_to_remove->storage_service::remove_endpoint ->
- // _gossiper.remove_endpoint -> storage_service::on_remove.
-
- endpoints_to_remove.insert(node);
- };
+
+ // peers table: IP1->ID1
+ // gossiper: ID1->IP1
+ //
+ // replace with same IP:
+ // peers table: IP1->ID2 (IP1->ID1 will be replaced, no need to remove)
+ // gossiper: ID2->IP1 (ID1->IP1 entry needs to be removed)
+ //
+ // replace with different IP:
+ // peers table: IP2->ID2 (IP1->ID1 entry needs to be removed)
+ // gossiper: ID2->IP2 (ID1->IP1 entry needs to be removed)
+ //
+ // reboot with different IP:
+ // peers table: IP2->ID1 (IP1->ID1 needs to be removed)
+ // gossiper: ID1->IP2 (ID1->IP1 will replaced, no need to remove)
+ std::unordered_set<inet_address> remove_from_peers;
+ std::unordered_set<locator::host_id> remove_from_gossiper;
+
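+    // Worked example for the "replace with different IP" case above: node ID1 at IP1 is
+    // replaced by node ID2 at IP2, and handle_state_normal(IP2, ID2) runs. The stale ID1
+    // gossip entry is queued in remove_from_gossiper (via the token-ownership checks below)
+    // and the stale IP1 row in remove_from_peers, while the fresh IP2/ID2 peers row is
+    // written by update_peer_info() near the end of this function.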
// Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
if (tmptr->is_normal_token_owner(host_id)) {
slogger.info("handle_state_normal: node {}/{} was already a normal token owner", endpoint, host_id);
@@ -2336,33 +2340,16 @@ future<> storage_service::handle_state_normal(inet_address endpoint, locator::ho
existing = id_to_ip_map.contains(host_id) ? id_to_ip_map[host_id] : endpoint;
}
+ // endpoint = IP2, existing=IP1, host_id=ID1
+ // need to remove IP1 from peers
if (existing && *existing != endpoint) {
        // This branch is taken when a node changes its IP address.
-
if (*existing == get_broadcast_address()) {
slogger.warn("Not updating host ID {} for {} because it's mine", host_id, endpoint);
- do_remove_node(endpoint);
- } else if (std::is_gt(_gossiper.compare_endpoint_startup(endpoint, *existing))) {
- // The new IP has greater generation than the existing one.
- // Here we remap the host_id to the new IP. The 'owned_tokens' calculation logic below
- // won't detect any changes - the branch 'endpoint == current_owner' will be taken.
- // We still need to call 'remove_endpoint' for existing IP to remove it from system.peers.
-
+ remove_from_peers.emplace(endpoint);
+ } else {
slogger.warn("Host ID collision for {} between {} and {}; {} is the new owner", host_id, *existing, endpoint, endpoint);
- do_remove_node(*existing);
- } else {
- // The new IP has smaller generation than the existing one,
- // we are going to remove it, so we add it to the endpoints_to_remove.
- // How does this relate to the tokens this endpoint may have?
- // There is a condition below which checks that if endpoints_to_remove
- // contains 'endpoint', then the owned_tokens must be empty, otherwise internal_error
- // is triggered. This means the following is expected to be true:
- // 1. each token from the tokens variable (which is read from gossiper) must have an owner node
- // 2. this owner must be different from 'endpoint'
- // 3. its generation must be greater than endpoint's
-
- slogger.warn("Host ID collision for {} between {} and {}; ignored {}", host_id, *existing, endpoint, endpoint);
- do_remove_node(endpoint);
+ remove_from_peers.emplace(*existing);
}
} else if (existing && *existing == endpoint) {
// This branch is taken for all gossiper-managed topology operations.
@@ -2393,11 +2380,9 @@ future<> storage_service::handle_state_normal(inet_address endpoint, locator::ho
// For example, a new node receives this notification for every
// existing node in the cluster.
- auto nodes = _gossiper.get_nodes_with_host_id(host_id);
- bool left = std::any_of(nodes.begin(), nodes.end(), [this] (const gms::inet_address& node) { return _gossiper.is_left(node); });
- if (left) {
-        slogger.info("Skip to set host_id={} to be owned by node={}, because the node is removed from the cluster, nodes {} used to own the host_id", host_id, endpoint, nodes);
- _normal_state_handled_on_boot.insert(endpoint);
+ if (_gossiper.is_left(host_id)) {
+        slogger.info("Skip to set host_id={} to be owned by node={}, because the node is removed from the cluster", host_id, endpoint);
+ _normal_state_handled_on_boot.insert(host_id);
co_return;
}
}
@@ -2432,11 +2417,11 @@ future<> storage_service::handle_state_normal(inet_address endpoint, locator::ho
continue;
}
auto current_owner = current->second;
- if (endpoint == _address_map.get(current_owner)) {
+ if (host_id == current_owner) {
slogger.info("handle_state_normal: endpoint={} == current_owner={} token {}", host_id, current_owner, t);
// set state back to normal, since the node may have tried to leave, but failed and is now back up
owned_tokens.insert(t);
- } else if (std::is_gt(_gossiper.compare_endpoint_startup(endpoint, _address_map.get(current_owner)))) {
+ } else if (std::is_gt(_gossiper.compare_endpoint_startup(host_id, current_owner))) {
slogger.debug("handle_state_normal: endpoint={} > current_owner={}, token {}", host_id, current_owner, t);
owned_tokens.insert(t);
slogger.info("handle_state_normal: remove endpoint={} token={}", current_owner, t);
@@ -2444,7 +2429,7 @@ future<> storage_service::handle_state_normal(inet_address endpoint, locator::ho
// a host no longer has any tokens, we'll want to remove it.
token_to_endpoint_map.erase(current);
candidates_for_removal.insert(current_owner);
-            slogger.info("handle_state_normal: Nodes {} and {} have the same token {}. {} is the new owner", host_id, current_owner, t, endpoint);
+            slogger.info("handle_state_normal: Nodes {} and {} have the same token {}. {} is the new owner", host_id, current_owner, t, host_id);
} else {
// current owner of this token is kept and endpoint attempt to own it is rejected.
// Keep track of these moves, because when a host no longer has any tokens, we'll want to remove it.
@@ -2471,15 +2456,15 @@ future<> storage_service::handle_state_normal(inet_address endpoint, locator::ho
for (const auto& ep : candidates_for_removal) {
slogger.info("handle_state_normal: endpoints_to_remove endpoint={}", ep);
- endpoints_to_remove.insert(_address_map.get(ep));
+ remove_from_gossiper.insert(ep);
}
bool is_normal_token_owner = tmptr->is_normal_token_owner(host_id);
bool do_notify_joined = false;
- if (endpoints_to_remove.contains(endpoint)) [[unlikely]] {
+ if (remove_from_gossiper.contains(host_id)) [[unlikely]] {
if (!owned_tokens.empty()) {
- on_fatal_internal_error(slogger, ::format("endpoint={} is marked for removal but still owns {} tokens", endpoint, owned_tokens.size()));
+ on_fatal_internal_error(slogger, ::format("endpoint={} is marked for removal but still owns {} tokens", host_id, owned_tokens.size()));
}
} else {
if (!is_normal_token_owner) {
@@ -2505,13 +2490,26 @@ future<> storage_service::handle_state_normal(inet_address endpoint, locator::ho
co_await replicate_to_all_cores(std::move(tmptr));
tmlock.reset();
- for (auto ep : endpoints_to_remove) {
- co_await remove_endpoint(ep, ep == endpoint ? pid : gms::null_permit_id);
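+    // Gossiper entries are removed by host id; the IP each removed id maps to is also
+    // queued for system.peers cleanup, unless it is the address this notification is
+    // about, whose peers row is rewritten with the new host id further down.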
+ for (auto id : remove_from_gossiper) {
+ auto ip = _address_map.get(id);
+ co_await _gossiper.remove_endpoint(id, id == host_id ? pid : gms::null_permit_id);
+ if (ip != endpoint) {
+ remove_from_peers.emplace(ip);
+ }
+ }
+
+ for (auto ep : remove_from_peers) {
+ try {
+ co_await _sys_ks.local().remove_endpoint(ep);
+ } catch (...) {
+ slogger.error("fail to remove endpoint={}: {}", ep, std::current_exception());
+ }
}
- slogger.debug("handle_state_normal: endpoint={} is_normal_token_owner={} endpoint_to_remove={} owned_tokens={}", endpoint, is_normal_token_owner, endpoints_to_remove.contains(endpoint), owned_tokens);
- if (!is_me(endpoint) && !owned_tokens.empty() && !endpoints_to_remove.count(endpoint)) {
+
+ slogger.debug("handle_state_normal: endpoint={} is_normal_token_owner={} remove_from_peers={} owned_tokens={}", endpoint, is_normal_token_owner, remove_from_peers.contains(endpoint), owned_tokens);
+ if (!is_me(endpoint) && !owned_tokens.empty() && !remove_from_peers.count(endpoint)) {
try {
- auto info = get_peer_info_for_update(endpoint).value();
+ auto info = get_peer_info_for_update(host_id).value();
info.tokens = std::move(owned_tokens);
co_await _sys_ks.local().update_peer_info(endpoint, host_id, info);
} catch (...) {
@@ -2532,7 +2530,7 @@ future<> storage_service::handle_state_normal(inet_address endpoint, locator::ho
slogger.debug("handle_state_normal: token_metadata.ring_version={}, token={} -> endpoint={}/{}", ver, x.first, _address_map.get(x.second), x.second);
}
}
- _normal_state_handled_on_boot.insert(endpoint);
+ _normal_state_handled_on_boot.insert(host_id);
slogger.info("handle_state_normal for {}/{} finished", endpoint, host_id);
}
@@ -2543,10 +2541,10 @@ future<> storage_service::handle_state_left(inet_address endpoint, locator::host
slogger.warn("Fail to handle_state_left endpoint={} pieces={}", endpoint, pieces);
co_return;
}
- auto tokens = get_tokens_for(endpoint);
+ auto tokens = get_tokens_for(host_id);
slogger.debug("Node {}/{} state left, tokens {}", endpoint, host_id, tokens);
if (tokens.empty()) {
- auto eps = _gossiper.get_endpoint_state_ptr(endpoint);
+ auto eps = _gossiper.get_endpoint_state_ptr(host_id);
if (eps) {
slogger.warn("handle_state_left: Tokens for node={} are empty, endpoint_state={}", endpoint, *eps);
} else {
@@ -2659,7 +2657,7 @@ future<> storage_service::on_change(gms::inet_address endpoint, locator::host_id
if (node && node->is_member() && (co_await get_ip_from_peers_table(host_id)) == endpoint) {
if (!is_me(endpoint)) {
slogger.debug("endpoint={}/{} on_change: updating system.peers table", endpoint, host_id);
- if (auto info = get_peer_info_for_update(endpoint, states)) {
+ if (auto info = get_peer_info_for_update(host_id, states)) {
co_await _sys_ks.local().update_peer_info(endpoint, host_id, *info);
}
}
@@ -2731,7 +2729,7 @@ future<> storage_service::on_restart(gms::inet_address endpoint, locator::host_i
return make_ready_future();
}
-std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for_update(inet_address endpoint) {
+std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for_update(locator::host_id endpoint) {
auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint);
if (!ep_state) {
return db::system_keyspace::peer_info{};
@@ -2743,7 +2741,7 @@ std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for
return info;
}
-std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for_update(inet_address endpoint, const gms::application_state_map& app_state_map) {
+std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for_update(locator::host_id endpoint, const gms::application_state_map& app_state_map) {
std::optional<db::system_keyspace::peer_info> ret;
auto get_peer_info = [&] () -> db::system_keyspace::peer_info& {
@@ -2802,7 +2800,7 @@ std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for
return ret;
}
-std::unordered_set<locator::token> storage_service::get_tokens_for(inet_address endpoint) {
+std::unordered_set<locator::token> storage_service::get_tokens_for(locator::host_id endpoint) {
auto tokens_string = _gossiper.get_application_state_value(endpoint, application_state::TOKENS);
slogger.trace("endpoint={}, tokens_string={}", endpoint, tokens_string);
auto ret = versioned_value::tokens_from_string(tokens_string);
@@ -3349,7 +3347,10 @@ future<> storage_service::check_for_endpoint_collision(std::unordered_set<gms::i
}
future<> storage_service::remove_endpoint(inet_address endpoint, gms::permit_id pid) {
- co_await _gossiper.remove_endpoint(endpoint, pid);
+ auto host_id_opt = _gossiper.get_host_id_opt(endpoint);
+ if (host_id_opt) {
+ co_await _gossiper.remove_endpoint(*host_id_opt, pid);
+ }
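+    // When the IP no longer resolves to a known host id there is nothing to remove
+    // from gossip; the system.peers row is still cleaned up below.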
try {
co_await _sys_ks.local().remove_endpoint(endpoint);
} catch (...) {
@@ -3400,15 +3401,17 @@ storage_service::prepare_replacement_info(std::unordered_set<gms::inet_address>
throw std::runtime_error(::format("Found multiple nodes with Host ID {}: {}", replace_host_id, nodes));
}
replace_address = *nodes.begin();
+ } else {
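+        // Resolve the host id up front; the gossip lookups below are keyed by host id now.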
+ replace_host_id = _gossiper.get_host_id(replace_address);
}
- auto state = _gossiper.get_endpoint_state_ptr(replace_address);
+ auto state = _gossiper.get_endpoint_state_ptr(replace_host_id);
if (!state) {
throw std::runtime_error(::format("Cannot replace_address {} because it doesn't exist in gossip", replace_address));
}
// Reject to replace a node that has left the ring
- auto status = _gossiper.get_gossip_status(replace_address);
+ auto status = _gossiper.get_gossip_status(replace_host_id);
if (status == gms::versioned_value::STATUS_LEFT || status == gms::versioned_value::REMOVED_TOKEN) {
throw std::runtime_error(::format("Cannot replace_address {} because it has left the ring, status={}", replace_address, status));
}
@@ -3421,10 +3424,6 @@ storage_service::prepare_replacement_info(std::unordered_set<gms::inet_address>
}
}
- if (!replace_host_id) {
- replace_host_id = _gossiper.get_host_id(replace_address);
- }
-
auto dc_rack = get_dc_rack_for(replace_host_id).value_or(locator::endpoint_dc_rack::default_location);
auto ri = replacement_info {
@@ -3452,16 +3451,17 @@ storage_service::prepare_replacement_info(std::unordered_set<gms::inet_address>
}
st.endpoint = *res.begin();
}
- auto esp = _gossiper.get_endpoint_state_ptr(st.endpoint);
- if (!esp) {
+ auto host_id_opt = _gossiper.get_host_id_opt(st.endpoint);
+ if (!host_id_opt) {
throw std::runtime_error(::format("Ignore node {}/{} has no endpoint state", host_id, st.endpoint));
}
if (!host_id) {
- host_id = esp->get_host_id();
+ host_id = *host_id_opt;
if (!host_id) {
throw std::runtime_error(::format("Could not find host_id for ignored node {}", st.endpoint));
}
}
+ auto esp = _gossiper.get_endpoint_state_ptr(host_id);
st.tokens = esp->get_tokens();
st.opt_dc_rack = esp->get_dc_rack();
ri.ignore_nodes.emplace(host_id, std::move(st));
@@ -4242,9 +4242,9 @@ future<> storage_service::removenode(locator::host_id host_id, locator::host_id_
// Step 7: Announce the node has left
slogger.info("removenode[{}]: Advertising that the node left the ring", uuid);
- auto permit = ss._gossiper.lock_endpoint(endpoint, gms::null_permit_id).get();
+ auto permit = ss._gossiper.lock_endpoint(host_id, gms::null_permit_id).get();
    const auto& pid = permit.id();
- ss._gossiper.advertise_token_removed(endpoint, host_id, pid).get();
+ ss._gossiper.advertise_token_removed(host_id, pid).get();
std::unordered_set<token> tmp(tokens.begin(), tokens.end());
ss.excise(std::move(tmp), endpoint, host_id, pid).get();
removed_from_token_ring = true;
@@ -5275,7 +5275,7 @@ storage_service::describe_ring_for_table(const sstring& keyspace_name, const sst
details._datacenter = node.dc_rack().dc;
details._rack = node.dc_rack().rack;
details._host = ip;
- tr._rpc_endpoints.push_back(_gossiper.get_rpc_address(ip));
+ tr._rpc_endpoints.push_back(_gossiper.get_rpc_address(r.host));
tr._endpoints.push_back(fmt::to_string(details._host));
tr._endpoint_details.push_back(std::move(details));
}
@@ -7449,7 +7449,7 @@ future<> endpoint_lifecycle_notifier::notify_up(gms::inet_address endpoint, loca
}
future<> storage_service::notify_up(inet_address endpoint, locator::host_id hid) {
- if (!_gossiper.is_cql_ready(endpoint) || !_gossiper.is_alive(hid)) {
+ if (!_gossiper.is_cql_ready(hid) || !_gossiper.is_alive(hid)) {
co_return;
}
co_await container().invoke_on_all([endpoint, hid] (auto&& ss) {
@@ -7494,7 +7494,7 @@ future<> storage_service::notify_cql_change(inet_address endpoint, locator::host
}
}
-bool storage_service::is_normal_state_handled_on_boot(gms::inet_address node) {
+bool storage_service::is_normal_state_handled_on_boot(locator::host_id node) {
return _normal_state_handled_on_boot.contains(node);
}
@@ -7509,11 +7509,11 @@ future<> storage_service::wait_for_normal_state_handled_on_boot() {
slogger.info("Started waiting for normal state handlers to finish");
auto start_time = std::chrono::steady_clock::now();
- std::vector<gms::inet_address> eps;
+ std::vector<locator::host_id> eps;
while (true) {
eps = _gossiper.get_endpoints();
auto it = std::partition(eps.begin(), eps.end(),
- [this, me = get_broadcast_address()] (const gms::inet_address& ep) {
+ [this, me = my_host_id()] (const locator::host_id& ep) {
return ep == me || !_gossiper.is_normal_ring_member(ep) || is_normal_state_handled_on_boot(ep);
});
diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc
index dc4fe4510ae..a116fe33e48 100644
--- a/service/topology_coordinator.cc
+++ b/service/topology_coordinator.cc
@@ -775,7 +775,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
co_return;
});
std::string reason = ::format("Ban the orphan nodes in group0. Orphan nodes HostId/IP:");
- _gossiper.for_each_endpoint_state([&](const gms::inet_address& addr, const gms::endpoint_state& eps) -> void {
+ _gossiper.for_each_endpoint_state([&](const locator::host_id& addr, const gms::endpoint_state& eps) -> void {
// Since generation is in seconds unit, converting current time to seconds eases comparison computations.
auto current_timestamp =
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now().time_since_epoch()).count();
@@ -797,7 +797,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.set("request_id", dummy_value)
.set("cleanup_status", cleanup_status::clean)
.set("node_state", node_state::left);
- reason.append(::format(" {}/{},", host_id, addr));
+ reason.append(::format(" {}/{},", host_id, eps.get_ip()));
updates.push_back({builder.build()});
}
});
diff --git a/transport/event_notifier.cc b/transport/event_notifier.cc
index 3d4001baa60..137925416c5 100644
--- a/transport/event_notifier.cc
+++ b/transport/event_notifier.cc
@@ -234,7 +234,7 @@ future<> cql_server::event_notifier::on_effective_service_levels_cache_reloaded(
void cql_server::event_notifier::on_join_cluster(const gms::inet_address& endpoint, locator::host_id hid)
{
- if (!_server._gossiper.is_cql_ready(endpoint)) {
+ if (!_server._gossiper.is_cql_ready(hid)) {
_endpoints_pending_joined_notification.insert(endpoint);
return;
}
--
2.47.1