[PATCH v1 0/2] wait for all users of group0 server to complete before destroying it

0 views
Skip to first unread message

Gleb Natapov

<gleb@scylladb.com>
unread,
Sep 30, 2024, 6:26:25 AMSep 30
to scylladb-dev@googlegroups.com
The group0 server is often used in asynchronous contexts, but we do not wait
for those users to complete before destroying the server. We already have
a shutdown gate for it, so let's hold it in those async functions.

Also make sure to signal group0 abort source if initialization fails.

CI: https://jenkins.scylladb.com/job/scylla-master/job/scylla-ci/12006/

Also in scylla-dev gleb/20701-fix

Gleb Natapov (2):
group0: Stop group0 if node initialization fails
group0: hold group0 shutdown gate during async operations

service/raft/raft_group0.hh | 5 ++++
service/storage_service.hh | 4 ++--
main.cc | 7 ++++++
service/raft/raft_group0.cc | 5 +++-
service/storage_service.cc | 41 ++++++++++++++++++++-------------
service/topology_coordinator.cc | 3 +++
6 files changed, 46 insertions(+), 19 deletions(-)

--
2.46.0

Gleb Natapov

<gleb@scylladb.com>
unread,
Sep 30, 2024, 6:26:26 AMSep 30
to scylladb-dev@googlegroups.com
Wait for all outstanding async work that uses group0 to complete before
destroying group0 server.

Fixes scylladb/scylladb#20701
---
service/raft/raft_group0.hh | 5 +++++
service/storage_service.hh | 4 ++--
service/raft/raft_group0.cc | 5 ++++-
service/storage_service.cc | 33 ++++++++++++++++++++-------------
service/topology_coordinator.cc | 3 +++
5 files changed, 34 insertions(+), 16 deletions(-)

diff --git a/service/raft/raft_group0.hh b/service/raft/raft_group0.hh
index 5e5de704849..38087e88364 100644
--- a/service/raft/raft_group0.hh
+++ b/service/raft/raft_group0.hh
@@ -291,6 +291,11 @@ class raft_group0 {
return _raft_gr.group0_with_timeouts();
}

+ // Hold shutdown gate to be waited during shutdown
+ gate::holder hold_group0_gate() {
+ return _shutdown_gate.hold();
+ }
+
// Returns true after the group 0 server has been started.
bool joined_group0() const;

diff --git a/service/storage_service.hh b/service/storage_service.hh
index 2e237a3e1b5..53f352b9976 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -835,7 +835,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
future<> _raft_state_monitor = make_ready_future<>();
// This fibers monitors raft state and start/stops the topology change
// coordinator fiber
- future<> raft_state_monitor_fiber(raft::server&, sharded<db::system_distributed_keyspace>& sys_dist_ks);
+ future<> raft_state_monitor_fiber(raft::server&, gate::holder, sharded<db::system_distributed_keyspace>& sys_dist_ks);

public:
bool topology_global_queue_empty() const {
@@ -976,7 +976,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
semaphore _join_node_response_handler_mutex{1};

future<> _sstable_cleanup_fiber = make_ready_future<>();
- future<> sstable_cleanup_fiber(raft::server& raft, sharded<service::storage_proxy>& proxy) noexcept;
+ future<> sstable_cleanup_fiber(raft::server& raft, gate::holder, sharded<service::storage_proxy>& proxy) noexcept;

// We need to be able to abort all group0 operation during shutdown, so we need special abort source for that
abort_source _group0_as;
diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc
index 58581f0169f..acdcd3aabd1 100644
--- a/service/raft/raft_group0.cc
+++ b/service/raft/raft_group0.cc
@@ -382,9 +382,11 @@ future<> raft_group0::abort() {
co_await smp::invoke_on_all([this]() {
return uninit_rpc_verbs(_ms.local());
});
- co_await _shutdown_gate.close();

_leadership_monitor_as.request_abort();
+
+ co_await _shutdown_gate.close();
+
co_await std::move(_leadership_monitor);

co_await stop_group0();
@@ -429,6 +431,7 @@ future<> raft_group0::leadership_monitor_fiber() {
}
});

