From: Kamil Braun <
kbr...@scylladb.com>
Committer: Kamil Braun <
kbr...@scylladb.com>
Branch: next
Merge 'rollback topology operation on streaming failure' from Gleb
This patch series adds error handling for streaming failures during
topology operations instead of retrying forever. If streaming fails, the
operation is rolled back: bootstrap/replace nodes move to the left state and
decommissioned/removed nodes move back to the normal state.
* 'gleb/streaming-failure-rollback-v4' of github.com:scylladb/scylla-dev:
raft: make sure that all operation forwarded to a leader are completed before destroying raft server
storage_service: raft topology: remove code duplication from global_tablet_token_metadata_barrier
tests: add tests for streaming failure in bootstrap/replace/remove/decomission
test/pylib: do not stop node if decommission failed with an expected error
storage_service: raft topology: fix typo in "decommission" everywhere
storage_service: raft topology: add streaming error injection
storage_service: raft topology: do not increase topology version during CDC repair
storage_service: raft topology: rollback topology operation on streaming failure.
storage_service: raft topology: load request parameters in left_token_ring state as well
storage_service: raft topology: do not report term_changed_error during global_token_metadata_barrier as an error
storage_service: raft topology: change global_token_metadata_barrier error handling to try/catch
storage_service: raft topology: make global_token_metadata_barrier node independent
storage_service: raft topology: split get_excluded_nodes from exec_global_command
storage_service: raft topology: drop unused include_local and do_retake parameters from exec_global_command which are always true
storage_service: raft topology: simplify streaming RPC failure handling
---
diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc
--- a/db/system_keyspace.cc
+++ b/db/system_keyspace.cc
@@ -2537,6 +2537,17 @@ future<service::topology> system_keyspace::load_topology_state() {
}
ret.req_param.emplace(host_id, service::rebuild_param{*rebuild_option});
break;
+ case service::node_state::left_token_ring:
+ // If replacenode fails, the bootstrapping node is moved to the left_token_ring state where it executes the metadata
+ // barrier. It needs to know which nodes to ignore during the barrier, so put them here into the replace_param.
+ // Note that if the replacenode does not fail and the node is later decommissioned, it will move to the left_token_ring
+ // state at some point and replace_param will be created here as well (we do not remove replaced_id and ignored_ids
+ // when we move to normal state). But this is OK because we allow nodes to be ignored during topology operations only if they
+ // are permanently dead.
+ if (replaced_id) {
+ ret.req_param.emplace(host_id, service::replace_param{*replaced_id, std::move(ignored_ids)});
+ }
+ break;
default:
// no parameters for other operations
break;
diff --git a/docs/dev/topology-over-raft.md b/docs/dev/topology-over-raft.md
--- a/docs/dev/topology-over-raft.md
+++ b/docs/dev/topology-over-raft.md
@@ -6,7 +6,7 @@ their state, properties (topology, tokens, etc) and requested actions.
Node state can be one of those:
- `none` - the new node joined group0 but did not bootstraped yet (has no tokens and data to serve)
- `bootstrapping` - the node is currently in the process of streaming its part of the ring
-- `decommissioning` - the node is being decomissioned and stream its data to nodes that took over
+- `decommissioning` - the node is being decommissioned and streams its data to nodes that took over
- `removing` - the node is being removed and its data is streamed to nodes that took over from still alive owners
- `replacing` - the node replaces another dead node in the cluster and it data is being streamed to it
- `rebuilding` - the node is being rebuild and is streaming data from other replicas
diff --git a/raft/server.cc b/raft/server.cc
--- a/raft/server.cc
+++ b/raft/server.cc
@@ -22,6 +22,7 @@
#include <seastar/core/metrics.hh>
#include <seastar/rpc/rpc_types.hh>
#include <absl/container/flat_hash_map.h>
+#include <seastar/core/gate.hh>
#include "fsm.hh"
#include "log.hh"
@@ -288,6 +289,8 @@ class server_impl : public rpc_server, public server {
std::optional<shared_promise<>> _tick_promise;
future<> wait_for_next_tick(seastar::abort_source* as);
+
+ seastar::gate _do_on_leader_gate;
// Call a function on a current leader until it returns stop_iteration::yes.
// Handles aborts and leader changes, adds a delay between
// iterations to protect against tight loops.
@@ -595,6 +598,9 @@ requires requires (server_id& leader, AsyncAction aa) {
future<> server_impl::do_on_leader_with_retries(seastar::abort_source* as, AsyncAction&& action) {
server_id leader = _fsm->current_leader(), prev_leader{};
+ check_not_aborted();
+ auto gh = _do_on_leader_gate.hold();
+
while (true) {
if (as && as->abort_requested()) {
throw request_aborted();
@@ -1287,6 +1293,9 @@ future<> server_impl::wait_for_apply(index_t idx, abort_source* as) {
if (as && as->abort_requested()) {
throw request_aborted();
}
+
+ check_not_aborted();
+
if (idx > _applied_idx) {
// The index is not applied yet. Wait for it.
// This will be signalled when read_idx is applied
@@ -1481,7 +1490,11 @@ future<> server_impl::abort(sstring reason) {
auto all_futures = boost::range::join(snp_futures, append_futures);
- co_await seastar::when_all_succeed(all_futures.begin(), all_futures.end()).discard_result();
+ std::array<future<>, 1> gate{_do_on_leader_gate.close()};
+
+ auto all_with_gate = boost::range::join(all_futures, gate);
+
+ co_await seastar::when_all_succeed(all_with_gate.begin(), all_with_gate.end()).discard_result();
}
future<> server_impl::set_configuration(config_member_set c_new, seastar::abort_source* as) {
diff --git a/service/storage_service.cc b/service/storage_service.cc
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -864,6 +864,9 @@ class topology_coordinator {
using drop_guard_and_retake = bool_class<class retake_guard_tag>;
+ // True if an ongoing topology change should be rolled back
+ bool _rollback = false;
+
const locator::token_metadata& get_token_metadata() const noexcept {
return *_shared_tm.get();
}
@@ -1008,18 +1011,25 @@ class topology_coordinator {
};
raft::server_id parse_replaced_node(const node_to_work_on& node) {
- if (node.rs->state == node_state::replacing) {
- return std::get<replace_param>(node.req_param.value()).replaced_id;
+ if (node.req_param) {
+ auto *param = std::get_if<replace_param>(&*node.req_param);
+ if (param) {
+ return param->replaced_id;
+ }
}
return {};
}
std::unordered_set<raft::server_id> parse_ignore_nodes(const node_to_work_on& node) {
- if (node.rs->state == node_state::removing) {
- return std::get<removenode_param>(node.req_param.value()).ignored_ids;
- }
- if (node.rs->state == node_state::replacing) {
- return std::get<replace_param>(node.req_param.value()).ignored_ids;
+ if (node.req_param) {
+ auto* remove_param = std::get_if<removenode_param>(&*node.req_param);
+ if (remove_param) {
+ return remove_param->ignored_ids;
+ }
+ auto* rep_param = std::get_if<replace_param>(&*node.req_param);
+ if (rep_param) {
+ return rep_param->ignored_ids;
+ }
}
return {};
}
@@ -1094,15 +1104,14 @@ class topology_coordinator {
co_return guard;
}
- future<node_to_work_on> exec_global_command(
- node_to_work_on&& node, const raft_topology_cmd& cmd, bool include_local,
- drop_guard_and_retake do_retake = drop_guard_and_retake::yes) {
- std::unordered_set<raft::server_id> exclude_nodes = parse_ignore_nodes(node);
+ std::unordered_set<raft::server_id> get_excluded_nodes(const node_to_work_on& node) {
+ auto exclude_nodes = parse_ignore_nodes(node);
exclude_nodes.insert(parse_replaced_node(node));
- if (!include_local) {
- exclude_nodes.insert(_
raft.id());
- }
- auto guard = co_await exec_global_command(std::move(node.guard), cmd, exclude_nodes, do_retake);
+ return exclude_nodes;
+ }
+
+ future<node_to_work_on> exec_global_command(node_to_work_on&& node, const raft_topology_cmd& cmd) {
+ auto guard = co_await exec_global_command(std::move(node.guard), cmd, get_excluded_nodes(node), drop_guard_and_retake::yes);
co_return retake_node(std::move(guard),
node.id);
};
@@ -1451,23 +1460,15 @@ class topology_coordinator {
slogger.info("raft topology: enabled features: {}", features_to_enable);
}
- future<node_to_work_on> global_token_metadata_barrier(node_to_work_on&& node) {
- node = co_await exec_global_command(std::move(node),
- raft_topology_cmd::command::barrier_and_drain,
- true);
- node = co_await exec_global_command(std::move(node),
- raft_topology_cmd::command::fence,
- true);
- co_return std::move(node);
+ future<group0_guard> global_token_metadata_barrier(group0_guard&& guard, std::unordered_set<raft::server_id> exclude_nodes = {}) {
+ guard = co_await exec_global_command(std::move(guard), raft_topology_cmd::command::barrier_and_drain, exclude_nodes, drop_guard_and_retake::yes);
+ guard = co_await exec_global_command(std::move(guard), raft_topology_cmd::command::fence, exclude_nodes, drop_guard_and_retake::yes);
+ co_return std::move(guard);
}
future<group0_guard> global_tablet_token_metadata_barrier(group0_guard guard) {
// FIXME: Don't require all nodes to be up, only tablet replicas.
- guard = co_await exec_global_command(std::move(guard),
- raft_topology_cmd { raft_topology_cmd::command::barrier_and_drain }, {});
- guard = co_await exec_global_command(std::move(guard),
- raft_topology_cmd { raft_topology_cmd::command::fence }, {});
- co_return std::move(guard);
+ return global_token_metadata_barrier(std::move(guard));
}
// Represents a two-state state machine which changes monotonically
@@ -1784,7 +1785,7 @@ class topology_coordinator {
co_await enable_features(std::move(guard), std::move(feats));
co_return true;
}
-
+
// If there is no other work, evaluate load and start tablet migration if there is imbalance.
if (co_await maybe_start_tablet_migration(std::move(guard))) {
co_return true;
@@ -1916,13 +1917,13 @@ class topology_coordinator {
// majority and commit.
topology_mutation_builder builder(guard.write_timestamp());
builder.set_current_cdc_generation_id(cdc_gen_id)
- .add_unpublished_cdc_generation(cdc_gen_id)
- .set_version(_topo_sm._topology.version + 1);
+ .add_unpublished_cdc_generation(cdc_gen_id);
if (_topo_sm._topology.global_request == global_topology_request::new_cdc_generation) {
builder.del_global_topology_request();
builder.del_transition_state();
} else {
builder.set_transition_state(topology::transition_state::write_both_read_old);
+ builder.set_version(_topo_sm._topology.version + 1);
}
auto str = ::format("committed new CDC generation, ID: {}", cdc_gen_id);
co_await update_topology_state(std::move(guard), {builder.build()}, std::move(str));
@@ -1935,15 +1936,16 @@ class topology_coordinator {
auto node = get_node_to_work_on(std::move(guard));
// make sure all nodes know about new topology (we require all nodes to be alive for topo change for now)
- {
- auto f = co_await coroutine::as_future(global_token_metadata_barrier(std::move(node)));
- if (f.failed()) {
- slogger.error("raft topology: transition_state::write_both_read_old, "
- "global_token_metadata_barrier failed, error {}",
- f.get_exception());
- break;
- }
- node = std::move(f).get();
+ try {
+ node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)),
node.id);
+ } catch (term_changed_error&) {
+ throw;
+ } catch (...) {
+ slogger.error("raft topology: transition_state::write_both_read_old, "
+ "global_token_metadata_barrier failed, error {}",
+ std::current_exception());
+ _rollback = true;
+ break;
}
if (_group0.is_member(
node.id, true)) {
@@ -1983,26 +1985,21 @@ class topology_coordinator {
}
raft_topology_cmd cmd{raft_topology_cmd::command::stream_ranges};
- if (node.rs->state == node_state::removing) {
- // tell all nodes to stream data of the removed node to new range owners
- auto f = co_await coroutine::as_future(exec_global_command(std::move(node), cmd, true));
- if (f.failed()) {
- slogger.error("raft topology: send_raft_topology_cmd(stream_ranges) failed "
- "during removenode, error {}", f.get_exception());
- break;
- }
- node = std::move(f).get();
- } else {
- // Tell joining/leaving/replacing node to stream its ranges
- try {
+ try {
+ if (node.rs->state == node_state::removing) {
+ // tell all nodes to stream data of the removed node to new range owners
+ node = co_await exec_global_command(std::move(node), cmd);
+ } else {
+ // Tell joining/leaving/replacing node to stream its ranges
node = co_await exec_direct_command(std::move(node), cmd);
- } catch (term_changed_error&) {
- throw;
- } catch (...) {
- slogger.error("raft topology: send_raft_topology_cmd(stream_ranges) failed with exception"
- " (node state is {}): {}", node.rs->state, std::current_exception());
- break;
}
+ } catch (term_changed_error&) {
+ throw;
+ } catch (...) {
+ slogger.error("raft topology: send_raft_topology_cmd(stream_ranges) failed with exception"
+ " (node state is {}): {}", node.rs->state, std::current_exception());
+ _rollback = true;
+ break;
}
// Streaming completed. We can now move tokens state to topology::transition_state::write_both_read_new
topology_mutation_builder builder(node.guard.write_timestamp());
@@ -2018,15 +2015,15 @@ class topology_coordinator {
// In this state writes goes to old and new replicas but reads start to be done from new replicas
// Before we stop writing to old replicas we need to wait for all previous reads to complete
- {
- auto f = co_await coroutine::as_future(global_token_metadata_barrier(std::move(node)));
- if (f.failed()) {
- slogger.error("raft topology: transition_state::write_both_read_new, "
- "global_token_metadata_barrier failed, error {}",
- f.get_exception());
- break;
- }
- node = std::move(f).get();
+ try {
+ node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)),
node.id);
+ } catch (term_changed_error&) {
+ throw;
+ } catch (...) {
+ slogger.error("raft topology: transition_state::write_both_read_new, "
+ "global_token_metadata_barrier failed, error {}",
+ std::current_exception());
+ break;
}
switch(node.rs->state) {
case node_state::bootstrapping: {
@@ -2221,16 +2218,15 @@ class topology_coordinator {
}
// Wait until other nodes observe the new token ring and stop sending writes to this node.
- {
- auto id =
node.id;
- auto f = co_await coroutine::as_future(global_token_metadata_barrier(std::move(node)));
- if (f.failed()) {
- slogger.error("raft topology: node_state::left_token_ring (node: {}), "
- "global_token_metadata_barrier failed, error {}",
- id, f.get_exception());
- break;
- }
- node = std::move(f).get();
+ try {
+ node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)),
node.id);
+ } catch (term_changed_error&) {
+ throw;
+ } catch (...) {
+ slogger.error("raft topology: node_state::left_token_ring (node: {}), "
+ "global_token_metadata_barrier failed, error {}",
+
node.id, std::current_exception());
+ break;
}
// Tell the node to shut down.
@@ -2345,6 +2341,7 @@ class topology_coordinator {
}
future<> fence_previous_coordinator() noexcept;
+ future<> rollback_current_topology_op(group0_guard&& guard);
public:
topology_coordinator(
@@ -2420,6 +2417,57 @@ future<> topology_coordinator::fence_previous_coordinator() noexcept {
}
}
+future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard) {
+
slogger.info("raft topology: start rolling back topology change");
+ // Look for a node whose operation should be aborted
+ // (there should be one since we are in the rollback)
+ node_to_work_on node = get_node_to_work_on(std::move(guard));
+ node_state state;
+ std::unordered_set<raft::server_id> exclude_nodes = parse_ignore_nodes(node);
+
+ switch (node.rs->state) {
+ case node_state::bootstrapping:
+ [[fallthrough]];
+ case node_state::replacing:
+ // To roll back a bootstrap or replace, just move the node that we tried to add to the left_token_ring state.
+ // It will be removed from the group0 by the state handler. It will also be notified to shut down.
+ state = node_state::left_token_ring;
+ break;
+ case node_state::removing:
+ // Exclude dead node from global barrier
+ exclude_nodes.emplace(
node.id);
+ // The node was removed already. We need to add it back. Let's do it as a non-voter.
+ // If it ever boots again it will make itself a voter.
+ co_await _group0.group0_server().modify_config({raft::config_member{{
node.id, {}}, false}}, {}, &_as);
+ [[fallthrough]];
+ case node_state::decommissioning:
+ // To roll back a decommission or remove, just move the node that we tried to remove back to the normal state
+ state = node_state::normal;
+ break;
+ default:
+ on_internal_error(slogger, fmt::format("raft topology: tried to rollback in unsupported state {}", node.rs->state));
+ }
+
+ topology_mutation_builder builder(node.guard.write_timestamp());
+ builder.del_transition_state()
+ .set_version(_topo_sm._topology.version + 1)
+ .with_node(
node.id)
+ .set("node_state", state);
+
+ auto str = fmt::format("rollback {} after {} failure to state {}",
node.id, node.rs->state, state);
+
+ co_await update_topology_state(std::move(node.guard), {builder.build()}, str);
+
slogger.info(str.c_str());
+ // Try to run metadata barrier to wait for all double writes to complete
+ // but ignore failures
+ try {
+ co_await global_token_metadata_barrier(co_await start_operation(), std::move(exclude_nodes));
+ } catch (term_changed_error&) {
+ } catch(...) {
+ slogger.warn("raft topology: failed to run metadata barrier during rollback {}", std::current_exception());
+ }
+}
+
future<> topology_coordinator::run() noexcept {
slogger.info("raft topology: start topology coordinator fiber");
@@ -2436,6 +2484,12 @@ future<> topology_coordinator::run() noexcept {
auto guard = co_await start_operation();
co_await cleanup_group0_config_if_needed();
+ if (_rollback) {
+ co_await rollback_current_topology_op(std::move(guard));
+ _rollback = false;
+ continue;
+ }
+
bool had_work = co_await handle_topology_transition(std::move(guard));
if (!had_work) {
// Nothing to work on. Wait for topology change event.
@@ -3018,14 +3072,23 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
// we do that here.
co_await raft_initialize_discovery_leader(*raft_server, join_params);
+ auto leaving = [&] {
+ return _topology_state_machine._topology.left_nodes.contains(raft_server->id()) ||
+ (_topology_state_machine._topology.transition_nodes.contains(raft_server->id()) &&
+ _topology_state_machine._topology.transition_nodes[raft_server->id()].state == node_state::left_token_ring);
+ };
+
// Wait until we enter one of the final states
- co_await _topology_state_machine.event.when([this, raft_server] {
- return _topology_state_machine._topology.normal_nodes.contains(raft_server->id()) ||
- _topology_state_machine._topology.left_nodes.contains(raft_server->id());
+ co_await _topology_state_machine.event.when([this, raft_server, &leaving] {
+ return _topology_state_machine._topology.normal_nodes.contains(raft_server->id()) || leaving();
});
- if (_topology_state_machine._topology.left_nodes.contains(raft_server->id())) {
- throw std::runtime_error("A node that already left the cluster cannot be restarted");
+ if (leaving()) {
+ if (_sys_ks.local().bootstrap_complete()) {
+ throw std::runtime_error("A node that already left the cluster cannot be restarted");
+ } else {
+ throw std::runtime_error(fmt::format("{} failed. See earlier errors", raft_replace_info ? "Replace" : "Bootstrap"));
+ }
}
co_await update_topology_with_local_metadata(*raft_server);
@@ -4416,10 +4479,9 @@ void on_streaming_finished() {
utils::get_local_injector().inject("storage_service_streaming_sleep3", std::chrono::seconds{3}).get();
}
-future<> storage_service::raft_decomission() {
+future<> storage_service::raft_decommission() {
auto& raft_server = _group0->group0_server();
- auto shutdown_request_future = make_ready_future<>();
auto disengage_shutdown_promise = defer([this] {
_shutdown_request_promise = std::nullopt;
});
@@ -4439,40 +4501,68 @@ future<> storage_service::raft_decomission() {
}
if (_topology_state_machine._topology.normal_nodes.size() == 1) {
- throw std::runtime_error("Cannot decomission last node in the cluster");
+ throw std::runtime_error("Cannot decommission last node in the cluster");
}
- shutdown_request_future = _shutdown_request_promise.emplace().get_future();
-
-
slogger.info("raft topology: request decomission for: {}",
raft_server.id());
+
slogger.info("raft topology: request decommission for: {}",
raft_server.id());
topology_mutation_builder builder(guard.write_timestamp());
builder.with_node(
raft_server.id())
.set("topology_request", topology_request::leave);
topology_change change{{builder.build()}};
- group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("decomission: request decomission for {}",
raft_server.id()));
+ group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("decommission: request decommission for {}",
raft_server.id()));
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source);
} catch (group0_concurrent_modification&) {
-
slogger.info("raft topology: decomission: concurrent operation is detected, retrying.");
+
slogger.info("raft topology: decommission: concurrent operation is detected, retrying.");
continue;
}
break;
}
- // Wait for the coordinator to tell us to shut down.
- co_await std::move(shutdown_request_future);
+ // Wait for the coordinator to tell us to shut down or for the decommission request to disappear
+ bool abort_wait = false;
+
+ auto f1 = _shutdown_request_promise.emplace().get_future().then([this, &abort_wait] {
+ // shutdown was signalled, abort the wait for the topology event
+ abort_wait = true;
+ _topology_state_machine.event.broadcast();
+ });
+
+ auto f2 = _topology_state_machine.event.wait([this, &raft_server, &abort_wait] {
+ if (abort_wait) {
+ return true; // the wait is aborted
+ }
+ // Wait for the decommission request to be removed while the node stays normal, which means decommission failed
+ auto it = _topology_state_machine._topology.find(
raft_server.id());
+ if (it->second.state == node_state::normal) {
+ auto rit = _topology_state_machine._topology.requests.find(
raft_server.id());
+ if (rit == _topology_state_machine._topology.requests.end() || rit->second != topology_request::leave) {
+ _shutdown_request_promise->set_exception(std::runtime_error("Decommission failure"));
+ return true; // node is normal, but leave request is gone. It means decommission failed
+ }
+ }
+ return false;
+ });
+
+ auto res = co_await when_all(std::move(f1), std::move(f2));
- // Need to set it otherwise gossiper will try to send shutdown on exit
- co_await _gossiper.add_local_application_state({{ gms::application_state::STATUS, gms::versioned_value::left({}, _gossiper.now().time_since_epoch().count()) }});
+ if (!std::get<0>(res).failed()) {
+ // Need to set it otherwise gossiper will try to send shutdown on exit
+ co_await _gossiper.add_local_application_state({{ gms::application_state::STATUS, gms::versioned_value::left({}, _gossiper.now().time_since_epoch().count()) }});
+ } else {
+ const auto err = "Decommission failed. See earlier errors";
+ slogger.error(err);
+ throw std::runtime_error(err);
+ }
}
future<> storage_service::decommission() {
return run_with_api_lock(sstring("decommission"), [] (storage_service& ss) {
return seastar::async([&ss] {
std::exception_ptr leave_group0_ex;
if (ss._raft_topology_change_enabled) {
- ss.raft_decomission().get();
+ ss.raft_decommission().get();
} else {
bool left_token_ring = false;
auto uuid = node_ops_id::create_random_id();
@@ -4796,15 +4886,34 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
break;
}
- // Wait the node we are removing to enter left state
- co_await _topology_state_machine.event.when([this, id] {
- return _topology_state_machine._topology.left_nodes.contains(id);
+ bool left = false;
+ co_await _topology_state_machine.event.when([this, id, &left] {
+ // Wait for this node to move to the left state, which means that removenode completed,
+ // or wait for the removenode request to be removed while the node stays normal, which means removenode failed
+ auto it = _topology_state_machine._topology.find(id);
+ if (!it) {
+ left = true;
+ return true; // node either left or on the way
+ }
+ if (it->second.state == node_state::normal) {
+ auto rit = _topology_state_machine._topology.requests.find(id);
+ if (rit == _topology_state_machine._topology.requests.end() || rit->second != topology_request::remove) {
+ return true; // node is normal, but remove request is gone. It means removenode failed
+ }
+ }
+ return false;
});
- try {
- co_await _group0->remove_from_raft_config(id);
- } catch (raft::not_a_member&) {
-
slogger.info("raft topology removenode: already removed from the raft config by the topology coordinator");
+ if (left) {
+ try {
+ co_await _group0->remove_from_raft_config(id);
+ } catch (raft::not_a_member&) {
+
slogger.info("raft topology removenode: already removed from the raft config by the topology coordinator");
+ }
+ } else {
+ const auto err = "Removenode failed. See earlier errors";
+ slogger.error(err);
+ throw std::runtime_error(err);
}
}
@@ -5966,6 +6075,10 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(shar
slogger.warn("raft topology: got stream_ranges request while my tokens state is {} and node state is {}", tstate, rs.state);
break;
}
+
+ utils::get_local_injector().inject("stream_ranges_fail",
+ [] { throw std::runtime_error("stream_range failed due to error injection"); });
+
switch(rs.state) {
case node_state::bootstrapping:
case node_state::replacing: {
@@ -6025,7 +6138,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(shar
}
break;
case node_state::decommissioning:
- co_await retrier(_decomission_result, coroutine::lambda([&] () { return unbootstrap(); }));
+ co_await retrier(_decommission_result, coroutine::lambda([&] () { return unbootstrap(); }));
result.status = raft_topology_cmd_result::command_status::success;
break;
case node_state::normal: {
diff --git a/service/storage_service.hh b/service/storage_service.hh
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -775,7 +775,7 @@ private:
// Those futures hold results of streaming for various operations
std::optional<shared_future<>> _bootstrap_result;
- std::optional<shared_future<>> _decomission_result;
+ std::optional<shared_future<>> _decommission_result;
std::optional<shared_future<>> _rebuild_result;
std::unordered_map<raft::server_id, std::optional<shared_future<>>> _remove_result;
tablet_op_registry _tablet_ops;
@@ -792,7 +792,7 @@ private:
future<raft_topology_cmd_result> raft_topology_cmd_handler(sharded<db::system_distributed_keyspace>& sys_dist_ks, raft::term_t term, uint64_t cmd_index, const raft_topology_cmd& cmd);
future<> raft_initialize_discovery_leader(raft::server&, const join_node_request_params& params);
- future<> raft_decomission();
+ future<> raft_decommission();
future<> raft_removenode(locator::host_id host_id, std::list<locator::host_id_or_endpoint> ignore_nodes_params);
future<> raft_rebuild(sstring source_dc);
future<> raft_check_and_repair_cdc_streams();
diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh
--- a/service/topology_state_machine.hh
+++ b/service/topology_state_machine.hh
@@ -29,7 +29,7 @@ namespace service {
enum class node_state: uint16_t {
none, // the new node joined group0 but did not bootstraped yet (has no tokens and data to serve)
bootstrapping, // the node is currently in the process of streaming its part of the ring
- decommissioning, // the node is being decomissioned and stream its data to nodes that took over
+ decommissioning, // the node is being decommissioned and stream its data to nodes that took over
removing, // the node is being removed and its data is streamed to nodes that took over from still alive owners
replacing, // the node replaces another dead node in the cluster and it data is being streamed to it
rebuilding, // the node is being rebuild and is streaming data from other replicas
diff --git a/test/pylib/scylla_cluster.py b/test/pylib/scylla_cluster.py
--- a/test/pylib/scylla_cluster.py
+++ b/test/pylib/scylla_cluster.py
@@ -1230,6 +1230,8 @@ async def _cluster_decommission_node(self, request) -> None:
raise RuntimeError(
f"decommission failed (server: {server}) but did not contain expected error"
f"(\"{expected_error}\", check log file at {server.log_filename}, error: \"{exc}\"")
+ else:
+ return
else:
raise RuntimeError(
f"decommission failed (server: {server}), check log at {server.log_filename},"
diff --git a/test/topology/suite.yaml b/test/topology/suite.yaml
--- a/test/topology/suite.yaml
+++ b/test/topology/suite.yaml
@@ -12,5 +12,6 @@ run_first:
- test_tablets
skip_in_release:
- test_cluster_features
+ - test_topology_failure_recovery
run_in_release:
- test_gossiper
diff --git a/test/topology/test_topology_failure_recovery.py b/test/topology/test_topology_failure_recovery.py
--- a/test/topology/test_topology_failure_recovery.py
+++ b/test/topology/test_topology_failure_recovery.py
@@ -0,0 +1,58 @@
+#
+# Copyright (C) 2023-present ScyllaDB
+#
+# SPDX-License-Identifier: AGPL-3.0-or-later
+#
+from test.pylib.manager_client import ManagerClient
+from test.pylib.internal_types import ServerInfo
+from test.pylib.scylla_cluster import ReplaceConfig
+import pytest
+import logging
+
+logger = logging.getLogger(__name__)
+
+...@pytest.mark.asyncio
+async def test_topology_streaming_failure(request, manager: ManagerClient):
+ """Fail streaming while doing a topology operation"""
+ # decommission failure
+ servers = await manager.running_servers()
+ await manager.api.enable_injection(servers[2].ip_addr, 'stream_ranges_fail', one_shot=True)
+ await manager.decommission_node(servers[2].server_id, expected_error="Decommission failed. See earlier errors")
+ servers = await manager.running_servers()
+ assert len(servers) == 3
+ logs = [await manager.server_open_log(srv.server_id) for srv in servers]
+ matches = [await log.grep("storage_service - rollback.*after decommissioning failure to state normal") for log in logs]
+ assert sum(len(x) for x in matches) == 1
+ # remove failure
+ await manager.server_add()
+ servers = await manager.running_servers()
+ await manager.server_stop_gracefully(servers[3].server_id)
+ await manager.api.enable_injection(servers[2].ip_addr, 'stream_ranges_fail', one_shot=True)
+ await manager.remove_node(servers[0].server_id, servers[3].server_id, expected_error="Removenode failed. See earlier errors")
+ logs = [await manager.server_open_log(srv.server_id) for srv in servers]
+ matches = [await log.grep("storage_service - rollback.*after removing failure to state normal") for log in logs]
+ assert sum(len(x) for x in matches) == 1
+ await manager.server_start(servers[3].server_id)
+ # bootstrap failure
+ servers = await manager.running_servers()
+ s = await manager.server_add(start=False, config={
+ 'error_injections_at_startup': ['stream_ranges_fail']
+ })
+ await manager.server_start(s.server_id, expected_error="Bootstrap failed. See earlier errors")
+ servers = await manager.running_servers()
+ assert s not in servers
+ logs = [await manager.server_open_log(srv.server_id) for srv in servers]
+ matches = [await log.grep("storage_service - rollback.*after bootstrapping failure to state left_token_ring") for log in logs]
+ assert sum(len(x) for x in matches) == 1
+ # replace failure
+ await manager.server_stop_gracefully(servers[2].server_id)
+ replace_cfg = ReplaceConfig(replaced_id = servers[2].server_id, reuse_ip_addr = False, use_host_id = True)
+ s = await manager.server_add(start=False, replace_cfg=replace_cfg, config={
+ 'error_injections_at_startup': ['stream_ranges_fail']
+ })
+ await manager.server_start(s.server_id, expected_error="Replace failed. See earlier errors")
+ servers = await manager.running_servers()
+ assert s not in servers
+ logs = [await manager.server_open_log(srv.server_id) for srv in servers]
+ matches = [await log.grep("storage_service - rollback.*after replacing failure to state left_token_ring") for log in logs]
+ assert sum(len(x) for x in matches) == 1