From: Duarte Nunes <
dua...@scylladb.com>
Committer: Duarte Nunes <
dua...@scylladb.com>
Branch: master
Merge 'Wean the stall detector off the timer thread' from Avi
"
The long-awaited aio poll patches remove the timer thread; but the cpu
stall detector still depends on it. This patchset disentangles the stall
detector from the reactor (to some extent), and converts it to use a
dedicated
kernel timer.
To avoid the need to re-arm the timer every task quota expiration, some slop
is introduced. Since the measurements don't need to be exact, I expect this
is fine.
Since we now have a dedicated timer, we also switch it to use a thread CPU
timer.
This protects against false positives caused by preemption, at the expense
of false negatives when the thread is blocked in the kernel. Since the latter
are rare, I think the tradeoff is worthwhile. Kernel blocks also have their
own detector.
Finally, a unit test is added.
Tests: unit (release)
"
* tag 'stall-detector/v2' of
https://github.com/avikivity/seastar:
tests: add unit test for cpu stall detector
reactor: add hook to capture reports of the cpu stall detector
reactor: add get_blocked_reactor_notify_ms()
reactor: use thread cputime clock for stall detection
reactor: change cpu stall detector not to depend on timer thread
reactor: introduce cpu_stall_detector::account_for_missed_ticks
reactor: move stall-detection code from timer thread to stall detector
tick function
reactor: fold block_notifier_rate_limit into cpu_stall_detector
reactor: extract cpu stall detector state into a class
---
diff --git a/include/seastar/core/reactor.hh
b/include/seastar/core/reactor.hh
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -640,6 +640,7 @@ private:
namespace internal {
class reactor_stall_sampler;
+class cpu_stall_detector;
}
@@ -779,10 +780,7 @@ private:
semaphore _cpu_started;
std::atomic<uint64_t> _tasks_processed = { 0 };
std::atomic<uint64_t> _polls = { 0 };
- std::atomic<unsigned> _tasks_processed_stalled = { 0 };
- unsigned _tasks_processed_report_threshold;
- unsigned _stall_detector_reports_per_minute;
- std::atomic<uint64_t> _stall_detector_missed_ticks = { 0 };
+ std::unique_ptr<internal::cpu_stall_detector> _cpu_stall_detector;
unsigned _max_task_backlog = 1000;
timer_set<timer<>, &timer<>::_link> _timers;
@@ -911,6 +909,7 @@ private:
signals _signals;
thread_pool _thread_pool;
friend class thread_pool;
+ friend class internal::cpu_stall_detector;
uint64_t pending_task_count() const;
void run_tasks(task_queue& tq);
@@ -1182,6 +1181,10 @@ public:
}
void update_blocked_reactor_notify_ms(std::chrono::milliseconds ms);
+ std::chrono::milliseconds get_blocked_reactor_notify_ms() const;
+ // For testing:
+ void set_stall_detector_report_function(std::function<void ()> report);
+ std::function<void ()> get_stall_detector_report_function() const;
};
template <typename Func> // signature: bool ()
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -112,6 +112,7 @@
#include <seastar/core/metrics.hh>
#include <seastar/core/execution_stage.hh>
#include <seastar/core/exception_hacks.hh>
+#include "stall_detector.hh"
#include <yaml-cpp/yaml.h>
@@ -444,10 +445,6 @@ inline int alarm_signal() {
return SIGRTMIN;
}
-inline int block_notifier_signal() {
- return SIGRTMIN + 1;
-}
-
// Installs signal handler stack for current thread.
// The stack remains installed as long as the returned object is kept
alive.
// When it goes out of scope the previous handler is restored.
@@ -546,6 +543,7 @@ reactor::reactor(unsigned id)
#endif
, _task_quota_timer(file_desc::timerfd_create(CLOCK_MONOTONIC,
TFD_CLOEXEC))
, _cpu_started(0)
+ , _cpu_stall_detector(std::make_unique<cpu_stall_detector>(this))
, _io_context(0)
, _reuseport(posix_reuseport_detect())
, _task_quota_timer_thread(&reactor::task_quota_timer_thread_fn, this)
@@ -574,7 +572,7 @@ reactor::reactor(unsigned id)
r = timer_create(CLOCK_MONOTONIC, &sev, &_steady_clock_timer);
assert(r >= 0);
sigemptyset(&mask);
- sigaddset(&mask, block_notifier_signal());
+ sigaddset(&mask, cpu_stall_detector::signal_number());
r = ::pthread_sigmask(SIG_UNBLOCK, &mask, NULL);
assert(r == 0);
#endif
@@ -588,7 +586,7 @@ reactor::reactor(unsigned id)
reactor::~reactor() {
sigset_t mask;
sigemptyset(&mask);
- sigaddset(&mask, block_notifier_signal());
+ sigaddset(&mask, cpu_stall_detector::signal_number());
auto r = ::pthread_sigmask(SIG_BLOCK, &mask, NULL);
assert(r == 0);
@@ -623,6 +621,107 @@ inline Integral
increment_nonatomically(std::atomic<Integral>& value) {
return add_nonatomically(value, Integral(1));
}
+cpu_stall_detector::cpu_stall_detector(reactor* r,
cpu_stall_detector_config cfg)
+ : _r(r)
+ , _shard_id(_r->cpu_id()) {
+ update_config(cfg);
+ struct sigevent sev = {};
+ sev.sigev_notify = SIGEV_THREAD_ID;
+ sev.sigev_signo = signal_number();
+ sev._sigev_un._tid = syscall(SYS_gettid);
+ int err = timer_create(CLOCK_THREAD_CPUTIME_ID, &sev, &_timer);
+ if (err) { // timer_create() returns -1 on failure; the cause is in errno
+ throw std::system_error(std::error_code(errno,
std::system_category()));
+ }
+ // note: if something is added here that can throw, it should take care to
destroy _timer.
+}
+
+cpu_stall_detector::~cpu_stall_detector() {
+ timer_delete(_timer);
+}
+
+cpu_stall_detector_config
+cpu_stall_detector::get_config() const {
+ return _config;
+}
+
+void cpu_stall_detector::update_config(cpu_stall_detector_config cfg) {
+ _config = cfg;
+ _threshold =
std::chrono::duration_cast<std::chrono::steady_clock::duration>(cfg.threshold);
+ _slack =
std::chrono::duration_cast<std::chrono::steady_clock::duration>(cfg.threshold
* cfg.slack);
+ _stall_detector_reports_per_minute =
cfg.stall_detector_reports_per_minute;
+ _max_reports_per_minute = cfg.stall_detector_reports_per_minute;
+ _rearm_timer_at = std::chrono::steady_clock::now();
+}
+
+void cpu_stall_detector::maybe_report() {
+ if (_reported++ < _max_reports_per_minute) {
+ generate_trace();
+ }
+}
+// We use a tick at every timer firing so we can report suppressed
backtraces.
+// Best case it's a correctly predicted branch. If a backtrace had
happened in
+// the near past it's an increment and two branches.
+//
+// We can do it cheaper if we don't report suppressed backtraces.
+void cpu_stall_detector::on_signal() {
+ if (_active.load(std::memory_order_relaxed)) {
+ maybe_report();
+ _report_at <<= 1;
+ arm_timer();
+ }
+}
+
+void
cpu_stall_detector::report_suppressions(std::chrono::steady_clock::time_point
now) {
+ if (now > _minute_mark + 60s) {
+ if (_reported > _max_reports_per_minute) { // only when reports were dropped; keeps the unsigned subtraction below from wrapping
+ auto supressed = _reported - _max_reports_per_minute;
+ backtrace_buffer buf;
+ // Reuse backtrace buffer infrastructure so we don't have to
allocate here
+ buf.append("Rate-limit: supressed ");
+ buf.append_decimal(supressed);
+ supressed == 1 ? buf.append(" backtrace") : buf.append("
backtraces");
+ buf.append(" on shard ");
+ buf.append_decimal(_shard_id);
+ buf.append("\n");
+ buf.flush();
+ }
+ _reported = 0;
+ _minute_mark = now;
+ }
+}
+
+void cpu_stall_detector::arm_timer() {
+ auto its = posix::to_relative_itimerspec(_threshold * _report_at +
_slack, 0s);
+ timer_settime(_timer, 0, &its, nullptr);
+}
+
+void
cpu_stall_detector::start_task_run(std::chrono::steady_clock::time_point
now) {
+ if (now > _rearm_timer_at) {
+ report_suppressions(now);
+ _report_at = 1;
+ _run_started_at = now;
+ _rearm_timer_at = now + _threshold * _report_at;
+ arm_timer();
+ }
+ _active.store(true, std::memory_order_relaxed);
+ std::atomic_signal_fence(std::memory_order_release); // Don't delay
this write, so the signal handler can see it
+}
+
+void
cpu_stall_detector::end_task_run(std::chrono::steady_clock::time_point now)
{
+ std::atomic_signal_fence(std::memory_order_acquire); // Don't hoist
this write, so the signal handler can see it
+ _active.store(false, std::memory_order_relaxed);
+}
+
+void cpu_stall_detector::start_sleep() {
+ auto its = posix::to_relative_itimerspec(0s, 0s);
+ timer_settime(_timer, 0, &its, nullptr);
+ _rearm_timer_at = std::chrono::steady_clock::now();
+}
+
+void cpu_stall_detector::end_sleep() {
+}
+
void
reactor::task_quota_timer_thread_fn() {
auto thread_name = seastar::format("timer-{}", _id);
@@ -639,57 +738,6 @@ reactor::task_quota_timer_thread_fn() {
abort();
}
- unsigned report_at = _tasks_processed_report_threshold;
- uint64_t last_tasks_processed_seen = 0;
- uint64_t last_polls_seen = 0;
-
- class block_notifier_rate_limiter {
- unsigned _reported = 0;
- unsigned _ticks = 0;
- unsigned _ticks_per_minute;
- unsigned _max_reports_per_minute;
- unsigned _shard_id;
- unsigned _thread_id;
- public:
- void maybe_report(pthread_t who, int sig) {
- if (_reported++ < _max_reports_per_minute) {
- pthread_kill(who, sig);
- }
- }
- // We use a tick at every timer firing so we can report suppressed
backtraces.
- // Best case it's a correctly predicted branch. If a backtrace had
happened in
- // the near past it's an increment and two branches.
- //
- // We can do it a cheaper if we don't report suppressed backtraces.
- void tick(unsigned ticks = 1) {
- if (!_reported) {
- return;
- }
- _ticks += ticks;
- if (_ticks >= _ticks_per_minute) {
- if (_reported > _max_reports_per_minute) {
- auto supressed = _reported - _max_reports_per_minute;
- backtrace_buffer buf;
- // Reuse backtrace buffer infrastructure so we don't
have to allocate here
- buf.append("Rate-limit: supressed ");
- buf.append_decimal(_reported -
_max_reports_per_minute);
- supressed == 1 ? buf.append(" backtrace") :
buf.append(" backtraces");
- buf.append(" on shard ");
- buf.append_decimal(_shard_id);
- buf.append("\n");
- buf.flush();
- }
- _reported = 0;
- _ticks = 0;
- }
- }
- block_notifier_rate_limiter(unsigned ticks_per_minute, unsigned
max_reports, unsigned shard_id)
- : _ticks_per_minute(ticks_per_minute)
- , _max_reports_per_minute(max_reports)
- , _shard_id(shard_id)
- {}
- };
-
// We need to wait until task quota is set before we can calculate how
many ticks are to
// a minute. Technically task_quota is used from many threads, but
since it is read-only here
// and only used during initialization we will avoid complicating the
code.
@@ -698,54 +746,62 @@ reactor::task_quota_timer_thread_fn() {
_task_quota_timer.read(&events, 8);
_local_need_preempt = true;
}
- block_notifier_rate_limiter rate_limit(unsigned(60s / _task_quota),
_stall_detector_reports_per_minute, _id);
- uint64_t saved_missed_ticks = 0;
while (!_dying.load(std::memory_order_relaxed)) {
uint64_t events;
_task_quota_timer.read(&events, 8);
_local_need_preempt = true;
- auto missed_ticks =
_stall_detector_missed_ticks.load(std::memory_order_relaxed);
- rate_limit.tick(std::max(uint64_t(1), missed_ticks -
saved_missed_ticks));
- saved_missed_ticks = missed_ticks;
-
- auto tp = _tasks_processed.load(std::memory_order_relaxed);
- auto p = _polls.load(std::memory_order_relaxed);
- if ((tp == last_tasks_processed_seen) && (p == last_polls_seen)) {
- if ((increment_nonatomically(_tasks_processed_stalled) ==
report_at)) {
- rate_limit.maybe_report(_thread_id,
block_notifier_signal());
- report_at <<= 1;
- }
- } else {
- last_tasks_processed_seen = tp;
- last_polls_seen = p;
- _tasks_processed_stalled.store(0, std::memory_order_relaxed);
- report_at = _tasks_processed_report_threshold;
- }
-
// We're in a different thread, but guaranteed to be on the same
core, so even
// a signal fence is overdoing it
std::atomic_signal_fence(std::memory_order_seq_cst);
}
}
void
reactor::update_blocked_reactor_notify_ms(std::chrono::milliseconds ms) {
- unsigned threshold = ms / _task_quota;
- if (threshold != _tasks_processed_report_threshold) {
- _tasks_processed_report_threshold = threshold;
+ auto cfg = _cpu_stall_detector->get_config();
+ if (ms != cfg.threshold) {
+ cfg.threshold = ms;
+ _cpu_stall_detector->update_config(cfg);
seastar_logger.info("updated: blocked-reactor-notify-ms={}",
ms.count());
}
}
+std::chrono::milliseconds
+reactor::get_blocked_reactor_notify_ms() const {
+ auto d = _cpu_stall_detector->get_config().threshold;
+ return std::chrono::duration_cast<std::chrono::milliseconds>(d);
+}
+
+void
+reactor::set_stall_detector_report_function(std::function<void ()> report)
{
+ auto cfg = _cpu_stall_detector->get_config();
+ cfg.report = std::move(report);
+ _cpu_stall_detector->update_config(std::move(cfg));
+}
+
+std::function<void ()>
+reactor::get_stall_detector_report_function() const {
+ return _cpu_stall_detector->get_config().report;
+}
+
void
reactor::block_notifier(int) {
- auto steps =
engine()._tasks_processed_stalled.load(std::memory_order_relaxed);
- auto delta =
std::chrono::duration_cast<std::chrono::milliseconds>(engine()._task_quota
* steps);
+ engine()._cpu_stall_detector->generate_trace();
+}
+
+void
+cpu_stall_detector::generate_trace() {
+ auto delta = std::chrono::steady_clock::now() - _run_started_at;
+
+ if (_config.report) {
+ _config.report();
+ return;
+ }
backtrace_buffer buf;
buf.append("Reactor stalled for ");
- buf.append_decimal(uint64_t(delta.count()));
+ buf.append_decimal(uint64_t(delta / 1ms));
buf.append(" ms");
print_with_backtrace(buf);
}
@@ -852,8 +908,10 @@ void
reactor::configure(boost::program_options::variables_map vm) {
_task_quota =
std::chrono::duration_cast<sched_clock::duration>(task_quota);
auto blocked_time = vm["blocked-reactor-notify-ms"].as<unsigned>() *
1ms;
- _tasks_processed_report_threshold = unsigned(blocked_time /
task_quota);
- _stall_detector_reports_per_minute =
vm["blocked-reactor-reports-per-minute"].as<unsigned>();
+ cpu_stall_detector_config csdc;
+ csdc.threshold = blocked_time;
+ csdc.stall_detector_reports_per_minute =
vm["blocked-reactor-reports-per-minute"].as<unsigned>();
+ _cpu_stall_detector->update_config(csdc);
_max_task_backlog = vm["max-task-backlog"].as<unsigned>();
_max_poll_time = vm["idle-poll-time-us"].as<unsigned>() * 1us;
@@ -3111,6 +3169,7 @@ reactor::run_some_tasks() {
sched_clock::time_point t_run_completed =
std::chrono::steady_clock::now();
STAP_PROBE(seastar, reactor_run_tasks_start);
+ _cpu_stall_detector->start_task_run(t_run_completed);
do {
auto t_run_started = t_run_completed;
insert_activating_task_queues();
@@ -3132,6 +3191,7 @@ reactor::run_some_tasks() {
tq->_active = false;
}
} while (have_more_tasks() && !need_preempt());
+ _cpu_stall_detector->end_task_run(t_run_completed);
STAP_PROBE(seastar, reactor_run_tasks_end);
*internal::current_scheduling_group_ptr() =
default_scheduling_group(); // Prevent inheritance from last group run
sched_print("run_some_tasks: end");
@@ -3254,7 +3314,7 @@ int reactor::run() {
struct sigaction sa_block_notifier = {};
sa_block_notifier.sa_handler = &reactor::block_notifier;
sa_block_notifier.sa_flags = SA_RESTART;
- auto r = sigaction(block_notifier_signal(), &sa_block_notifier,
nullptr);
+ auto r = sigaction(cpu_stall_detector::signal_number(),
&sa_block_notifier, nullptr);
assert(r == 0);
bool idle = false;
@@ -3315,10 +3375,11 @@ int reactor::run() {
struct itimerspec zero_itimerspec = {};
_task_quota_timer.timerfd_settime(0, zero_itimerspec);
auto start_sleep = sched_clock::now();
+ _cpu_stall_detector->start_sleep();
sleep();
+ _cpu_stall_detector->end_sleep();
// We may have slept for a while, so freshen idle_end
idle_end = sched_clock::now();
- add_nonatomically(_stall_detector_missed_ticks,
uint64_t((idle_end - start_sleep)/_task_quota));
_total_sleep += idle_end - start_sleep;
_task_quota_timer.timerfd_settime(0,
task_quote_itimerspec);
}
diff --git a/src/core/stall_detector.hh b/src/core/stall_detector.hh
--- a/src/core/stall_detector.hh
+++ b/src/core/stall_detector.hh
@@ -0,0 +1,82 @@
+
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding
copyright
+ * ownership. You may not use this file except in compliance with the
License.
+ *
+ * You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2018 ScyllaDB
+ */
+
+#pragma once
+
+#include <signal.h>
+#include <limits>
+#include <chrono>
+#include <functional>
+#include <seastar/core/posix.hh>
+
+namespace seastar {
+
+class reactor;
+
+namespace internal {
+
+struct cpu_stall_detector_config {
+ std::chrono::duration<double> threshold = std::chrono::seconds(2);
+ unsigned stall_detector_reports_per_minute = 1;
+ float slack = 0.3; // fraction of threshold that we're allowed to
overshoot
+ std::function<void ()> report; // alternative reporting function for
tests
+};
+
+// Detects stalls in continuations that run for too long
+class cpu_stall_detector {
+ reactor* _r;
+ timer_t _timer;
+ std::atomic<bool> _active{};
+ unsigned _stall_detector_reports_per_minute;
+ std::atomic<uint64_t> _stall_detector_missed_ticks = { 0 };
+ unsigned _reported = 0;
+ unsigned _max_reports_per_minute;
+ unsigned _shard_id;
+ unsigned _thread_id;
+ unsigned _report_at{};
+ std::chrono::steady_clock::time_point _minute_mark{};
+ std::chrono::steady_clock::time_point _rearm_timer_at{};
+ std::chrono::steady_clock::time_point _run_started_at{};
+ std::chrono::steady_clock::duration _threshold;
+ std::chrono::steady_clock::duration _slack;
+ cpu_stall_detector_config _config;
+ friend reactor;
+private:
+ void maybe_report();
+ void arm_timer();
+ void report_suppressions(std::chrono::steady_clock::time_point now);
+public:
+ cpu_stall_detector(reactor* r, cpu_stall_detector_config cfg = {});
+ ~cpu_stall_detector();
+ static int signal_number() { return SIGRTMIN + 1; }
+ void start_task_run(std::chrono::steady_clock::time_point now);
+ void end_task_run(std::chrono::steady_clock::time_point now);
+ void generate_trace();
+ void update_config(cpu_stall_detector_config cfg);
+ cpu_stall_detector_config get_config() const;
+ void on_signal();
+ void start_sleep();
+ void end_sleep();
+};
+
+}
+}
diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt
--- a/tests/unit/CMakeLists.txt
+++ b/tests/unit/CMakeLists.txt
@@ -253,6 +253,9 @@ seastar_add_test (smp
seastar_add_test (sstring
SOURCES sstring_test.cc)
+seastar_add_test (stall_detector
+ SOURCES stall_detector_test.cc)
+
seastar_add_test (thread
SOURCES thread_test.cc)
diff --git a/tests/unit/stall_detector_test.cc
b/tests/unit/stall_detector_test.cc
--- a/tests/unit/stall_detector_test.cc
+++ b/tests/unit/stall_detector_test.cc
@@ -0,0 +1,82 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding
copyright
+ * ownership. You may not use this file except in compliance with the
License.
+ *
+ * You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2018 ScyllaDB Ltd.
+ */
+
+#include <seastar/core/reactor.hh>
+#include "test-utils.hh"
+#include <atomic>
+#include <chrono>
+
+using namespace seastar;
+using namespace std::chrono_literals;
+
+class temporary_stall_detector_settings {
+ std::chrono::milliseconds _old_threshold;
+ std::function<void ()> _old_report;
+public:
+ temporary_stall_detector_settings(std::chrono::duration<double>
threshold, std::function<void ()> report)
+ : _old_threshold(engine().get_blocked_reactor_notify_ms())
+ , _old_report(engine().get_stall_detector_report_function()) {
+
engine().update_blocked_reactor_notify_ms(std::chrono::duration_cast<std::chrono::milliseconds>(threshold));
+ engine().set_stall_detector_report_function(std::move(report));
+ }
+ ~temporary_stall_detector_settings() {
+ engine().update_blocked_reactor_notify_ms(_old_threshold);
+
engine().set_stall_detector_report_function(std::move(_old_report));
+ }
+};
+
+void spin(std::chrono::duration<double> how_much) {
+ auto end = std::chrono::steady_clock::now() + how_much;
+ while (std::chrono::steady_clock::now() < end) {
+ // spin!
+ }
+}
+
+void spin_some_cooperatively(std::chrono::duration<double> how_much) {
+ auto end = std::chrono::steady_clock::now() + how_much;
+ while (std::chrono::steady_clock::now() < end) {
+ spin(200us);
+ if (need_preempt()) {
+ thread::yield();
+ }
+ }
+}
+
+SEASTAR_THREAD_TEST_CASE(normal_case) {
+ std::atomic<unsigned> reports{};
+ temporary_stall_detector_settings tsds(10ms, [&] { ++reports; });
+ spin_some_cooperatively(1s);
+ BOOST_REQUIRE_EQUAL(reports, 0);
+}
+
+SEASTAR_THREAD_TEST_CASE(simple_stalls) {
+ std::atomic<unsigned> reports{};
+ temporary_stall_detector_settings tsds(10ms, [&] { ++reports; });
+ unsigned nr = 10;
+ for (unsigned i = 0; i < nr; ++i) {
+ spin_some_cooperatively(100ms);
+ spin(20ms);
+ }
+ spin_some_cooperatively(100ms);
+ BOOST_REQUIRE_EQUAL(reports, nr); // expect exactly one report per spin(20ms) iteration
+}
+
+