+ auto holder = hold_group0_gate();
while (true) {
while (!group0_server().is_leader()) {
co_await group0_server().wait_for_state_change(&_leadership_monitor_as);
diff --git a/service/storage_service.cc b/service/storage_service.cc
index dd5ae6db5ad..ed486bcd782 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -1032,7 +1032,7 @@ class storage_service::raft_ip_address_updater: public gms::i_endpoint_state_cha

// }}} raft_ip_address_updater

-future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<service::storage_proxy>& proxy) noexcept {
+future<> storage_service::sstable_cleanup_fiber(raft::server& server, gate::holder group0_holder, sharded<service::storage_proxy>& proxy) noexcept {
while (!_group0_as.abort_requested()) {
bool err = false;
try {
@@ -1134,7 +1134,7 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<se
}
}

-future<> storage_service::raft_state_monitor_fiber(raft::server& raft, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
+future<> storage_service::raft_state_monitor_fiber(raft::server& raft, gate::holder group0_holder, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
std::optional<abort_source> as;

try {
@@ -1867,9 +1867,9 @@ future<> storage_service::join_topology(sharded<db::system_distributed_keyspace>
co_await raft_initialize_discovery_leader(join_params);

// start topology coordinator fiber
- _raft_state_monitor = raft_state_monitor_fiber(*raft_server, sys_dist_ks);
+ _raft_state_monitor = raft_state_monitor_fiber(*raft_server, _group0->hold_group0_gate(), sys_dist_ks);
// start cleanup fiber
- _sstable_cleanup_fiber = sstable_cleanup_fiber(*raft_server, proxy);
+ _sstable_cleanup_fiber = sstable_cleanup_fiber(*raft_server, _group0->hold_group0_gate(), proxy);

// Need to start system_distributed_keyspace before bootstrap because bootstrapping
// process may access those tables.
@@ -2150,7 +2150,7 @@ future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded
// Start the topology coordinator monitor fiber. If we are the leader, this will start
// the topology coordinator which is responsible for driving the upgrade process.
try {
- _raft_state_monitor = raft_state_monitor_fiber(_group0->group0_server(), sys_dist_ks);
+ _raft_state_monitor = raft_state_monitor_fiber(_group0->group0_server(), _group0->hold_group0_gate(), sys_dist_ks);
} catch (...) {
// The calls above can theoretically fail due to coroutine frame allocation failure.
// Abort in this case as the node should be in a pretty bad shape anyway.
@@ -2176,7 +2176,7 @@ future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded
}

try {
- _sstable_cleanup_fiber = sstable_cleanup_fiber(_group0->group0_server(), proxy);
+ _sstable_cleanup_fiber = sstable_cleanup_fiber(_group0->group0_server(), _group0->hold_group0_gate(), proxy);
start_tablet_split_monitor();
} catch (...) {
rtlogger.error("failed to start one of the raft-related background fibers: {}", std::current_exception());
@@ -3649,6 +3649,7 @@ static size_t count_normal_token_owners(const topology& topology) {

future<> storage_service::raft_decommission() {
auto& raft_server = _group0->group0_server();
+ auto holder = _group0->hold_group0_gate();
utils::UUID request_id;

while (true) {
@@ -4645,6 +4646,7 @@ future<> storage_service::do_drain() {

future<> storage_service::do_cluster_cleanup() {
auto& raft_server = _group0->group0_server();
+ auto holder = _group0->hold_group0_gate();

while (true) {
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
@@ -4715,6 +4717,7 @@ future<> storage_service::wait_for_topology_not_busy() {

future<> storage_service::raft_rebuild(utils::optional_param sdc_param) {
auto& raft_server = _group0->group0_server();
+ auto holder = _group0->hold_group0_gate();
utils::UUID request_id;

while (true) {
@@ -5475,12 +5478,15 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft

try {
auto& raft_server = _group0->group0_server();
+ auto group0_holder = _group0->hold_group0_gate();
// do barrier to make sure we always see the latest topology
co_await raft_server.read_barrier(&_group0_as);
if (raft_server.get_current_term() != term) {
// Return an error since the command is from outdated leader
co_return result;
}
+ auto id = raft_server.id();
+ group0_holder.release();

{
auto& state = _raft_topology_cmd_handler_state;
@@ -5592,7 +5598,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
break;
case raft_topology_cmd::command::stream_ranges: {
co_await with_scheduling_group(_db.local().get_streaming_scheduling_group(), coroutine::lambda([&] () -> future<> {
- const auto& rs = _topology_state_machine._topology.find(raft_server.id())->second;
+ const auto& rs = _topology_state_machine._topology.find(id)->second;
auto tstate = _topology_state_machine._topology.tstate;
if (!rs.ring || rs.ring->tokens.empty()) {
rtlogger.warn("got {} request but the node does not own any tokens and is in the {} state", cmd.cmd, rs.state);
@@ -5641,11 +5647,11 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
utils::get_local_injector().inject("stop_after_streaming",
[] { std::raise(SIGSTOP); });
} else {
- auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[raft_server.id()]).replaced_id;
+ auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[id]).replaced_id;
auto task = co_await get_task_manager_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
- parent_info.id, streaming::stream_reason::replace, _bootstrap_result, coroutine::lambda([this, &rs, &raft_server, replaced_id] () -> future<> {
- if (!_topology_state_machine._topology.req_param.contains(raft_server.id())) {
- on_internal_error(rtlogger, ::format("Cannot find request_param for node id {}", raft_server.id()));
+ parent_info.id, streaming::stream_reason::replace, _bootstrap_result, coroutine::lambda([this, &rs, &id, replaced_id] () -> future<> {
+ if (!_topology_state_machine._topology.req_param.contains(id)) {
+ on_internal_error(rtlogger, ::format("Cannot find request_param for node id {}", id));
}
if (is_repair_based_node_ops_enabled(streaming::stream_reason::replace)) {
auto ignored_nodes = boost::copy_range<std::unordered_set<locator::host_id>>(_topology_state_machine._topology.ignored_nodes | boost::adaptors::transformed([] (const auto& id) {
@@ -5729,7 +5735,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
}
break;
case node_state::rebuilding: {
- auto source_dc = std::get<rebuild_param>(_topology_state_machine._topology.req_param[raft_server.id()]).source_dc;
+ auto source_dc = std::get<rebuild_param>(_topology_state_machine._topology.req_param[id]).source_dc;
rtlogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc);
tasks::task_info parent_info{tasks::task_id{rs.request_id}, 0};
auto task = co_await get_task_manager_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
@@ -5776,7 +5782,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
case node_state::none:
case node_state::removing:
on_fatal_internal_error(rtlogger, ::format("Node {} got streaming request in state {}. It should be either dead or not part of the cluster",
- raft_server.id(), rs.state));
+ id, rs.state));
break;
}
}));
@@ -6492,6 +6498,7 @@ future<join_node_request_result> storage_service::join_node_request_handler(join
});

auto& g0_server = _group0->group0_server();
+ auto g0_holder = _group0->hold_group0_gate();
if (params.replaced_id && *params.replaced_id == g0_server.current_leader()) {
// There is a peculiar case that can happen if the leader is killed
// and then replaced very quickly:
diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc
index 591f1d341c3..fe3e3e0cb1a 100644
--- a/service/topology_coordinator.cc
+++ b/service/topology_coordinator.cc
@@ -109,6 +109,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {

std::chrono::milliseconds _ring_delay;

+ gate::holder _group0_holder;
+
using drop_guard_and_retake = bool_class<class retake_guard_tag>;

// Engaged if an ongoing topology change should be rolled back. The string inside
@@ -2494,6 +2496,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
, _raft_topology_cmd_handler(std::move(raft_topology_cmd_handler))
, _tablet_allocator(tablet_allocator)
, _ring_delay(ring_delay)
+ , _group0_holder(_group0.hold_group0_gate())
{}

// Returns true if the upgrade was done, returns false if upgrade was interrupted.
--
2.46.0

Kamil Braun

<kbraun@scylladb.com>
unread,
Sep 30, 2024, 7:00:04 AMSep 30
to Gleb Natapov, scylladb-dev@googlegroups.com
I think we want to backport this to 6.2 (it's not released yet, but branched)

--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/scylladb-dev/20240930102435.993204-2-gleb%40scylladb.com.

Gleb Natapov

<gleb@scylladb.com>
unread,
Sep 30, 2024, 7:07:24 AMSep 30
to Kamil Braun, scylladb-dev@googlegroups.com
On Mon, Sep 30, 2024 at 12:59:51PM +0200, Kamil Braun wrote:
> I think we want to backport this to 6.2 (it's not released yet, but
> branched)
>
If af83c5e53eb465 is there, then we do. I'll check.
--
Gleb.
Reply all
Reply to author
Forward
0 new messages