---
gms/gossiper.cc | 48 ++++++++++++++++++++++++------------------------
1 file changed, 24 insertions(+), 24 deletions(-)
diff --git a/gms/gossiper.cc b/gms/gossiper.cc
index 6ed8e5938c9..db3fb4b4a88 100644
--- a/gms/gossiper.cc
+++ b/gms/gossiper.cc
@@ -147,8 +147,8 @@ void gossiper::do_sort(utils::chunked_vector<gossip_digest>& g_digest_list) cons
utils::chunked_vector<gossip_digest> diff_digests;
for (auto g_digest : g_digest_list) {
auto ep = g_digest.get_endpoint();
- auto ep_state = this->get_endpoint_state_ptr(ep);
- version_type version = ep_state ? this->get_max_endpoint_state_version(*ep_state) : version_type();
+ auto ep_state = get_endpoint_state_ptr(ep);
+ version_type version = ep_state ? get_max_endpoint_state_version(*ep_state) : version_type();
int32_t diff_version = ::abs((version - g_digest.get_max_version()).value());
diff_digests.emplace_back(gossip_digest(ep, g_digest.get_generation(), version_type(diff_version)));
}
@@ -170,7 +170,7 @@ void gossiper::do_sort(utils::chunked_vector<gossip_digest>& g_digest_list) cons
future<> gossiper::handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg) {
logger.trace("handle_syn_msg():from={},cluster_name:peer={},local={},group0_id:peer={},local={},partitioner_name:peer={},local={}",
from, syn_msg.cluster_id(), get_cluster_name(), syn_msg.group0_id(), get_group0_id(), syn_msg.partioner(), get_partitioner_name());
- if (!this->is_enabled()) {
+ if (!is_enabled()) {
co_return;
}
@@ -288,7 +288,7 @@ static bool should_count_as_msg_processing(const std::map<inet_address, endpoint
future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) {
logger.trace("handle_ack_msg():from={},msg={}", id, ack_msg);
- if (!this->is_enabled() && !this->is_in_shadow_round()) {
+ if (!is_enabled() && !is_in_shadow_round()) {
co_return;
}
@@ -307,13 +307,13 @@ future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) {
if (ep_state_map.size() > 0) {
update_timestamp_for_nodes(ep_state_map);
- co_await this->apply_state_locally(std::move(ep_state_map));
+ co_await apply_state_locally(std::move(ep_state_map));
}
auto from = id;
auto ack_msg_digest = std::move(g_digest_list);
- if (this->is_in_shadow_round()) {
- this->finish_shadow_round();
+ if (is_in_shadow_round()) {
+ finish_shadow_round();
// don't bother doing anything else, we have what we came for
co_return;
}
@@ -381,7 +381,7 @@ future<> gossiper::do_send_ack2_msg(msg_addr from, utils::chunked_vector<gossip_
const auto version = es->get_heart_beat_state().get_generation() > g_digest.get_generation()
? version_type(0)
: g_digest.get_max_version();
- auto local_ep_state_ptr = this->get_state_for_version_bigger_than(addr, version);
+ auto local_ep_state_ptr = get_state_for_version_bigger_than(addr, version);
if (local_ep_state_ptr) {
delta_ep_state_map.emplace(addr, *local_ep_state_ptr);
}
@@ -475,10 +475,10 @@ future<> gossiper::handle_shutdown_msg(inet_address from, std::optional<int64_t>
co_return;
}
- auto permit = co_await this->lock_endpoint(from, null_permit_id);
+ auto permit = co_await lock_endpoint(from, null_permit_id);
if (generation_number_opt) {
debug_validate_gossip_generation(*generation_number_opt);
- auto es = this->get_endpoint_state_ptr(from);
+ auto es = get_endpoint_state_ptr(from);
if (es) {
auto local_generation = es->get_heart_beat_state().get_generation();
logger.info("Got shutdown message from {}, received_generation={}, local_generation={}",
@@ -494,7 +494,7 @@ future<> gossiper::handle_shutdown_msg(inet_address from, std::optional<int64_t>
co_return;
}
}
- co_await this->mark_as_shutdown(from,
permit.id());
+ co_await mark_as_shutdown(from,
permit.id());
}
future<gossip_get_endpoint_states_response>
@@ -589,8 +589,8 @@ future<> gossiper::send_gossip(gossip_digest_syn message, std::set<inet_address>
future<> gossiper::do_apply_state_locally(gms::inet_address node, endpoint_state remote_state, bool listener_notification) {
// If state does not exist just add it. If it does then add it if the remote generation is greater.
// If there is a generation tie, attempt to break it by heartbeat version.
- auto permit = co_await this->lock_endpoint(node, null_permit_id);
- auto es = this->get_endpoint_state_ptr(node);
+ auto permit = co_await lock_endpoint(node, null_permit_id);
+ auto es = get_endpoint_state_ptr(node);
if (!es && _topo_sm) {
// Even if there is no endpoint for the given IP the message can still belong to existing endpoint that
// was restarted with different IP, so lets try to locate the endpoint by host id as well. Do it in raft
@@ -615,7 +615,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, endpoint_state
if (listener_notification) {
logger.trace("Updating heartbeat state generation to {} from {} for {}", remote_generation, local_generation, node);
// major state change will handle the update by inserting the remote state directly
- co_await this->handle_major_state_change(node, std::move(remote_state),
permit.id());
+ co_await handle_major_state_change(node, std::move(remote_state),
permit.id());
} else {
logger.debug("Applying remote_state for node {} (remote generation > local generation)", node);
co_await replicate(node, std::move(remote_state),
permit.id());
@@ -623,16 +623,16 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, endpoint_state
} else if (remote_generation == local_generation) {
if (listener_notification) {
// find maximum state
- auto local_max_version = this->get_max_endpoint_state_version(local_state);
- auto remote_max_version = this->get_max_endpoint_state_version(remote_state);
+ auto local_max_version = get_max_endpoint_state_version(local_state);
+ auto remote_max_version = get_max_endpoint_state_version(remote_state);
if (remote_max_version > local_max_version) {
// apply states, but do not notify since there is no major change
- co_await this->apply_new_states(node, std::move(local_state), remote_state,
permit.id());
+ co_await apply_new_states(node, std::move(local_state), remote_state,
permit.id());
} else {
logger.debug("Ignoring remote version {} <= {} for {}", remote_max_version, local_max_version, node);
}
- if (!is_alive(node) && !this->is_dead_state(get_endpoint_state(node))) { // unless of course, it was dead
- this->mark_alive(node);
+ if (!is_alive(node) && !is_dead_state(get_endpoint_state(node))) { // unless of course, it was dead
+ mark_alive(node);
}
} else {
bool update = false;
@@ -660,7 +660,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, endpoint_state
}
} else {
if (listener_notification) {
- co_await this->handle_major_state_change(node, std::move(remote_state),
permit.id());
+ co_await handle_major_state_change(node, std::move(remote_state),
permit.id());
} else {
logger.debug("Applying remote_state for node {} (new node)", node);
co_await replicate(node, std::move(remote_state),
permit.id());
@@ -683,7 +683,7 @@ future<> gossiper::apply_state_locally(std::map<inet_address, endpoint_state> ma
logger.debug("apply_state_locally_endpoints={}", endpoints);
co_await coroutine::parallel_for_each(endpoints, [this, &map] (auto&& ep) -> future<> {
- if (ep == this->get_broadcast_address() && !this->is_in_shadow_round()) {
+ if (ep == get_broadcast_address() && !is_in_shadow_round()) {
return make_ready_future<>();
}
if (_topo_sm) {
@@ -1101,7 +1101,7 @@ future<> gossiper::replicate_live_endpoints_on_change(foreign_ptr<std::unique_pt
void gossiper::run() {
// Run it in the background.
(void)seastar::with_semaphore(_callback_running, 1, [this] {
- return seastar::async([this, g = this->shared_from_this()] {
+ return seastar::async([this, g = shared_from_this()] {
logger.trace("=== Gossip round START");
//wait on messaging service to start listening
@@ -1117,7 +1117,7 @@ void gossiper::run() {
}
utils::chunked_vector<gossip_digest> g_digests;
- this->make_random_gossip_digest(g_digests);
+ make_random_gossip_digest(g_digests);
if (g_digests.size() > 0) {
gossip_digest_syn message(get_cluster_name(), get_partitioner_name(), g_digests, get_group0_id());
@@ -2127,7 +2127,7 @@ future<> gossiper::advertise_to_nodes(generation_for_nodes advertise_to_nodes) {
}
future<> gossiper::do_shadow_round(std::unordered_set<gms::inet_address> nodes, mandatory is_mandatory) {
- return seastar::async([this, g = this->shared_from_this(), nodes = std::move(nodes), is_mandatory] () mutable {
+ return seastar::async([this, g = shared_from_this(), nodes = std::move(nodes), is_mandatory] () mutable {
nodes.erase(get_broadcast_address());
gossip_get_endpoint_states_request request{{
gms::application_state::STATUS,
--
2.46.2