[PATCH v1 0/8] Load peers table into the gossiper on boot

6 views
Skip to first unread message

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 5, 2025, 4:25:29 AMJan 5
to scylladb-dev@googlegroups.com
Since we manage ip to id mapping directly in gossiper now we need to
load the mapping on boot. We already do it anyway, but only due to a bug
which checks raft topology mode config before it is set, so the code
thinks that it is in the gossiper mode and loads peers table into the
gossiper and token metadata. Fix the bug and load peers into the gossiper
only since token metadata is managed by raft.

The series also removes address map related test that no longer checks
anything and replace it with unit test.

It also adds the dc/rack check to "join node" rpc. The check is done
during shadow round now, but for it to work it requires dc/rack to be
propagated through the gossiper and we want to eventually drop it.


CI: https://jenkins.scylladb.com/job/scylla-master/job/scylla-ci/14193/

Also in scylla-dev gleb/load-peers

Gleb Natapov (8):
test: address_map: check generation handling during entry addition
test: drop test_old_ip_notification_repro.py
locator: drop inet_address usage to figure out per dc/rack replication
storage_service: set raft topology change mode before using it in
join_cluster
storage_service: load peers into gossiper on boot in raft topology
mode
storage_service: do not add endpoint to the gossiper during topology
loading.
gossiper: fix the logic of shadow_round parameter
topology coordinator: reject replace request if topology does not
match

.../test_old_ip_notification_repro.py | 53 ------
service/storage_service.hh | 2 +-
gms/gossiper.cc | 34 +---
locator/network_topology_strategy.cc | 8 +-
service/storage_service.cc | 160 ++++++++----------
test/boost/address_map_test.cc | 17 ++
6 files changed, 104 insertions(+), 170 deletions(-)
delete mode 100644 test/topology_custom/test_old_ip_notification_repro.py

--
2.47.1

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 5, 2025, 4:25:31 AMJan 5
to scylladb-dev@googlegroups.com
Check that adding an entry with smaller generation does not overwrite
existing entry.
---
test/boost/address_map_test.cc | 17 +++++++++++++++++
1 file changed, 17 insertions(+)

diff --git a/test/boost/address_map_test.cc b/test/boost/address_map_test.cc
index f8b0fb6da4b..7594a2cd088 100644
--- a/test/boost/address_map_test.cc
+++ b/test/boost/address_map_test.cc
@@ -246,6 +246,23 @@ SEASTAR_THREAD_TEST_CASE(test_address_map_operations) {
scoped_no_abort_on_internal_error abort_guard;
BOOST_CHECK_THROW(m.find_by_addr(gms::inet_address{}), std::runtime_error);
}
+ {
+ // Check that an update with smaller generation will not overwrite update with larger one
+ // but other way around works
+ sharded<address_map_t<manual_clock>> m_svc;
+ m_svc.start().get();
+ auto stop_map = defer([&m_svc] { m_svc.stop().get(); });
+ auto& m = m_svc.local();
+
+ m.add_or_update_entry(id1, addr1, gms::generation_type{2});
+ m.add_or_update_entry(id1, addr2, gms::generation_type{1});
+
+ BOOST_CHECK(m.find(id1).value() == addr1);
+
+ m.add_or_update_entry(id1, addr2, gms::generation_type{3});
+
+ BOOST_CHECK(m.find(id1).value() == addr2);
+ }
}

