---
cdc/generation_service.hh | 4 ++--
gms/gossiper.hh | 2 +-
gms/i_endpoint_state_change_subscriber.hh | 16 +++++++-------
service/load_broadcaster.hh | 8 +++----
service/migration_manager.hh | 6 +++---
service/storage_service.hh | 12 +++++------
service/view_update_backlog_broker.hh | 4 ++--
streaming/stream_manager.hh | 6 +++---
cdc/generation.cc | 8 +++----
gms/endpoint_state.cc | 5 +++--
gms/feature_service.cc | 4 ++--
gms/gossiper.cc | 26 ++++++++++++-----------
repair/row_level.cc | 3 +++
service/migration_manager.cc | 8 +++----
service/misc_services.cc | 6 +++---
service/storage_service.cc | 24 ++++++++++-----------
streaming/stream_manager.cc | 6 +++---
17 files changed, 77 insertions(+), 71 deletions(-)
diff --git a/cdc/generation_service.hh b/cdc/generation_service.hh
index 165a0aab1ec..d1f13573c52 100644
--- a/cdc/generation_service.hh
+++ b/cdc/generation_service.hh
@@ -110,8 +110,8 @@ class generation_service : public peering_sharded_service<generation_service>
return _cdc_metadata;
}
- virtual future<> on_join(gms::inet_address, gms::endpoint_state_ptr, gms::permit_id) override;
- virtual future<> on_change(gms::inet_address, const gms::application_state_map&, gms::permit_id) override;
+ virtual future<> on_join(gms::inet_address, locator::host_id id, gms::endpoint_state_ptr, gms::permit_id) override;
+ virtual future<> on_change(gms::inet_address, locator::host_id id, const gms::application_state_map&, gms::permit_id) override;
future<> check_and_repair_cdc_streams();
diff --git a/gms/gossiper.hh b/gms/gossiper.hh
index 62097f735ab..343882af0e2 100644
--- a/gms/gossiper.hh
+++ b/gms/gossiper.hh
@@ -540,7 +540,7 @@ class gossiper : public seastar::async_sharded_service<gossiper>, public seastar
// notify that an application state has changed
// Must be called under lock_endpoint.
- future<> do_on_change_notifications(inet_address addr, const application_state_map& states, permit_id) const;
+ future<> do_on_change_notifications(inet_address addr, locator::host_id id, const application_state_map& states, permit_id) const;
// notify that a node is DOWN (dead)
// Must be called under lock_endpoint.
diff --git a/gms/i_endpoint_state_change_subscriber.hh b/gms/i_endpoint_state_change_subscriber.hh
index 44e32bb5ce9..dd1f7874fba 100644
--- a/gms/i_endpoint_state_change_subscriber.hh
+++ b/gms/i_endpoint_state_change_subscriber.hh
@@ -34,8 +34,8 @@ namespace gms {
*/
class i_endpoint_state_change_subscriber {
protected:
- future<> on_application_state_change(inet_address endpoint, const application_state_map& states, application_state app_state, permit_id,
- std::function<future<>(inet_address, const gms::versioned_value&, gms::permit_id)> func);
+ future<> on_application_state_change(inet_address endpoint, locator::host_id id, const application_state_map& states, application_state app_state, permit_id,
+ std::function<future<>(inet_address, locator::host_id, const gms::versioned_value&, gms::permit_id)> func);
public:
virtual ~i_endpoint_state_change_subscriber() {}
@@ -46,15 +46,15 @@ class i_endpoint_state_change_subscriber {
* @param endpoint endpoint for which the state change occurred.
* @param epState state that actually changed for the above endpoint.
*/
- virtual future<> on_join(inet_address endpoint, endpoint_state_ptr ep_state, permit_id) { return make_ready_future<>(); }
+ virtual future<> on_join(inet_address endpoint, locator::host_id id, endpoint_state_ptr ep_state, permit_id) { return make_ready_future<>(); }
- virtual future<> on_change(inet_address endpoint, const application_state_map& states, permit_id) { return make_ready_future<>(); }
+ virtual future<> on_change(inet_address endpoint, locator::host_id id, const application_state_map& states, permit_id) { return make_ready_future<>(); }
- virtual future<> on_alive(inet_address endpoint, endpoint_state_ptr state, permit_id) { return make_ready_future<>(); };
+ virtual future<> on_alive(inet_address endpoint, locator::host_id id, endpoint_state_ptr state, permit_id) { return make_ready_future<>(); };
- virtual future<> on_dead(inet_address endpoint, endpoint_state_ptr state, permit_id) { return make_ready_future<>(); };
+ virtual future<> on_dead(inet_address endpoint, locator::host_id id, endpoint_state_ptr state, permit_id) { return make_ready_future<>(); };
- virtual future<> on_remove(inet_address endpoint, permit_id) { return make_ready_future<>(); };
+ virtual future<> on_remove(inet_address endpoint, locator::host_id id, permit_id) { return make_ready_future<>(); };
/**
* Called whenever a node is restarted.
@@ -62,7 +62,7 @@ class i_endpoint_state_change_subscriber {
* previously marked down. It will have only if {@code state.isAlive() == false}
* as {@code state} is from before the restarted node is marked up.
*/
- virtual future<> on_restart(inet_address endpoint, endpoint_state_ptr state, permit_id) { return make_ready_future<>(); };
+ virtual future<> on_restart(inet_address endpoint, locator::host_id id, endpoint_state_ptr state, permit_id) { return make_ready_future<>(); };
};
} // namespace gms
diff --git a/service/load_broadcaster.hh b/service/load_broadcaster.hh
index 2f13ae4b5ed..4164a73afc8 100644
--- a/service/load_broadcaster.hh
+++ b/service/load_broadcaster.hh
@@ -37,14 +37,14 @@ class load_broadcaster : public gms::i_endpoint_state_change_subscriber, public
SCYLLA_ASSERT(_stopped);
}
- virtual future<> on_change(gms::inet_address endpoint, const gms::application_state_map& states, gms::permit_id pid) override {
- return on_application_state_change(endpoint, states, gms::application_state::LOAD, pid, [this] (gms::inet_address endpoint, const gms::versioned_value& value, gms::permit_id) {
+ virtual future<> on_change(gms::inet_address endpoint, locator::host_id id, const gms::application_state_map& states, gms::permit_id pid) override {
+ return on_application_state_change(endpoint, id, states, gms::application_state::LOAD, pid, [this] (gms::inet_address endpoint, locator::host_id id, const gms::versioned_value& value, gms::permit_id) {
_load_info[endpoint] = std::stod(value.value());
return make_ready_future<>();
});
}
- virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id pid) override {
+ virtual future<> on_join(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id pid) override {
auto* local_value = ep_state->get_application_state_ptr(gms::application_state::LOAD);
if (local_value) {
_load_info[endpoint] = std::stod(local_value->value());
@@ -52,7 +52,7 @@ class load_broadcaster : public gms::i_endpoint_state_change_subscriber, public
return make_ready_future();
}
- virtual future<> on_remove(gms::inet_address endpoint, gms::permit_id) override {
+ virtual future<> on_remove(gms::inet_address endpoint, locator::host_id id, gms::permit_id) override {
_load_info.erase(endpoint);
return make_ready_future();
}
diff --git a/service/migration_manager.hh b/service/migration_manager.hh
index 50ea39eb5f9..4f9b2651229 100644
--- a/service/migration_manager.hh
+++ b/service/migration_manager.hh
@@ -185,9 +185,9 @@ class migration_manager : public seastar::async_sharded_service<migration_manage
future<schema_ptr> get_schema_for_write(table_schema_version, locator::host_id from, unsigned shard, netw::messaging_service& ms, abort_source& as);
private:
- virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id) override;
- virtual future<> on_change(gms::inet_address endpoint, const gms::application_state_map& states, gms::permit_id) override;
- virtual future<> on_alive(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) override;
+ virtual future<> on_join(gms::inet_address endpoint,locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id) override;
+ virtual future<> on_change(gms::inet_address endpoint, locator::host_id id, const gms::application_state_map& states, gms::permit_id) override;
+ virtual future<> on_alive(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id) override;
public:
// For tests only.
diff --git a/service/storage_service.hh b/service/storage_service.hh
index 0f6e17775f2..409a07723ba 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -467,7 +467,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
future<std::map<token, inet_address>> get_tablet_to_endpoint_map(table_id table);
public:
- virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id) override;
+ virtual future<> on_join(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id) override;
/*
* Handle the reception of a new particular ApplicationState for a particular endpoint. Note that the value of the
* ApplicationState has not necessarily "changed" since the last known value, if we already received the same update
@@ -496,11 +496,11 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
* Note: Any time a node state changes from STATUS_NORMAL, it will not be visible to new nodes. So it follows that
* you should never bootstrap a new node during a removenode, decommission or move.
*/
- virtual future<> on_change(gms::inet_address endpoint, const gms::application_state_map& states, gms::permit_id) override;
- virtual future<> on_alive(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) override;
- virtual future<> on_dead(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) override;
- virtual future<> on_remove(gms::inet_address endpoint, gms::permit_id) override;
- virtual future<> on_restart(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) override;
+ virtual future<> on_change(gms::inet_address endpoint, locator::host_id id, const gms::application_state_map& states, gms::permit_id) override;
+ virtual future<> on_alive(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id) override;
+ virtual future<> on_dead(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id) override;
+ virtual future<> on_remove(gms::inet_address endpoint, locator::host_id id, gms::permit_id) override;
+ virtual future<> on_restart(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id) override;
public:
// For migration_listener
diff --git a/service/view_update_backlog_broker.hh b/service/view_update_backlog_broker.hh
index 7d54ffb055d..65e99274ec6 100644
--- a/service/view_update_backlog_broker.hh
+++ b/service/view_update_backlog_broker.hh
@@ -39,9 +39,9 @@ class view_update_backlog_broker final
seastar::future<> stop();
- virtual future<> on_change(gms::inet_address, const gms::application_state_map& states, gms::permit_id) override;
+ virtual future<> on_change(gms::inet_address, locator::host_id id, const gms::application_state_map& states, gms::permit_id) override;
- virtual future<> on_remove(gms::inet_address, gms::permit_id) override;
+ virtual future<> on_remove(gms::inet_address, locator::host_id id, gms::permit_id) override;
};
}
diff --git a/streaming/stream_manager.hh b/streaming/stream_manager.hh
index 927c3b24170..6bd589f39e2 100644
--- a/streaming/stream_manager.hh
+++ b/streaming/stream_manager.hh
@@ -172,9 +172,9 @@ class stream_manager : public gms::i_endpoint_state_change_subscriber, public en
reader_consumer_v2 make_streaming_consumer(
uint64_t estimated_partitions, stream_reason, service::frozen_topology_guard);
public:
- virtual future<> on_dead(inet_address endpoint, endpoint_state_ptr state, gms::permit_id) override;
- virtual future<> on_remove(inet_address endpoint, gms::permit_id) override;
- virtual future<> on_restart(inet_address endpoint, endpoint_state_ptr ep_state, gms::permit_id) override;
+ virtual future<> on_dead(inet_address endpoint, locator::host_id id, endpoint_state_ptr state, gms::permit_id) override;
+ virtual future<> on_remove(inet_address endpoint, locator::host_id id, gms::permit_id) override;
+ virtual future<> on_restart(inet_address endpoint, locator::host_id id, endpoint_state_ptr ep_state, gms::permit_id) override;
private:
void fail_all_sessions();
diff --git a/cdc/generation.cc b/cdc/generation.cc
index bfdfa73555d..518da60e7c4 100644
--- a/cdc/generation.cc
+++ b/cdc/generation.cc
@@ -841,18 +841,18 @@ future<> generation_service::leave_ring() {
co_await _gossiper.unregister_(shared_from_this());
}
-future<> generation_service::on_join(gms::inet_address ep, gms::endpoint_state_ptr ep_state, gms::permit_id pid) {
- return on_change(ep, ep_state->get_application_state_map(), pid);
+future<> generation_service::on_join(gms::inet_address ep, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id pid) {
+ return on_change(ep, id, ep_state->get_application_state_map(), pid);
}
-future<> generation_service::on_change(gms::inet_address ep, const gms::application_state_map& states, gms::permit_id pid) {
+future<> generation_service::on_change(gms::inet_address ep, locator::host_id id, const gms::application_state_map& states, gms::permit_id pid) {
assert_shard_zero(__PRETTY_FUNCTION__);
if (_raft_topology_change_enabled()) {
return make_ready_future<>();
}
- return on_application_state_change(ep, states, gms::application_state::CDC_GENERATION_ID, pid, [this] (gms::inet_address ep, const gms::versioned_value& v, gms::permit_id) {
+ return on_application_state_change(ep, id, states, gms::application_state::CDC_GENERATION_ID, pid, [this] (gms::inet_address ep, locator::host_id id, const gms::versioned_value& v, gms::permit_id) {
auto gen_id = gms::versioned_value::cdc_generation_id_from_string(v.value());
cdc_log.debug("Endpoint: {}, CDC generation ID change: {}", ep, gen_id);
diff --git a/gms/endpoint_state.cc b/gms/endpoint_state.cc
index 5c9ef4b90ed..0770f239afd 100644
--- a/gms/endpoint_state.cc
+++ b/gms/endpoint_state.cc
@@ -80,11 +80,12 @@ std::unordered_set<dht::token> endpoint_state::get_tokens() const {
}
future<> i_endpoint_state_change_subscriber::on_application_state_change(inet_address endpoint,
+ locator::host_id id,
const gms::application_state_map& states, application_state app_state, permit_id pid,
- std::function<future<>(inet_address, const gms::versioned_value&, permit_id)> func) {
+ std::function<future<>(inet_address, locator::host_id, const gms::versioned_value&, permit_id)> func) {
auto it = states.find(app_state);
if (it != states.end()) {
- return func(endpoint, it->second, pid);
+ return func(endpoint, id, it->second, pid);
}
return make_ready_future<>();
}
diff --git a/gms/feature_service.cc b/gms/feature_service.cc
index eaa466f8741..52044722147 100644
--- a/gms/feature_service.cc
+++ b/gms/feature_service.cc
@@ -262,10 +262,10 @@ class persistent_feature_enabler : public i_endpoint_state_change_subscriber {
, _ss(ss)
{
}
- future<> on_join(inet_address ep, endpoint_state_ptr state, gms::permit_id) override {
+ future<> on_join(inet_address ep, locator::host_id id, endpoint_state_ptr state, gms::permit_id) override {
return enable_features();
}
- future<> on_change(inet_address ep, const gms::application_state_map& states, gms::permit_id pid) override {
+ future<> on_change(inet_address ep, locator::host_id id, const gms::application_state_map& states, gms::permit_id pid) override {
if (states.contains(application_state::SUPPORTED_FEATURES)) {
return enable_features();
}
diff --git a/gms/gossiper.cc b/gms/gossiper.cc
index 2a0e96b6e82..21de6589721 100644
--- a/gms/gossiper.cc
+++ b/gms/gossiper.cc
@@ -704,10 +704,12 @@ future<> gossiper::remove_endpoint(inet_address endpoint, permit_id pid) {
auto permit = co_await lock_endpoint(endpoint, pid);
pid =
permit.id();
+ auto state = get_endpoint_state_ptr(endpoint);
+
// do subscribers first so anything in the subscriber that depends on gossiper state won't get confused
try {
- co_await _subscribers.for_each([endpoint, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
- return subscriber->on_remove(endpoint, pid);
+ co_await _subscribers.for_each([endpoint, state, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
+ return subscriber->on_remove(endpoint, state ? state->get_host_id() : locator::host_id{}, pid);
});
} catch (...) {
logger.warn("Fail to call on_remove callback: {}", std::current_exception());
@@ -719,8 +721,6 @@ future<> gossiper::remove_endpoint(inet_address endpoint, permit_id pid) {
logger.info("removed {} from _seeds, updated _seeds list = {}", endpoint, _seeds);
}
- auto state = get_endpoint_state_ptr(endpoint);
-
if (!state) {
logger.warn("There is no state for the removed IP {}", endpoint);
co_return;
@@ -1770,7 +1770,7 @@ future<> gossiper::real_mark_alive(inet_address addr) {
logger.info("InetAddress {}/{} is now UP, status = {}", es->get_host_id(), addr, status);
co_await _subscribers.for_each([addr, es, pid =
permit.id()] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
- co_await subscriber->on_alive(addr, es, pid);
+ co_await subscriber->on_alive(addr, es->get_host_id(), es, pid);
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
});
}
@@ -1813,7 +1813,7 @@ future<> gossiper::handle_major_state_change(inet_address ep, endpoint_state eps
if (eps_old) {
// the node restarted: it is up to the subscriber to take whatever action is necessary
co_await _subscribers.for_each([ep, eps_old, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
- return subscriber->on_restart(ep, eps_old, pid);
+ return subscriber->on_restart(ep, eps_old->get_host_id(), eps_old, pid);
});
}
@@ -1829,7 +1829,7 @@ future<> gossiper::handle_major_state_change(inet_address ep, endpoint_state eps
}
co_await _subscribers.for_each([ep, ep_state, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
- return subscriber->on_join(ep, ep_state, pid);
+ return subscriber->on_join(ep, ep_state->get_host_id(), ep_state, pid);
});
// check this at the end so nodes will learn about the endpoint
@@ -1910,6 +1910,8 @@ future<> gossiper::apply_new_states(inet_address addr, endpoint_state local_stat
ep = std::current_exception();
}
+ auto host_id = local_state.get_host_id();
+
// We must replicate endpoint states before listeners run.
// Exceptions during replication will cause abort because node's state
// would be inconsistent across shards. Changes listeners depend on state
@@ -1925,7 +1927,7 @@ future<> gossiper::apply_new_states(inet_address addr, endpoint_state local_stat
// Some values are set only once, so listeners would never be re-run.
// Listeners should decide which failures are non-fatal and swallow them.
try {
- co_await do_on_change_notifications(addr, changed, pid);
+ co_await do_on_change_notifications(addr, host_id, changed, pid);
} catch (...) {
auto msg = format("Gossip change listener failed: {}", std::current_exception());
if (_abort_source.abort_requested()) {
@@ -1938,18 +1940,18 @@ future<> gossiper::apply_new_states(inet_address addr, endpoint_state local_stat
maybe_rethrow_exception(std::move(ep));
}
-future<> gossiper::do_on_change_notifications(inet_address addr, const gms::application_state_map& states, permit_id pid) const {
+future<> gossiper::do_on_change_notifications(inet_address addr, locator::host_id id, const gms::application_state_map& states, permit_id pid) const {
co_await _subscribers.for_each([&] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
// Once _abort_source is aborted, don't attempt to process any further notifications
// because that would violate monotonicity due to partially failed notification.
_abort_source.check();
- return subscriber->on_change(addr, states, pid);
+ return subscriber->on_change(addr, id, states, pid);
});
}
future<> gossiper::do_on_dead_notifications(inet_address addr, endpoint_state_ptr state, permit_id pid) const {
co_await _subscribers.for_each([addr, state = std::move(state), pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
- return subscriber->on_dead(addr, state, pid);
+ return subscriber->on_dead(addr, state->get_host_id(), state, pid);
});
}
@@ -2271,7 +2273,7 @@ future<> gossiper::add_local_application_state(application_state_map states) {
// now we might defer again, so this could be reordered. But we've
// ensured the whole set of values are monotonically versioned and
// applied to endpoint state.
- co_await gossiper.do_on_change_notifications(ep_addr, states,
permit.id());
+ co_await gossiper.do_on_change_notifications(ep_addr, gossiper.my_host_id(), states,
permit.id());
});
} catch (...) {
logger.warn("Fail to apply application_state: {}", std::current_exception());
diff --git a/repair/row_level.cc b/repair/row_level.cc
index b562c708dfe..7dccf992920 100644
--- a/repair/row_level.cc
+++ b/repair/row_level.cc
@@ -3203,17 +3203,20 @@ class row_level_repair_gossip_helper : public gms::i_endpoint_state_change_subsc
}
virtual future<> on_dead(
gms::inet_address endpoint,
+ locator::host_id id,
gms::endpoint_state_ptr state,
gms::permit_id) override {
return remove_row_level_repair(_repair_service.get_gossiper().get_host_id(endpoint));
}
virtual future<> on_remove(
gms::inet_address endpoint,
+ locator::host_id id,
gms::permit_id) override {
return remove_row_level_repair(_repair_service.get_gossiper().get_host_id(endpoint));
}
virtual future<> on_restart(
gms::inet_address endpoint,
+ locator::host_id id,
gms::endpoint_state_ptr ep_state,
gms::permit_id) override {
return remove_row_level_repair(_repair_service.get_gossiper().get_host_id(endpoint));
diff --git a/service/migration_manager.cc b/service/migration_manager.cc
index 67a0c4d5a44..9da50bf51f0 100644
--- a/service/migration_manager.cc
+++ b/service/migration_manager.cc
@@ -1151,13 +1151,13 @@ future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, table_id
return db::schema_tables::get_column_mapping(sys_ks, table_id, v);
}
-future<> migration_manager::on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id) {
+future<> migration_manager::on_join(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id) {
schedule_schema_pull(ep_state->get_host_id(), *ep_state);
return make_ready_future();
}
-future<> migration_manager::on_change(gms::inet_address endpoint, const gms::application_state_map& states, gms::permit_id pid) {
- return on_application_state_change(endpoint, states, gms::application_state::SCHEMA, pid, [this] (gms::inet_address endpoint, const gms::versioned_value&, gms::permit_id) {
+future<> migration_manager::on_change(gms::inet_address endpoint, locator::host_id id, const gms::application_state_map& states, gms::permit_id pid) {
+ return on_application_state_change(endpoint, id, states, gms::application_state::SCHEMA, pid, [this] (gms::inet_address endpoint, locator::host_id id, const gms::versioned_value&, gms::permit_id) {
auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint);
if (!ep_state || _gossiper.is_dead_state(*ep_state)) {
mlogger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint);
@@ -1172,7 +1172,7 @@ future<> migration_manager::on_change(gms::inet_address endpoint, const gms::app
});
}
-future<> migration_manager::on_alive(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id) {
+future<> migration_manager::on_alive(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id) {
schedule_schema_pull(state->get_host_id(), *state);
return make_ready_future();
}
diff --git a/service/misc_services.cc b/service/misc_services.cc
index a2e6d558c33..691f1a4b3d8 100644
--- a/service/misc_services.cc
+++ b/service/misc_services.cc
@@ -266,8 +266,8 @@ future<> view_update_backlog_broker::stop() {
});
}
-future<> view_update_backlog_broker::on_change(gms::inet_address endpoint, const gms::application_state_map& states, gms::permit_id pid) {
- return on_application_state_change(endpoint, states, gms::application_state::VIEW_BACKLOG, pid, [this] (gms::inet_address endpoint, const gms::versioned_value& value, gms::permit_id) {
+future<> view_update_backlog_broker::on_change(gms::inet_address endpoint, locator::host_id id, const gms::application_state_map& states, gms::permit_id pid) {
+ return on_application_state_change(endpoint, id, states, gms::application_state::VIEW_BACKLOG, pid, [this] (gms::inet_address endpoint, locator::host_id id, const gms::versioned_value& value, gms::permit_id) {
if (utils::get_local_injector().enter("skip_updating_local_backlog_via_view_update_backlog_broker")) {
return make_ready_future<>();
}
@@ -304,7 +304,7 @@ future<> view_update_backlog_broker::on_change(gms::inet_address endpoint, const
});
}
-future<> view_update_backlog_broker::on_remove(gms::inet_address endpoint, gms::permit_id) {
+future<> view_update_backlog_broker::on_remove(gms::inet_address endpoint, locator::host_id id, gms::permit_id) {
_sp.local()._view_update_backlogs.erase(_gossiper.get_host_id(endpoint));
return make_ready_future();
}
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 060bdac5ac4..fa1a706196a 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -972,17 +972,17 @@ class storage_service::ip_address_updater: public gms::i_endpoint_state_change_s
{}
virtual future<>
- on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id permit_id) override {
+ on_join(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id permit_id) override {
return on_endpoint_change(endpoint, ep_state, permit_id, "on_join");
}
virtual future<>
- on_alive(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id permit_id) override {
+ on_alive(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id permit_id) override {
return on_endpoint_change(endpoint, ep_state, permit_id, "on_alive");
}
virtual future<>
- on_restart(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id permit_id) override {
+ on_restart(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id permit_id) override {
return on_endpoint_change(endpoint, ep_state, permit_id, "on_restart");
}
};
@@ -2580,12 +2580,12 @@ future<> storage_service::handle_state_removed(inet_address endpoint, std::vecto
}
}
-future<> storage_service::on_join(gms::inet_address endpoint, gms::endpoint_state_ptr ep_state, gms::permit_id pid) {
+future<> storage_service::on_join(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id pid) {
slogger.debug("endpoint={} on_join: permit_id={}", endpoint, pid);
- co_await on_change(endpoint, ep_state->get_application_state_map(), pid);
+ co_await on_change(endpoint, id, ep_state->get_application_state_map(), pid);
}
-future<> storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id pid) {
+future<> storage_service::on_alive(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id pid) {
const auto& tm = get_token_metadata();
const auto host_id = state->get_host_id();
slogger.debug("endpoint={}/{} on_alive: permit_id={}", endpoint, host_id, pid);
@@ -2612,14 +2612,14 @@ future<std::optional<gms::inet_address>> storage_service::get_ip_from_peers_tabl
co_return std::nullopt;
}
-future<> storage_service::on_change(gms::inet_address endpoint, const gms::application_state_map& states_, gms::permit_id pid) {
+future<> storage_service::on_change(gms::inet_address endpoint, locator::host_id id, const gms::application_state_map& states_, gms::permit_id pid) {
// copy the states map locally since the coroutine may yield
auto states = states_;
slogger.debug("endpoint={} on_change: states={}, permit_id={}", endpoint, states, pid);
if (raft_topology_change_enabled()) {
slogger.debug("ignore status changes since topology changes are using raft");
} else {
- co_await on_application_state_change(endpoint, states, application_state::STATUS, pid, [this] (inet_address endpoint, const gms::versioned_value& value, gms::permit_id pid) -> future<> {
+ co_await on_application_state_change(endpoint, id, states, application_state::STATUS, pid, [this] (inet_address endpoint, locator::host_id id, const gms::versioned_value& value, gms::permit_id pid) -> future<> {
std::vector<sstring> pieces;
boost::split(pieces, value.value(), boost::is_any_of(versioned_value::DELIMITER));
if (pieces.empty()) {
@@ -2688,7 +2688,7 @@ future<> storage_service::maybe_reconnect_to_preferred_ip(inet_address ep, inet_
}
-future<> storage_service::on_remove(gms::inet_address endpoint, gms::permit_id pid) {
+future<> storage_service::on_remove(gms::inet_address endpoint, locator::host_id id, gms::permit_id pid) {
slogger.debug("endpoint={} on_remove: permit_id={}", endpoint, pid);
if (raft_topology_change_enabled()) {
@@ -2720,16 +2720,16 @@ future<> storage_service::on_remove(gms::inet_address endpoint, gms::permit_id p
co_await replicate_to_all_cores(std::move(tmptr));
}
-future<> storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id pid) {
+future<> storage_service::on_dead(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id pid) {
slogger.debug("endpoint={} on_dead: permit_id={}", endpoint, pid);
return notify_down(endpoint);
}
-future<> storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_state_ptr state, gms::permit_id pid) {
+future<> storage_service::on_restart(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id pid) {
slogger.debug("endpoint={} on_restart: permit_id={}", endpoint, pid);
// If we have restarted before the node was even marked down, we need to reset the connection pool
if (endpoint != get_broadcast_address() && _gossiper.is_alive(state->get_host_id())) {
- return on_dead(endpoint, state, pid);
+ return on_dead(endpoint, id, state, pid);
}
return make_ready_future();
}
diff --git a/streaming/stream_manager.cc b/streaming/stream_manager.cc
index 30681eb3714..4a31781deeb 100644
--- a/streaming/stream_manager.cc
+++ b/streaming/stream_manager.cc
@@ -346,7 +346,7 @@ void stream_manager::fail_all_sessions() {
}
}
-future<> stream_manager::on_remove(inet_address endpoint, gms::permit_id) {
+future<> stream_manager::on_remove(inet_address endpoint, locator::host_id id, gms::permit_id) {
if (has_peer(endpoint)) {
sslog.info("stream_manager: Close all stream_session with peer = {} in on_remove", endpoint);
//FIXME: discarded future.
@@ -359,7 +359,7 @@ future<> stream_manager::on_remove(inet_address endpoint, gms::permit_id) {
return make_ready_future();
}
-future<> stream_manager::on_restart(inet_address endpoint, endpoint_state_ptr ep_state, gms::permit_id) {
+future<> stream_manager::on_restart(inet_address endpoint, locator::host_id id, endpoint_state_ptr ep_state, gms::permit_id) {
if (has_peer(endpoint)) {
sslog.info("stream_manager: Close all stream_session with peer = {} in on_restart", endpoint);
//FIXME: discarded future.
@@ -372,7 +372,7 @@ future<> stream_manager::on_restart(inet_address endpoint, endpoint_state_ptr ep
return make_ready_future();
}
-future<> stream_manager::on_dead(inet_address endpoint, endpoint_state_ptr ep_state, gms::permit_id) {
+future<> stream_manager::on_dead(inet_address endpoint, locator::host_id id, endpoint_state_ptr ep_state, gms::permit_id) {
if (has_peer(endpoint)) {
sslog.info("stream_manager: Close all stream_session with peer = {} in on_dead", endpoint);
//FIXME: discarded future.
--
2.47.1