Wait for all outstanding async work that uses group0 to complete before
destroying the group0 server.
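
Background fibers and request handlers that touch group0 now take a
gate::holder from raft_group0::hold_group0_gate(), and raft_group0::abort()
closes _shutdown_gate after requesting abort, so group0 is stopped only once
every holder has been released. A minimal sketch of the pattern (the fiber
below is illustrative; the real fibers receive the holder as an argument, as
in the hunks below):

    // Illustrative fiber: keeps the shutdown gate open while it runs.
    future<> example_group0_fiber(raft_group0& group0) {
        auto holder = group0.hold_group0_gate();   // gate::holder
        // ... async work that uses the group0 server ...
        co_return;
    }

    // Shutdown side, simplified from raft_group0::abort(): request abort
    // first, then wait for all outstanding holders before stopping group0.
    _leadership_monitor_as.request_abort();
    co_await _shutdown_gate.close();   // waits for every gate::holder
    co_await stop_group0();
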
Fixes scylladb/scylladb#20701
---
service/raft/raft_group0.hh | 5 +++++
service/storage_service.hh | 4 ++--
service/raft/raft_group0.cc | 5 ++++-
service/storage_service.cc | 33 ++++++++++++++++++++-------------
service/topology_coordinator.cc | 3 +++
5 files changed, 34 insertions(+), 16 deletions(-)
diff --git a/service/raft/raft_group0.hh b/service/raft/raft_group0.hh
index 5e5de704849..38087e88364 100644
--- a/service/raft/raft_group0.hh
+++ b/service/raft/raft_group0.hh
@@ -291,6 +291,11 @@ class raft_group0 {
return _raft_gr.group0_with_timeouts();
}
+    // Hold the shutdown gate; abort() waits for all outstanding holders during shutdown.
+ gate::holder hold_group0_gate() {
+ return _shutdown_gate.hold();
+ }
+
// Returns true after the group 0 server has been started.
bool joined_group0() const;
diff --git a/service/storage_service.hh b/service/storage_service.hh
index 2e237a3e1b5..53f352b9976 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -835,7 +835,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
future<> _raft_state_monitor = make_ready_future<>();
// This fibers monitors raft state and start/stops the topology change
// coordinator fiber
- future<> raft_state_monitor_fiber(raft::server&, sharded<db::system_distributed_keyspace>& sys_dist_ks);
+ future<> raft_state_monitor_fiber(raft::server&, gate::holder, sharded<db::system_distributed_keyspace>& sys_dist_ks);
public:
bool topology_global_queue_empty() const {
@@ -976,7 +976,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
semaphore _join_node_response_handler_mutex{1};
future<> _sstable_cleanup_fiber = make_ready_future<>();
- future<> sstable_cleanup_fiber(raft::server& raft, sharded<service::storage_proxy>& proxy) noexcept;
+ future<> sstable_cleanup_fiber(raft::server& raft, gate::holder, sharded<service::storage_proxy>& proxy) noexcept;
// We need to be able to abort all group0 operation during shutdown, so we need special abort source for that
abort_source _group0_as;
diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc
index 58581f0169f..acdcd3aabd1 100644
--- a/service/raft/raft_group0.cc
+++ b/service/raft/raft_group0.cc
@@ -382,9 +382,11 @@ future<> raft_group0::abort() {
co_await smp::invoke_on_all([this]() {
return uninit_rpc_verbs(_ms.local());
});
- co_await _shutdown_gate.close();
_leadership_monitor_as.request_abort();
+
+ co_await _shutdown_gate.close();
+
co_await std::move(_leadership_monitor);
co_await stop_group0();
@@ -429,6 +431,7 @@ future<> raft_group0::leadership_monitor_fiber() {
}
});
+ auto holder = hold_group0_gate();
while (true) {
while (!group0_server().is_leader()) {
co_await group0_server().wait_for_state_change(&_leadership_monitor_as);
diff --git a/service/storage_service.cc b/service/storage_service.cc
index dd5ae6db5ad..ed486bcd782 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -1032,7 +1032,7 @@ class storage_service::raft_ip_address_updater: public gms::i_endpoint_state_cha
// }}} raft_ip_address_updater
-future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<service::storage_proxy>& proxy) noexcept {
+future<> storage_service::sstable_cleanup_fiber(raft::server& server, gate::holder group0_holder, sharded<service::storage_proxy>& proxy) noexcept {
while (!_group0_as.abort_requested()) {
bool err = false;
try {
@@ -1134,7 +1134,7 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<se
}
}
-future<> storage_service::raft_state_monitor_fiber(raft::server& raft, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
+future<> storage_service::raft_state_monitor_fiber(raft::server& raft, gate::holder group0_holder, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
std::optional<abort_source> as;
try {
@@ -1867,9 +1867,9 @@ future<> storage_service::join_topology(sharded<db::system_distributed_keyspace>
co_await raft_initialize_discovery_leader(join_params);
// start topology coordinator fiber
- _raft_state_monitor = raft_state_monitor_fiber(*raft_server, sys_dist_ks);
+ _raft_state_monitor = raft_state_monitor_fiber(*raft_server, _group0->hold_group0_gate(), sys_dist_ks);
// start cleanup fiber
- _sstable_cleanup_fiber = sstable_cleanup_fiber(*raft_server, proxy);
+ _sstable_cleanup_fiber = sstable_cleanup_fiber(*raft_server, _group0->hold_group0_gate(), proxy);
// Need to start system_distributed_keyspace before bootstrap because bootstrapping
// process may access those tables.
@@ -2150,7 +2150,7 @@ future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded
// Start the topology coordinator monitor fiber. If we are the leader, this will start
// the topology coordinator which is responsible for driving the upgrade process.
try {
- _raft_state_monitor = raft_state_monitor_fiber(_group0->group0_server(), sys_dist_ks);
+ _raft_state_monitor = raft_state_monitor_fiber(_group0->group0_server(), _group0->hold_group0_gate(), sys_dist_ks);
} catch (...) {
// The calls above can theoretically fail due to coroutine frame allocation failure.
// Abort in this case as the node should be in a pretty bad shape anyway.
@@ -2176,7 +2176,7 @@ future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded
}
try {
- _sstable_cleanup_fiber = sstable_cleanup_fiber(_group0->group0_server(), proxy);
+ _sstable_cleanup_fiber = sstable_cleanup_fiber(_group0->group0_server(), _group0->hold_group0_gate(), proxy);
start_tablet_split_monitor();
} catch (...) {
rtlogger.error("failed to start one of the raft-related background fibers: {}", std::current_exception());
@@ -3649,6 +3649,7 @@ static size_t count_normal_token_owners(const topology& topology) {
future<> storage_service::raft_decommission() {
auto& raft_server = _group0->group0_server();
+ auto holder = _group0->hold_group0_gate();
utils::UUID request_id;
while (true) {
@@ -4645,6 +4646,7 @@ future<> storage_service::do_drain() {
future<> storage_service::do_cluster_cleanup() {
auto& raft_server = _group0->group0_server();
+ auto holder = _group0->hold_group0_gate();
while (true) {
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
@@ -4715,6 +4717,7 @@ future<> storage_service::wait_for_topology_not_busy() {
future<> storage_service::raft_rebuild(utils::optional_param sdc_param) {
auto& raft_server = _group0->group0_server();
+ auto holder = _group0->hold_group0_gate();
utils::UUID request_id;
while (true) {
@@ -5475,12 +5478,15 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
try {
auto& raft_server = _group0->group0_server();
+ auto group0_holder = _group0->hold_group0_gate();
// do barrier to make sure we always see the latest topology
co_await raft_server.read_barrier(&_group0_as);
if (raft_server.get_current_term() != term) {
// Return an error since the command is from outdated leader
co_return result;
}
+    auto id = raft_server.id();
+ group0_holder.release();
{
auto& state = _raft_topology_cmd_handler_state;
@@ -5592,7 +5598,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
break;
case raft_topology_cmd::command::stream_ranges: {
co_await with_scheduling_group(_db.local().get_streaming_scheduling_group(), coroutine::lambda([&] () -> future<> {
-        const auto& rs = _topology_state_machine._topology.find(raft_server.id())->second;
+ const auto& rs = _topology_state_machine._topology.find(id)->second;
auto tstate = _topology_state_machine._topology.tstate;
if (!rs.ring || rs.ring->tokens.empty()) {
rtlogger.warn("got {} request but the node does not own any tokens and is in the {} state", cmd.cmd, rs.state);
@@ -5641,11 +5647,11 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
utils::get_local_injector().inject("stop_after_streaming",
[] { std::raise(SIGSTOP); });
} else {
-                auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[raft_server.id()]).replaced_id;
+ auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[id]).replaced_id;
auto task = co_await get_task_manager_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
-                        parent_info.id, streaming::stream_reason::replace, _bootstrap_result, coroutine::lambda([this, &rs, &raft_server, replaced_id] () -> future<> {
-                    if (!_topology_state_machine._topology.req_param.contains(raft_server.id())) {
-                        on_internal_error(rtlogger, ::format("Cannot find request_param for node id {}", raft_server.id()));
+                        parent_info.id, streaming::stream_reason::replace, _bootstrap_result, coroutine::lambda([this, &rs, &id, replaced_id] () -> future<> {
+ if (!_topology_state_machine._topology.req_param.contains(id)) {
+ on_internal_error(rtlogger, ::format("Cannot find request_param for node id {}", id));
}
if (is_repair_based_node_ops_enabled(streaming::stream_reason::replace)) {
auto ignored_nodes = boost::copy_range<std::unordered_set<locator::host_id>>(_topology_state_machine._topology.ignored_nodes | boost::adaptors::transformed([] (const auto& id) {
@@ -5729,7 +5735,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
}
break;
case node_state::rebuilding: {
-            auto source_dc = std::get<rebuild_param>(_topology_state_machine._topology.req_param[raft_server.id()]).source_dc;
+ auto source_dc = std::get<rebuild_param>(_topology_state_machine._topology.req_param[id]).source_dc;
rtlogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc);
tasks::task_info parent_info{tasks::task_id{rs.request_id}, 0};
auto task = co_await get_task_manager_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
@@ -5776,7 +5782,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
case node_state::none:
case node_state::removing:
on_fatal_internal_error(rtlogger, ::format("Node {} got streaming request in state {}. It should be either dead or not part of the cluster",
-                            raft_server.id(), rs.state));
+ id, rs.state));
break;
}
}));
@@ -6492,6 +6498,7 @@ future<join_node_request_result> storage_service::join_node_request_handler(join
});
auto& g0_server = _group0->group0_server();
+ auto g0_holder = _group0->hold_group0_gate();
if (params.replaced_id && *params.replaced_id == g0_server.current_leader()) {
// There is a peculiar case that can happen if the leader is killed
// and then replaced very quickly:
diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc
index 591f1d341c3..fe3e3e0cb1a 100644
--- a/service/topology_coordinator.cc
+++ b/service/topology_coordinator.cc
@@ -109,6 +109,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
std::chrono::milliseconds _ring_delay;
+ gate::holder _group0_holder;
+
using drop_guard_and_retake = bool_class<class retake_guard_tag>;
// Engaged if an ongoing topology change should be rolled back. The string inside
@@ -2494,6 +2496,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
, _raft_topology_cmd_handler(std::move(raft_topology_cmd_handler))
, _tablet_allocator(tablet_allocator)
, _ring_delay(ring_delay)
+ , _group0_holder(_group0.hold_group0_gate())
{}
// Returns true if the upgrade was done, returns false if upgrade was interrupted.
--
2.46.0