SEASTAR_THREAD_TEST_CASE(test_address_map_replication) {
--
2.47.1

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 5, 2025, 4:25:32 AMJan 5
to scylladb-dev@googlegroups.com
As removed comment says it was done because storage_service::join_cluster
did not load gossiper endpoint but now it does.
---
service/storage_service.cc | 36 +-----------------------------------
1 file changed, 1 insertion(+), 35 deletions(-)

diff --git a/service/storage_service.cc b/service/storage_service.cc
index 6725d21997e..78de47f1c82 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -789,45 +789,11 @@ future<> storage_service::topology_state_load(state_change_hint hint) {

co_await update_fence_version(_topology_state_machine._topology.fence_version);

- // We don't load gossiper endpoint states in storage_service::join_cluster
- // if raft_topology_change_enabled(). On the other hand gossiper is still needed
- // even in case of raft_topology_change_enabled() mode, since it still contains part
- // of the cluster state. To work correctly, the gossiper needs to know the current
- // endpoints. We cannot rely on seeds alone, since it is not guaranteed that seeds
- // will be up to date and reachable at the time of restart.
- const auto tmptr = get_token_metadata_ptr();
- for (const auto& node : tmptr->get_topology().get_nodes()) {
- const auto& host_id = node.get().host_id();
- const auto& ep = node.get().endpoint();
- if (is_me(host_id)) {
- continue;
- }
- if (ep == inet_address{}) {
- continue;
- }
- auto permit = co_await _gossiper.lock_endpoint(ep, gms::null_permit_id);
- // Add the endpoint if it doesn't exist yet in gossip
- // since it is not loaded in join_cluster in the
- // raft_topology_change_enabled() case.
- if (!_gossiper.get_endpoint_state_ptr(ep)) {
- gms::loaded_endpoint_state st;
- st.endpoint = ep;
- st.tokens = tmptr->get_tokens(host_id) | std::ranges::to<std::unordered_set<dht::token>>();
- st.opt_dc_rack = node.get().dc_rack();
- // Save tokens, not needed for raft topology management, but needed by legacy
- // Also ip -> id mapping is needed for address map recreation on reboot
- if (node.get().is_this_node() && !st.tokens.empty()) {
- st.opt_status = gms::versioned_value::normal(st.tokens);
- }
- co_await _gossiper.add_saved_endpoint(host_id, std::move(st), permit.id());
- }
- }
-
// As soon as a node joins token_metadata.topology we
// need to drop all its rpc connections with ignored_topology flag.
{
std::vector<future<>> futures;
- tmptr->get_topology().for_each_node([&](const locator::node& n) {
+ get_token_metadata_ptr()->get_topology().for_each_node([&](const locator::node& n) {
const auto ep = n.endpoint();
if (ep != inet_address{} && !saved_tmpr->get_topology().has_endpoint(ep)) {
futures.push_back(remove_rpc_client_with_ignored_topology(ep, n.host_id()));
--
2.47.1

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 5, 2025, 4:25:32 AMJan 5
to scylladb-dev@googlegroups.com
The test no longer test anything since the address map is updated much
earlier now by the gossiper itself, not by the notifiers. The
functionality is tested by a unit test now.
---
.../test_old_ip_notification_repro.py | 53 -------------------
gms/gossiper.cc | 18 -------
2 files changed, 71 deletions(-)
delete mode 100644 test/topology_custom/test_old_ip_notification_repro.py

diff --git a/test/topology_custom/test_old_ip_notification_repro.py b/test/topology_custom/test_old_ip_notification_repro.py
deleted file mode 100644
index 3a1084d6734..00000000000
--- a/test/topology_custom/test_old_ip_notification_repro.py
+++ /dev/null
@@ -1,53 +0,0 @@
-#
-# Copyright (C) 2023-present ScyllaDB
-#
-# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
-#
-
-import logging
-import time
-import pytest
-
-from test.pylib.manager_client import ManagerClient
-from test.pylib.rest_client import inject_error, read_barrier
-from test.pylib.util import wait_for_cql_and_get_hosts
-from test.topology.conftest import skip_mode
-
-
-logger = logging.getLogger(__name__)
-
-
-...@pytest.mark.asyncio
-@skip_mode('release', 'error injections are not supported in release mode')
-async def test_old_ip_notification_repro(manager: ManagerClient) -> None:
- """
- Regression test for #14257.
- It starts two nodes. It introduces a sleep in gossiper::real_mark_alive
- when receiving a gossip notification about
- HOST_ID update from the second node. Then it restarts the second node with
- a different IP. Due to the sleep, the old notification from the old IP arrives
- after the second node has restarted. If the bug is present, this notification
- overrides the address map entry and the second read barrier times out, since
- the first node cannot reach the second node with the old IP.
- """
- s1 = await manager.server_add()
- s2 = await manager.server_add(start=False)
- async with inject_error(manager.api, s1.ip_addr, 'gossiper::real_mark_alive',
- parameters={ "second_node_ip": s2.ip_addr }) as handler:
- # This injection delays the gossip notification from the initial IP of s2.
- logger.info(f"Starting {s2}")
- await manager.server_start(s2.server_id)
- logger.info(f"Stopping {s2}")
- await manager.server_stop_gracefully(s2.server_id)
- await manager.server_change_ip(s2.server_id)
- logger.info(f"Starting {s2}")
- await manager.server_start(s2.server_id)
- logger.info(f"Wait for cql")
- await manager.get_ready_cql([s1])
- logger.info(f"Read barrier")
- await read_barrier(manager.api, s1.ip_addr) # Wait for s1 to be aware of s2 with the new IP.
- await handler.message() # s1 receives the gossip notification from the initial IP of s2.
- logger.info(f"Read barrier")
- # If IP of s2 is overridden by its initial IP, the read barrier should time out.
- await read_barrier(manager.api, s1.ip_addr)
- logger.info(f"Done")
diff --git a/gms/gossiper.cc b/gms/gossiper.cc
index f9ab541fae7..c19b2db0cda 100644
--- a/gms/gossiper.cc
+++ b/gms/gossiper.cc
@@ -1730,24 +1730,6 @@ void gossiper::mark_alive(inet_address addr) {
}

future<> gossiper::real_mark_alive(inet_address addr) {
- co_await utils::get_local_injector().inject("gossiper::real_mark_alive", [this, endpoint = addr] (auto& handler) -> future<> {
- auto app_state_ptr = get_application_state_ptr(endpoint, application_state::HOST_ID);
- if (!app_state_ptr) {
- co_return;
- }
-
- locator::host_id id(utils::UUID(app_state_ptr->value()));
- auto second_node_ip = handler.get("second_node_ip");
- SCYLLA_ASSERT(second_node_ip);
-
- logger.info("real_mark_alive {}/{} second_node_ip={}", id, endpoint, *second_node_ip);
- if (endpoint == gms::inet_address(sstring{*second_node_ip})) {
- logger.info("Sleeping before real_mark_alive for {}/{}", id, endpoint);
- co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{1});
- logger.info("Finished sleeping before real_mark_alive for {}/{}", id, endpoint);
- }
- });
-
auto permit = co_await lock_endpoint(addr, null_permit_id);

// After sending echo message, the Node might not be in the
--
2.47.1

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 5, 2025, 4:25:33 AMJan 5
to scylladb-dev@googlegroups.com
Gossiper manages address map now, so load peers table into the gossiper
on reboot to be able to map ids to ips as early as possible.
---
service/storage_service.hh | 2 +-
service/storage_service.cc | 92 ++++++++++++++++++++++----------------
2 files changed, 54 insertions(+), 40 deletions(-)

diff --git a/service/storage_service.hh b/service/storage_service.hh
index 2b657985285..ed5d102ff58 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -532,7 +532,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override {}
private:
- db::system_keyspace::peer_info get_peer_info_for_update(inet_address endpoint);
+ std::optional<db::system_keyspace::peer_info> get_peer_info_for_update(inet_address endpoint);
// return an engaged value iff app_state_map has changes to the peer info
std::optional<db::system_keyspace::peer_info> get_peer_info_for_update(inet_address endpoint, const gms::application_state_map& app_state_map);

diff --git a/service/storage_service.cc b/service/storage_service.cc
index 7468d91b048..6725d21997e 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -511,18 +511,20 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
// add one.
const auto& host_id_to_ip_map = *(co_await get_host_id_to_ip_map());

- // Some state that is used to fill in 'peeers' table is still propagated over gossiper.
+ // Some state that is used to fill in 'peers' table is still propagated over gossiper.
// Populate the table with the state from the gossiper here since storage_service::on_change()
// (which is called each time gossiper state changes) may have skipped it because the tokens
// for the node were not in the 'normal' state yet
auto info = get_peer_info_for_update(*ip);
- // And then amend with the info from raft
- info.tokens = rs.ring.value().tokens;
- info.data_center = rs.datacenter;
- info.rack = rs.rack;
- info.release_version = rs.release_version;
- info.supported_features = fmt::to_string(fmt::join(rs.supported_features, ","));
- sys_ks_futures.push_back(_sys_ks.local().update_peer_info(*ip, host_id, info));
+ if (info) {
+ // And then amend with the info from raft
+ info->tokens = rs.ring.value().tokens;
+ info->data_center = rs.datacenter;
+ info->rack = rs.rack;
+ info->release_version = rs.release_version;
+ info->supported_features = fmt::to_string(fmt::join(rs.supported_features, ","));
+ sys_ks_futures.push_back(_sys_ks.local().update_peer_info(*ip, host_id, *info));
+ }
if (!prev_normal.contains(id)) {
nodes_to_notify.joined.push_back(*ip);
}
@@ -2550,7 +2552,7 @@ future<> storage_service::handle_state_normal(inet_address endpoint, gms::permit
slogger.debug("handle_state_normal: endpoint={} is_normal_token_owner={} endpoint_to_remove={} owned_tokens={}", endpoint, is_normal_token_owner, endpoints_to_remove.contains(endpoint), owned_tokens);
if (!is_me(endpoint) && !owned_tokens.empty() && !endpoints_to_remove.count(endpoint)) {
try {
- auto info = get_peer_info_for_update(endpoint);
+ auto info = get_peer_info_for_update(endpoint).value();
info.tokens = std::move(owned_tokens);
co_await _sys_ks.local().update_peer_info(endpoint, host_id, info);
} catch (...) {
@@ -2761,17 +2763,16 @@ future<> storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_s
return make_ready_future();
}

-db::system_keyspace::peer_info storage_service::get_peer_info_for_update(inet_address endpoint) {
+std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for_update(inet_address endpoint) {
auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint);
if (!ep_state) {
return db::system_keyspace::peer_info{};
}
auto info = get_peer_info_for_update(endpoint, ep_state->get_application_state_map());
- if (!info) {
+ if (!info && !raft_topology_change_enabled()) {
on_internal_error_noexcept(slogger, seastar::format("get_peer_info_for_update({}): application state has no peer info: {}", endpoint, ep_state->get_application_state_map()));
- return db::system_keyspace::peer_info{};
}
- return *info;
+ return info;
}

std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for_update(inet_address endpoint, const gms::application_state_map& app_state_map) {
@@ -3007,35 +3008,48 @@ future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>&
// We must allow restarts of zero-token nodes in the gossip-based topology due to the recovery mode.
}

- if (_db.local().get_config().load_ring_state() && !raft_topology_change_enabled()) {
- slogger.info("Loading persisted ring state");
-
- auto tmlock = co_await get_token_metadata_lock();
- auto tmptr = co_await get_mutable_token_metadata_ptr();
- for (auto& [host_id, st] : loaded_endpoints) {
- if (st.endpoint == get_broadcast_address()) {
- // entry has been mistakenly added, delete it
- slogger.warn("Loaded saved endpoint={}/{} has my broadcast address. Deleting it", host_id, st.endpoint);
- co_await _sys_ks.local().remove_endpoint(st.endpoint);
- } else {
- if (host_id == my_host_id()) {
- on_internal_error(slogger, format("Loaded saved endpoint {} with my host_id={}", st.endpoint, host_id));
- }
- if (!st.opt_dc_rack) {
- st.opt_dc_rack = locator::endpoint_dc_rack::default_location;
- slogger.warn("Loaded no dc/rack for saved endpoint={}/{}. Set to default={}/{}", host_id, st.endpoint, st.opt_dc_rack->dc, st.opt_dc_rack->rack);
+ if (!raft_topology_change_enabled()) {
+ if (_db.local().get_config().load_ring_state()) {
+ slogger.info("Loading persisted ring state");
+
+ auto tmlock = co_await get_token_metadata_lock();
+ auto tmptr = co_await get_mutable_token_metadata_ptr();
+ for (auto& [host_id, st] : loaded_endpoints) {
+ if (st.endpoint == get_broadcast_address()) {
+ // entry has been mistakenly added, delete it
+ slogger.warn("Loaded saved endpoint={}/{} has my broadcast address. Deleting it", host_id, st.endpoint);
+ co_await _sys_ks.local().remove_endpoint(st.endpoint);
+ } else {
+ if (host_id == my_host_id()) {
+ on_internal_error(slogger, format("Loaded saved endpoint {} with my host_id={}", st.endpoint, host_id));
+ }
+ if (!st.opt_dc_rack) {
+ st.opt_dc_rack = locator::endpoint_dc_rack::default_location;
+ slogger.warn("Loaded no dc/rack for saved endpoint={}/{}. Set to default={}/{}", host_id, st.endpoint, st.opt_dc_rack->dc, st.opt_dc_rack->rack);
+ }
+ const auto& dc_rack = *st.opt_dc_rack;
+ slogger.debug("Loaded tokens: endpoint={}/{} dc={} rack={} tokens={}", host_id, st.endpoint, dc_rack.dc, dc_rack.rack, st.tokens);
+ tmptr->update_topology(host_id, dc_rack, locator::node::state::normal);
+ co_await tmptr->update_normal_tokens(st.tokens, host_id);
+ tmptr->update_host_id(host_id, st.endpoint);
+ // gossiping hasn't started yet
+ // so no need to lock the endpoint
+ co_await _gossiper.add_saved_endpoint(host_id, st, gms::null_permit_id);
}
- const auto& dc_rack = *st.opt_dc_rack;
- slogger.debug("Loaded tokens: endpoint={}/{} dc={} rack={} tokens={}", host_id, st.endpoint, dc_rack.dc, dc_rack.rack, st.tokens);
- tmptr->update_topology(host_id, dc_rack, locator::node::state::normal);
- co_await tmptr->update_normal_tokens(st.tokens, host_id);
- tmptr->update_host_id(host_id, st.endpoint);
- // gossiping hasn't started yet
- // so no need to lock the endpoint
- co_await _gossiper.add_saved_endpoint(host_id, st, gms::null_permit_id);
}
+ co_await replicate_to_all_cores(std::move(tmptr));
}
- co_await replicate_to_all_cores(std::move(tmptr));
+ } else {
+ slogger.info("Loading persisted peers into the gossiper");
+ // If topology coordinator is enabled only load peers into the gossiper (since it is were ID to IP maopping is managed)
+ // No need to update topology.
+ co_await coroutine::parallel_for_each(loaded_endpoints, [&] (auto& e) -> future<> {
+ auto& [host_id, st] = e;
+ if (host_id == my_host_id()) {
+ on_internal_error(slogger, format("Loaded saved endpoint {} with my host_id={}", st.endpoint, host_id));
+ }
+ co_await _gossiper.add_saved_endpoint(host_id, st, gms::null_permit_id);
+ });
}

auto loaded_peer_features = co_await _sys_ks.local().load_peer_features();
--
2.47.1

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 5, 2025, 4:25:33 AMJan 5
to scylladb-dev@googlegroups.com
Currently the logic is mirrored shadow_round is true in on shadow round.
Fix it but flipping all the logic.
---
gms/gossiper.cc | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/gms/gossiper.cc b/gms/gossiper.cc
index c19b2db0cda..19747be504f 100644
--- a/gms/gossiper.cc
+++ b/gms/gossiper.cc
@@ -604,7 +604,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, endpoint_state
logger.warn("received an invalid gossip generation for peer {}; local generation = {}, received generation = {}",
node, local_generation, remote_generation);
} else if (remote_generation > local_generation) {
- logger.trace("Updating heartbeat state generation to {} from {} for {} (notify={})", remote_generation, local_generation, node, shadow_round);
+ logger.trace("Updating heartbeat state generation to {} from {} for {} (notify={})", remote_generation, local_generation, node, !shadow_round);
// major state change will handle the update by inserting the remote state directly
co_await handle_major_state_change(node, std::move(remote_state), permit.id(), shadow_round);
} else if (remote_generation == local_generation) {
@@ -617,21 +617,21 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, endpoint_state
} else {
logger.debug("Ignoring remote version {} <= {} for {}", remote_max_version, local_max_version, node);
}
- if (!is_alive(node) && !is_dead_state(get_endpoint_state(node)) && shadow_round) { // unless of course, it was dead
+ if (!is_alive(node) && !is_dead_state(get_endpoint_state(node)) && !shadow_round) { // unless of course, it was dead
mark_alive(node);
}
} else {
logger.debug("Ignoring remote generation {} < {}", remote_generation, local_generation);
}
} else {
- logger.debug("Applying remote_state for node {} ({} node)", node, shadow_round ? "old" : "new");
+ logger.debug("Applying remote_state for node {} ({} node)", node, !shadow_round ? "old" : "new");
co_await handle_major_state_change(node, std::move(remote_state), permit.id(), shadow_round);
}
}

future<> gossiper::apply_state_locally_in_shadow_round(std::unordered_map<inet_address, endpoint_state> map) {
for (auto& [node, remote_state] : map) {
- co_await do_apply_state_locally(node, std::move(remote_state), false);
+ co_await do_apply_state_locally(node, std::move(remote_state), true);
}
}

@@ -668,7 +668,7 @@ future<> gossiper::apply_state_locally(std::map<inet_address, endpoint_state> ma
}
}
return seastar::with_semaphore(_apply_state_locally_semaphore, 1, [this, &ep, &map] () mutable {
- return do_apply_state_locally(ep, std::move(map[ep]), true);
+ return do_apply_state_locally(ep, std::move(map[ep]), false);
});
});

@@ -1795,7 +1795,7 @@ future<> gossiper::handle_major_state_change(inet_address ep, endpoint_state eps

endpoint_state_ptr eps_old = get_endpoint_state_ptr(ep);

- if (!is_dead_state(eps) && shadow_round) {
+ if (!is_dead_state(eps) && !shadow_round) {
if (_endpoint_state_map.contains(ep)) {
logger.info("Node {} has restarted, now UP, status = {}", ep, get_gossip_status(eps));
} else {
@@ -1805,7 +1805,7 @@ future<> gossiper::handle_major_state_change(inet_address ep, endpoint_state eps
logger.trace("Adding endpoint state for {}, status = {}", ep, get_gossip_status(eps));
co_await replicate(ep, eps, pid);

- if (!shadow_round) {
+ if (shadow_round) {
co_return;
}

@@ -1915,7 +1915,7 @@ future<> gossiper::apply_new_states(inet_address addr, endpoint_state local_stat
// being replicated to all shards.
co_await replicate(addr, std::move(local_state), pid);

- if (!shadow_round) {
+ if (shadow_round) {
co_return;
}

--
2.47.1

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 5, 2025, 4:25:33 AMJan 5
to scylladb-dev@googlegroups.com
Currently it should not happen because gossiper shadow round does
similar check, but we want to drop states that propagate through raft
from the gossiper eventually.
---
service/storage_service.cc | 7 +++++++
1 file changed, 7 insertions(+)

diff --git a/service/storage_service.cc b/service/storage_service.cc
index 78de47f1c82..ea299d16115 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -6783,6 +6783,13 @@ future<join_node_request_result> storage_service::join_node_request_handler(join
co_return result;
}

+ if (replaced_it->second.datacenter != params.datacenter || replaced_it->second.rack != params.rack) {
+ result.result = join_node_request_result::rejected{
+ .reason = fmt::format("Cannot replace node in {}/{} with node in {}/{}", replaced_it->second.datacenter, replaced_it->second.rack, params.datacenter, params.rack),
+ };
+ co_return result;
+ }
+
auto is_zero_token = params.num_tokens == 0 && params.tokens_string.empty();
if (replaced_it->second.ring.value().tokens.empty() && !is_zero_token) {
result.result = join_node_request_result::rejected{
--
2.47.1

Benny Halevy

<bhalevy@scylladb.com>
unread,
Jan 5, 2025, 4:50:51 AMJan 5
to Gleb Natapov, scylladb-dev@googlegroups.com
On Sun, 2025-01-05 at 11:12 +0200, 'Gleb Natapov' via ScyllaDB
development wrote:
> Check that adding an entry with smaller generation does not overwrite
> existing entry.
> ---
> test/boost/address_map_test.cc | 17 +++++++++++++++++
> 1 file changed, 17 insertions(+)
>
> diff --git a/test/boost/address_map_test.cc b/test/boost/address_map_test.cc
> index f8b0fb6da4b..7594a2cd088 100644
> --- a/test/boost/address_map_test.cc
> +++ b/test/boost/address_map_test.cc
> @@ -246,6 +246,23 @@ SEASTAR_THREAD_TEST_CASE(test_address_map_operations) {
> scoped_no_abort_on_internal_error abort_guard;
> BOOST_CHECK_THROW(m.find_by_addr(gms::inet_address{}), std::runtime_error);
> }
> + {
> + // Check that an update with smaller generation will not overwrite update with larger one
> + // but other way around works
> + sharded<address_map_t<manual_clock>> m_svc;
> + m_svc.start().get();
> + auto stop_map = defer([&m_svc] { m_svc.stop().get(); });

Nit: using seastar/util/closeable.hh, this can be simplified to
auto stop_map = deferred_stop(m_svc);

Benny Halevy

<bhalevy@scylladb.com>
unread,
Jan 5, 2025, 4:55:52 AMJan 5
to Gleb Natapov, scylladb-dev@googlegroups.com
On Sun, 2025-01-05 at 11:12 +0200, 'Gleb Natapov' via ScyllaDB
development wrote:
nit: s/were ID to IP maopping/where ID to IP mapping/

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 5, 2025, 5:00:12 AMJan 5
to Benny Halevy, scylladb-dev@googlegroups.com
On Sun, Jan 05, 2025 at 11:50:46AM +0200, Benny Halevy wrote:
> On Sun, 2025-01-05 at 11:12 +0200, 'Gleb Natapov' via ScyllaDB
> development wrote:
> > Check that adding an entry with smaller generation does not overwrite
> > existing entry.
> > ---
> > test/boost/address_map_test.cc | 17 +++++++++++++++++
> > 1 file changed, 17 insertions(+)
> >
> > diff --git a/test/boost/address_map_test.cc b/test/boost/address_map_test.cc
> > index f8b0fb6da4b..7594a2cd088 100644
> > --- a/test/boost/address_map_test.cc
> > +++ b/test/boost/address_map_test.cc
> > @@ -246,6 +246,23 @@ SEASTAR_THREAD_TEST_CASE(test_address_map_operations) {
> > scoped_no_abort_on_internal_error abort_guard;
> > BOOST_CHECK_THROW(m.find_by_addr(gms::inet_address{}), std::runtime_error);
> > }
> > + {
> > + // Check that an update with smaller generation will not overwrite update with larger one
> > + // but other way around works
> > + sharded<address_map_t<manual_clock>> m_svc;
> > + m_svc.start().get();
> > + auto stop_map = defer([&m_svc] { m_svc.stop().get(); });
>
> Nit: using seastar/util/closeable.hh, this can be simplified to
> auto stop_map = deferred_stop(m_svc);
>
The test copies existing test in the same file. If it worth it someone
needs to send a cleanup patch to change all of them. I am not sure it
does though since we do not need half of its functionality here.

> > + auto& m = m_svc.local();
> > +
> > + m.add_or_update_entry(id1, addr1, gms::generation_type{2});
> > + m.add_or_update_entry(id1, addr2, gms::generation_type{1});
> > +
> > + BOOST_CHECK(m.find(id1).value() == addr1);
> > +
> > + m.add_or_update_entry(id1, addr2, gms::generation_type{3});
> > +
> > + BOOST_CHECK(m.find(id1).value() == addr2);
> > + }
> > }
> >
> > SEASTAR_THREAD_TEST_CASE(test_address_map_replication) {
> > --
> > 2.47.1
> >
>

--
Gleb.

Benny Halevy

<bhalevy@scylladb.com>
unread,
Jan 5, 2025, 5:34:10 AMJan 5
to Gleb Natapov, scylladb-dev@googlegroups.com
Why not use modern abstractions in new code even though
there exists old code that predates those abstractions?

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 5, 2025, 5:41:03 AMJan 5
to Benny Halevy, scylladb-dev@googlegroups.com
Because the code will look like a mishmash. A poor developer who will
have to look into the code again for some reason will have to figure our
that this test is like all other tests in the function, but just use
some other abstraction du jour.


> > > > + auto& m = m_svc.local();
> > > > +
> > > > + m.add_or_update_entry(id1, addr1, gms::generation_type{2});
> > > > + m.add_or_update_entry(id1, addr2, gms::generation_type{1});
> > > > +
> > > > + BOOST_CHECK(m.find(id1).value() == addr1);
> > > > +
> > > > + m.add_or_update_entry(id1, addr2, gms::generation_type{3});
> > > > +
> > > > + BOOST_CHECK(m.find(id1).value() == addr2);
> > > > + }
> > > > }
> > > >
> > > > SEASTAR_THREAD_TEST_CASE(test_address_map_replication) {
> > > > --
> > > > 2.47.1
> > > >
> > >
> >
> > --
> > Gleb.
>

--
Gleb.

Benny Halevy

<bhalevy@scylladb.com>
unread,
Jan 5, 2025, 5:45:57 AMJan 5
to Gleb Natapov, scylladb-dev@googlegroups.com
So this is a good opportunity to add a cleanup patch
to modernize the existing code, while at it.

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 5, 2025, 5:48:58 AMJan 5
to Benny Halevy, scylladb-dev@googlegroups.com
You are welcome to send one if you think it worth it. IU certainly do
see any advantage in this new abstraction here.

> >
> >
> > > > > > + auto& m = m_svc.local();
> > > > > > +
> > > > > > + m.add_or_update_entry(id1, addr1, gms::generation_type{2});
> > > > > > + m.add_or_update_entry(id1, addr2, gms::generation_type{1});
> > > > > > +
> > > > > > + BOOST_CHECK(m.find(id1).value() == addr1);
> > > > > > +
> > > > > > + m.add_or_update_entry(id1, addr2, gms::generation_type{3});
> > > > > > +
> > > > > > + BOOST_CHECK(m.find(id1).value() == addr2);
> > > > > > + }
> > > > > > }
> > > > > >
> > > > > > SEASTAR_THREAD_TEST_CASE(test_address_map_replication) {
> > > > > > --
> > > > > > 2.47.1
> > > > > >
> > > > >
> > > >
> > > > --
> > > > Gleb.
> > >
> >
> > --
> > Gleb.
>

--
Gleb.

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 5, 2025, 6:51:12 AMJan 5
to Benny Halevy, scylladb-dev@googlegroups.com
On Sun, Jan 05, 2025 at 11:55:47AM +0200, Benny Halevy wrote:
> > - co_await replicate_to_all_cores(std::move(tmptr));
> > + } else {
> > + slogger.info("Loading persisted peers into the gossiper");
> > + // If topology coordinator is enabled only load peers into the gossiper (since it is were ID to IP maopping is managed)
>
> nit: s/were ID to IP maopping/where ID to IP mapping/
>
Forced pushed a version with fixed typo.

> > + // No need to update topology.
> > + co_await coroutine::parallel_for_each(loaded_endpoints, [&] (auto& e) -> future<> {
> > + auto& [host_id, st] = e;
> > + if (host_id == my_host_id()) {
> > + on_internal_error(slogger, format("Loaded saved endpoint {} with my host_id={}", st.endpoint, host_id));
> > + }
> > + co_await _gossiper.add_saved_endpoint(host_id, st, gms::null_permit_id);
> > + });
> > }
> >
> > auto loaded_peer_features = co_await _sys_ks.local().load_peer_features();
> > --
> > 2.47.1
> >
>

--
Gleb.

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 8, 2025, 10:04:43 AMJan 8
to scylladb-dev@googlegroups.com, kbraun@scylladb.com, avi@scylladb.com
Ping.
--
Gleb.

Kamil Braun

<kbraun@scylladb.com>
unread,
Jan 9, 2025, 7:58:46 AMJan 9
to Gleb Natapov, scylladb-dev@googlegroups.com
Using value() results in undebuggable exceptions, as we have both
experienced.

> info.tokens = std::move(owned_tokens);
> co_await _sys_ks.local().update_peer_info(endpoint, host_id, info);
> } catch (...) {
> @@ -2761,17 +2763,16 @@ future<> storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_s
> return make_ready_future();
> }
>
> -db::system_keyspace::peer_info storage_service::get_peer_info_for_update(inet_address endpoint) {
> +std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for_update(inet_address endpoint) {
> auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint);
> if (!ep_state) {
> return db::system_keyspace::peer_info{};
> }
> auto info = get_peer_info_for_update(endpoint, ep_state->get_application_state_map());
> - if (!info) {
> + if (!info && !raft_topology_change_enabled()) {
Why is it only an error outside raft topology mode?

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 9, 2025, 8:12:51 AMJan 9
to Kamil Braun, scylladb-dev@googlegroups.com
In this particular case this cannot happen since otherwise we will have
on_internal_error in get_peer_info_for_update itself.

> > info.tokens = std::move(owned_tokens);
> > co_await _sys_ks.local().update_peer_info(endpoint, host_id, info);
> > } catch (...) {
> > @@ -2761,17 +2763,16 @@ future<> storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_s
> > return make_ready_future();
> > }
> > -db::system_keyspace::peer_info storage_service::get_peer_info_for_update(inet_address endpoint) {
> > +std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for_update(inet_address endpoint) {
> > auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint);
> > if (!ep_state) {
> > return db::system_keyspace::peer_info{};
> > }
> > auto info = get_peer_info_for_update(endpoint, ep_state->get_application_state_map());
> > - if (!info) {
> > + if (!info && !raft_topology_change_enabled()) {
> Why is it only an error outside raft topology mode?

Because this is not legal in gossiper topology, but may be legal in raft
topology because we want to stop distributing states we distribute
through raft in gossiper, so the function may not find anything to
update here and this is fine.
--
Gleb.

Kamil Braun

<kbraun@scylladb.com>
unread,
Jan 9, 2025, 8:18:18 AMJan 9
to Gleb Natapov, scylladb-dev@googlegroups.com
Weren't all usages of value() cases where it "cannot happen"?
>>
>
> In this particular case this cannot happen since otherwise we will have
> on_internal_error in get_peer_info_for_update itself.
>
>>> info.tokens = std::move(owned_tokens);
>>> co_await _sys_ks.local().update_peer_info(endpoint, host_id, info);
>>> } catch (...) {
>>> @@ -2761,17 +2763,16 @@ future<> storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_s
>>> return make_ready_future();
>>> }
>>> -db::system_keyspace::peer_info storage_service::get_peer_info_for_update(inet_address endpoint) {
>>> +std::optional<db::system_keyspace::peer_info> storage_service::get_peer_info_for_update(inet_address endpoint) {
>>> auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint);
>>> if (!ep_state) {
>>> return db::system_keyspace::peer_info{};
>>> }
>>> auto info = get_peer_info_for_update(endpoint, ep_state->get_application_state_map());
>>> - if (!info) {
>>> + if (!info && !raft_topology_change_enabled()) {
>> Why is it only an error outside raft topology mode?
>
> Because this is not legal in gossiper topology, but may be legal in raft
> topology because we want to stop distributing states we distribute
> through raft in gossiper, so the function may not find anything to
> update here and this is fine.
So at this point in time, we don't need this change, but it might be
needed in the future if/when decide to stop distributing these states.

Why do the change now?

I wouldn't even be sure we actually stop distributing these states.
Gossiper states are accessible through user APIs. Support and cluster
admins and external tools are using them to investigate cluster state.
Removing them would a backwards incompatible change.

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 9, 2025, 8:28:23 AMJan 9
to Kamil Braun, scylladb-dev@googlegroups.com
I tries to do that in this series, but it is not possible right
now since replace depends on the info. It should not though and the last
patch in this series moves us to this direction.

> Why do the change now?
>
Since I know now that the function's assumptions can be broken it is
better to make it more robust now instead of debugging it again in the
future.

> I wouldn't even be sure we actually stop distributing these states. Gossiper
> states are accessible through user APIs. Support and cluster admins and
> external tools are using them to investigate cluster state. Removing them
> would a backwards incompatible change.

APIs responses can be synthesized from the multiple sources for
backwards compatibility. But we also need to evolve out tools to use
primary sources of the information.


--
Gleb.

Kamil Braun

<kbraun@scylladb.com>
unread,
Jan 9, 2025, 8:35:26 AMJan 9
to Gleb Natapov, scylladb-dev@googlegroups.com
LGTM, but next is frozen so won't queue

On 1/5/25 10:12 AM, 'Gleb Natapov' via ScyllaDB development wrote:

Gleb Natapov

<gleb@scylladb.com>
unread,
Jan 9, 2025, 8:36:48 AMJan 9
to Kamil Braun, scylladb-dev@googlegroups.com
On Thu, Jan 09, 2025 at 02:35:20PM +0100, Kamil Braun wrote:
> LGTM, but next is frozen so won't queue
>
For the reason of CI flakiness or before release?
--
Gleb.

Kamil Braun

<kbraun@scylladb.com>
unread,
Jan 9, 2025, 8:37:14 AMJan 9
to Gleb Natapov, scylladb-dev@googlegroups.com
CI
Reply all
Reply to author
Forward
0 new messages