This patch replaces the algorithm which the scheduler uses to keep
track of threads' runtime, and to choose which thread to run next and
for how long.
The previous algorithm used the raw cumulative runtime of a thread
as its runtime measure. But comparing these numbers directly was
impossible: e.g., should a thread that slept for an hour now get
an hour of uninterrupted CPU time? This resulted in a hodgepodge of
heuristics which "modified" and "fixed" the runtime. These heuristics
did work quite well in our test cases, but we were forced to add more
and more unjustified heuristics and constants to fix scheduling bugs
as they were discovered. The existing scheduler was especially
problematic with thread migration (moving a thread from one CPU to
another) as the runtime measure on one CPU was meaningless in another.
This bug, if not corrected (e.g., by the patch which I sent a couple
of weeks ago), can cause crucial threads to acquire exceedingly high
runtimes by mistake, and it caused the tst-loadbalance test to use
only one CPU on a two-CPU guest.
The new scheduling algorithm follows a much more rigorous design,
proposed by Avi Kivity in:
https://docs.google.com/a/cloudius-systems.com/document/d/1W7KCxOxP-1Fy5EyF2lbJGE2WuKmu5v0suYqoHas1jRM/edit#
To make a long story short (read the document if you want all the
details), the new algorithm is based on a runtime measure R which
is the running decaying average of the thread's running time.
It is a decaying average in the sense that the thread's running or
sleeping in recent history is given more weight than its behavior
a long time ago. This measure R can tell us which of the runnable
threads to run next (the one with the lowest R), and using some
high-school-level mathematics, we can calculate for how long to run
this thread until it should be preempted by the next one. R carries
the same meaning on all CPUs, so CPU migration becomes trivial.
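The selection rule and the "for how long" calculation described above can be
sketched roughly as follows. This is only an illustration, not code from this
patch: it assumes that R is an exponentially decaying average with time
constant tau and that all threads have priority 1, and the names toy_thread,
advance(), pick_next() and slice_ns() are hypothetical.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <vector>

// Hypothetical toy model, not the patch's data structures.
// Assumption: R is an exponentially decaying average with time constant tau.
struct toy_thread {
    double R = 0.0;  // decaying average of recent running time, in [0,1]
};

constexpr double tau_ns = 100e6;  // 100ms, the default tau in the patch

// Advance one thread's R by dt nanoseconds. Old history is scaled by
// exp(-dt/tau); a thread that was running gains the complement.
inline void advance(toy_thread& t, double dt_ns, bool was_running)
{
    assert(dt_ns >= 0);
    const double decay = std::exp(-dt_ns / tau_ns);
    t.R = t.R * decay + (was_running ? 1.0 - decay : 0.0);
}

// The runnable thread with the lowest R runs next.
inline std::size_t pick_next(const std::vector<toy_thread>& runnable)
{
    assert(!runnable.empty());
    std::size_t best = 0;
    for (std::size_t i = 1; i < runnable.size(); i++) {
        if (runnable[i].R < runnable[best].R) {
            best = i;
        }
    }
    return best;
}

// Time until the running thread's (growing) R meets the waiting thread's
// (decaying) R. With d = exp(-t/tau), solving
//     Rrun*d + (1 - d) = Rwait*d
// gives t = tau * ln(1 + Rwait - Rrun).
inline double slice_ns(const toy_thread& running, const toy_thread& waiting)
{
    assert(waiting.R >= running.R);
    return tau_ns * std::log(1.0 + waiting.R - running.R);
}
```

Note that in this unnormalized form, advance() has to be called for every
thread, running or not, whenever time passes.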
The actual implementation uses a normalized version of R, called R''
(Rtt in the code), which is also explained in detail in the document.
This Rtt allows updating just the running thread's runtime - not all
threads' runtime - as time passes, making the whole calculation much
more tractable.
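Why the normalization helps can be seen in a small sketch modeled loosely on
ran_for() and time_until() from this patch, but simplified: it uses double
instead of float and leaves out renormalization and the infinite-priority
case. The types toy_cpu and toy_runtime are hypothetical, and the sketch
assumes that Rtt is R multiplied by a per-CPU constant c which grows as
exp(t/tau).

```cpp
#include <cassert>
#include <cmath>

constexpr double tau_ns = 100e6;  // 100ms, the default tau in the patch

// Hypothetical stand-ins for the per-CPU and per-thread scheduler state.
struct toy_cpu {
    double c = 1.0;  // normalization constant, grows as time passes
};

struct toy_runtime {
    double priority = 1.0;
    double Rtt = 0.0;  // assumed to be R multiplied by the CPU's c
};

// Account dt nanoseconds of running to the current thread. Only this
// thread's Rtt and the CPU's c change; waiting threads are not touched.
inline void ran_for(toy_cpu& cpu, toy_runtime& r, double dt_ns)
{
    const double old_c = cpu.c;
    cpu.c = old_c * std::exp(dt_ns / tau_ns);
    r.Rtt += r.priority * (cpu.c - old_c);
}

// How long this thread may run before its Rtt reaches target_Rtt.
// This is the inverse of ran_for(): solve
//     Rtt + priority * c * (exp(t/tau) - 1) = target_Rtt   for t.
inline double time_until(const toy_cpu& cpu, const toy_runtime& r,
                         double target_Rtt)
{
    assert(r.priority > 0 && cpu.c > 0);
    return tau_ns *
        std::log(1.0 + (target_Rtt - r.Rtt) / r.priority / cpu.c);
}
```

In this model, a thread with Rtt = 0 competing against a thread that has
monopolized the CPU for a long time gets to run for close to tau*ln(2), about
69ms with tau=100ms, which agrees with the comment next to tau in
core/sched.cc.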
The benefits of the new scheduler code over the existing one are:
1. A more rigorous design with fewer unjustified heuristics.
2. A thread's runtime measurement correctly survives a migration to a
different CPU, unlike the existing code (which completely botches
it up, leading to threads hanging), or my previous patch (which
simply forgot the previous runtime, as if this was a new thread).
In particular, tst-loadbalance now gives good results for the
"intermittent thread" test, unlike the previous code which in 50%
of the runs caused one CPU to be completely wasted (when the load-
balancing thread hung).
3. The new algorithm can look at a much longer runtime history than the
previous algorithm did. With the default tau=100ms, the one-cpu
intermittent thread test of tst-scheduler now provides good
fairness for sleep durations of 4ms-32ms.
The previous algorithm was never fair in any of those tests.
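The migration handling mentioned in item 2 can be sketched as below. It
mirrors the idea of export_runtime() and update_after_sleep() in this patch
(divide by the source CPU's c, multiply by the destination CPU's c), but
toy_cpu, toy_runtime and import_runtime() are hypothetical names, and a
boolean flag stands in for the patch's _renormalize_count == -1 marker.

```cpp
#include <cassert>

// Hypothetical stand-ins for the per-CPU and per-thread scheduler state.
struct toy_cpu {
    double c;  // this CPU's current normalization constant
};

struct toy_runtime {
    double Rtt = 0.0;        // CPU-local value while is_global is false
    bool is_global = false;  // stands in for _renormalize_count == -1
};

// On the source CPU: strip the local normalization so that the value is
// meaningful on any CPU.
inline void export_runtime(toy_runtime& r, const toy_cpu& src)
{
    assert(!r.is_global && src.c > 0);
    r.Rtt /= src.c;
    r.is_global = true;
}

// On the destination CPU, when the migrated thread wakes up: apply the
// destination's normalization to get a CPU-local value again.
inline void import_runtime(toy_runtime& r, const toy_cpu& dst)
{
    assert(r.is_global);
    r.Rtt *= dst.c;
    r.is_global = false;
}
```

Because only the ratio Rtt/c travels with the thread, its history is kept
across the move even though the two CPUs have unrelated values of c.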
This patch probably needs more cleanup, the document should be
improved (and a paper written? :-)), and I need to do more testing
on real workloads, not just synthetic tests. This patch also needs
performance measurements and optimization (e.g., it uses a more
accurate expf() than necessary, and calls it too many times).
But in general, this code already works (in tests...) and I think
this is a good direction.
Signed-off-by: Nadav Har'El <n...@cloudius-systems.com>
---
core/sched.cc | 269 ++++++++++++++++++++++++++++++++++++++++++------------
include/sched.hh | 79 +++++++++++++++-
scripts/loader.py | 6 +-
3 files changed, 286 insertions(+), 68 deletions(-)
diff --git a/core/sched.cc b/core/sched.cc
index fcbc2e2..cbf5e14 100644
--- a/core/sched.cc
+++ b/core/sched.cc
@@ -27,7 +27,7 @@ extern char _percpu_start[], _percpu_end[], _percpu_sec_end[];
namespace sched {
-TRACEPOINT(trace_sched_switch, "to %p vold=%d vnew=%d", thread*, s64, s64);
+TRACEPOINT(trace_sched_switch, "to %p vold=%g vnew=%g", thread*, float, float);
TRACEPOINT(trace_sched_wait, "");
TRACEPOINT(trace_sched_wake, "wake %p", thread*);
TRACEPOINT(trace_sched_migrate, "thread=%p cpu=%d", thread*, unsigned);
@@ -36,6 +36,9 @@ TRACEPOINT(trace_sched_preempt, "");
TRACEPOINT(trace_timer_set, "timer=%p time=%d", timer_base*, s64);
TRACEPOINT(trace_timer_cancel, "timer=%p", timer_base*);
TRACEPOINT(trace_timer_fired, "timer=%p", timer_base*);
+TRACEPOINT(trace_ranfor_old, "(%p) for %d old: Rtag,c=%g,%g", thread_runtime*, s64, float, float);
+TRACEPOINT(trace_ranfor_new, "(%p) for %d new: Rtag,c=%g,%g", thread_runtime*, s64, float, float);
+TRACEPOINT(trace_ranfor_re, "(%p) for %d renormalize: Rtag,c=%g,%g", thread_runtime*, s64, float, float);
std::vector<cpu*> cpus __attribute__((init_priority(CPUS_INIT_PRIO)));
@@ -49,9 +52,21 @@ elf::tls_data tls;
inter_processor_interrupt wakeup_ipi{[] {}};
-constexpr s64 vruntime_bias = 4_ms;
-constexpr s64 max_slice = 10_ms;
constexpr s64 context_switch_penalty = 10_us;
+// Tau controls the length of the runtime history we consider for scheduling.
+// This history isn't forgotten after tau, rather, we use a decaying average
+// and tau determines the rate of this decay.
+// In particular, it can be seen that if a thread has been monopolizing the
+// CPU, and a long-sleeping thread wakes up (or a new thread is created),
+// the new thread will get to run for ln2*tau. (ln2 is roughly 0.7).
+constexpr s64 tau = 100_ms;
+// When two cpu-busy threads at equal priority compete, they will alternate
+// a time-slice of desired_timeslice each.
+constexpr s64 desired_timeslice = 2_ms;
+constexpr float cmax = 0x1P63;
+constexpr float cinitial = 0x1P-63;
+
+constexpr float finf = std::numeric_limits<float>::infinity();
mutex cpu::notifier::_mtx;
std::list<cpu::notifier*> cpu::notifier::_notifiers __attribute__((init_priority(NOTIFIERS_INIT_PRIO)));
@@ -79,6 +94,8 @@ cpu::cpu(unsigned _id)
, idle_thread()
, terminating_thread(nullptr)
, running_since(clock::get()->time())
+ , c(cinitial)
+ , renormalize_count(0)
{
auto max_size = _percpu_sec_end - _percpu_end;
auto pcpu_size = _percpu_end - _percpu_start;
@@ -94,7 +111,7 @@ cpu::cpu(unsigned _id)
void cpu::init_idle_thread()
{
idle_thread = new thread([this] { idle(); }, thread::attr(this));
- idle_thread->_vruntime = std::numeric_limits<s64>::max();
+ idle_thread->_runtime.set_priority(finf);
}
void cpu::schedule(bool yield)
@@ -109,49 +126,76 @@ void cpu::reschedule_from_interrupt(bool preempt)
{
need_reschedule = false;
handle_incoming_wakeups();
+
auto now = clock::get()->time();
- thread* p = thread::current();
- // avoid cycling through the runqueue if p still has the highest priority
- auto bias = vruntime_bias;
- s64 current_run = now - running_since;
- if (p->_vruntime + current_run < 0) { // overflow (idle thread)
- current_run = 0;
- }
- if (current_run > max_slice) {
- // This thread has run for a long time, or clock:time() jumped. But if
- // we increase vruntime by the full amount, this thread might go into
- // a huge cpu time debt and won't be scheduled again for a long time.
- // So limit the vruntime increase.
- current_run = max_slice;
- }
- if (p->_status == thread::status::running
- && (runqueue.empty()
- || p->_vruntime + current_run < runqueue.begin()->_vruntime + bias)) {
- update_preemption_timer(p, now, current_run);
- return;
+ auto interval = now - running_since;
+ running_since = now;
+ if (interval == 0) {
+ // This is an unexpected case, but I've seen it happen during SMP
+ // startup, where we get a loop of schedules with zero intervals,
+ // making again and again the decision to run the same looping thread.
+ // This will get us out of this mess. TODO: rethink.
+ interval += context_switch_penalty;
}
- p->_vruntime += current_run;
+ thread* p = thread::current();
+ p->_runtime.ran_for(interval);
+
if (p->_status == thread::status::running) {
+ // The current thread is still runnable. Check if it still has the
+ // lowest runtime, and update the timer until the next thread's turn.
+ if (runqueue.empty()) {
+ preemption_timer.cancel();
+ return;
+ } else {
+ auto &t = *runqueue.begin();
+ if (p->_runtime.get_local() < t._runtime.get_local()) {
+ preemption_timer.cancel();
+ auto delta = p->_runtime.time_until(t._runtime.get_local());
+ if (delta > 0) {
+ preemption_timer.set(now + delta);
+ }
+ return;
+ }
+ }
+ // If we're here, p no longer has the lowest runtime
p->_status.store(thread::status::queued);
enqueue(*p);
}
+
auto ni = runqueue.begin();
auto n = &*ni;
runqueue.erase(ni);
- running_since = now;
assert(n->_status.load() == thread::status::queued);
n->_status.store(thread::status::running);
if (n != thread::current()) {
+ // If we got here, we're leaving thread p and switching to n. Return
+ // back the time p borrowed for hysteresis, and borrow time for n.
+ // Use "false" on the first ran_for, as the runqueue is inconsistent
+ // (it includes p, but not n, so we'll renormalize p twice but not n).
+ p->_runtime.ran_for(desired_timeslice/2, false);
+ n->_runtime.ran_for(-desired_timeslice/2);
+
if (preempt) {
trace_sched_preempt();
p->_fpu.save();
}
if (p->_status.load(std::memory_order_relaxed) == thread::status::queued
&& p != idle_thread) {
- n->_vruntime += context_switch_penalty;
+ // TODO: we have as many as 5 ran_for() calls in this function.
+ // Can we do fewer? Certainly this one can be added to the
+ // -desired_timeslice/2 above.
+ n->_runtime.ran_for(context_switch_penalty);
}
- trace_sched_switch(n, p->_vruntime, n->_vruntime);
- update_preemption_timer(n, now, 0);
+ trace_sched_switch(n, p->_runtime.get_local(), n->_runtime.get_local());
+ preemption_timer.cancel();
+ if (!runqueue.empty()) {
+ auto& t = *runqueue.begin();
+ auto delta = n->_runtime.time_until(t._runtime.get_local());
+ if (delta > 0) {
+ preemption_timer.set(now + delta);
+ }
+ }
+
n->switch_to();
if (preempt) {
p->_fpu.restore();
@@ -163,21 +207,6 @@ void cpu::reschedule_from_interrupt(bool preempt)
}
}
-void cpu::update_preemption_timer(thread* current, s64 now, s64 run)
-{
- preemption_timer.cancel();
- if (runqueue.empty()) {
- return;
- }
- auto& t = *runqueue.begin();
- auto delta = t._vruntime - (current->_vruntime + run);
- auto expire = now + delta + vruntime_bias;
- if (expire > 0) {
- // avoid idle thread related overflow
- preemption_timer.set(expire);
- }
-}
-
void cpu::timer_fired()
{
// nothing to do, preemption will happen if needed
@@ -260,30 +289,21 @@ void cpu::handle_incoming_wakeups()
irq_save_lock_type irq_lock;
WITH_LOCK(irq_lock) {
t._status.store(thread::status::queued);
- enqueue(t, true);
+ // Make sure the CPU-local runtime measure is suitably
+ // normalized. We may need to convert a global value to the
+ // local value when waking up after a CPU migration, or to
+ // perform renormalizations which we missed while sleeping.
+ t._runtime.update_after_sleep();
+ enqueue(t);
t.resume_timers();
}
}
}
}
-void cpu::enqueue(thread& t, bool waking)
+void cpu::enqueue(thread& t)
{
trace_sched_queue(&t);
- if (waking) {
- // If a waking thread has a really low vruntime, allow it only
- // one extra timeslice; otherwise it would dominate the runqueue
- // and starve out other threads
- auto current = thread::current();
- if (current != idle_thread) {
- auto head = current->_vruntime - max_slice;
- t._vruntime = std::max(t._vruntime, head);
- }
- }
- // special treatment for idle thread: make sure it is in the back of the queue
- if (&t == idle_thread) {
- t._vruntime = thread::max_vruntime;
- }
runqueue.insert_equal(t);
}
@@ -332,6 +352,9 @@ void cpu::load_balance()
mig._status.store(thread::status::waking);
mig.suspend_timers();
mig._cpu = min;
+ // Convert the CPU-local runtime measure to a globally meaningful
+ // measure
+ mig._runtime.export_runtime();
mig.remote_thread_local_var(::percpu_base) = min->percpu_base;
mig.remote_thread_local_var(current_cpu) = min;
min->incoming_wakeups[id].push_front(mig);
@@ -434,9 +457,9 @@ thread::thread(std::function<void ()> func, attr attr, bool main)
: _func(func)
, _status(status::unstarted)
, _attr(attr)
- , _vruntime(main ? 0 : current()->_vruntime)
, _ref_counter(1)
, _joiner()
+ , _runtime()
{
WITH_LOCK(thread_list_mutex) {
thread_list.push_back(*this);
@@ -889,6 +912,132 @@ void init_tls(elf::tls_data tls_data)
tls = tls_data;
}
+void thread_runtime::export_runtime()
+{
+ _Rtt /= cpu::current()->c;
+ _renormalize_count = -1; // special signal to update_after_sleep()
+}
+
+void thread_runtime::update_after_sleep()
+{
+ auto cpu_renormalize_count = cpu::current()->renormalize_count;
+ if (_renormalize_count == cpu_renormalize_count) {
+ return;
+ }
+ if (_renormalize_count == -1) {
+ // export_runtime() was used to convert the CPU-local runtime to
+ // a global value. We need to convert it back to a local value,
+ // suitable for this CPU.
+ _Rtt *= cpu::current()->c;
+ } else if (_renormalize_count + 1 == cpu_renormalize_count) {
+ _Rtt /= cmax;
+ } else if (_Rtt != finf) {
+ // We need to divide by cmax^2 or even a higher power. We assume
+ // this will bring Rtt to zero anyway, so there is no sense in doing an
+ // accurate calculation
+ _Rtt = 0;
+ }
+ _renormalize_count = cpu_renormalize_count;
+}
+
+void thread_runtime::ran_for(s64 time, bool renormalize)
+{
+ trace_ranfor_old(this, time, _Rtt, cpu::current()->c);
+ // TODO: use approximation instead of real exp function.
+ const double factor = expf((float)time/tau);
+ if (factor == 1) {
+ // When factor==1, the following code anyway does nothing, except
+ // complicate us with an undefined inf*0 calculation.
+ return;
+ }
+ const auto c = cpu::current()->c * factor;
+
+ // During our boot process, unfortunately clock::time() jumps by the
+ // amount of host uptime, which can be huge and cause the above
+ // calculations to overflow. In this case, just ignore this time period.
+ if (c == finf) {
+ return;
+ }
+
+ // When a thread is created, it gets _Rtt = 0, so its _renormalize_count
+ // is irrelevant (and couldn't be set correctly in the constructor). So
+ // set it here
+ if (_Rtt == 0) {
+ _renormalize_count = cpu::current()->renormalize_count;
+ }
+
+ cpu::current()->c = c;
+
+ if (_priority == finf) {
+ // TODO: the only reason we need this case is so that time<0
+ // will bring us to +inf, not -inf. Think if there's a cleaner
+ // alternative to doing this if.
+ _Rtt = finf;
+ } else {
+ _Rtt += c * _priority * (1 - 1/factor);
+ }
+
+ // Assert we didn't get into weird numbers we can't get out of,
+ // like inf (except the idle thread with priority=inf) and nan:
+ if (_priority != finf) {
+ assert (_Rtt >= std::numeric_limits<float>::lowest() &&
+ _Rtt <= std::numeric_limits<float>::max());
+ } else {
+ assert (_Rtt == 0 || _Rtt == finf);
+ }
+ assert (_renormalize_count != -1); // forgot to update_after_sleep?
+
+ trace_ranfor_new(this, time, _Rtt, c);
+
+ // As time goes by, the normalization constant grows towards infinity.
+ // To avoid an overflow, we need to renormalize if c becomes too big.
+ // We only renormalize the runtime of the running or runnable threads.
+ // Sleeping threads will be renormalized when they wake
+ // (see update_after_sleep()), depending on the number of renormalization
+ // steps they have missed (this is why we need to keep a counter).
+ if (c < cmax || !renormalize) {
+ return;
+ }
+ if (++cpu::current()->renormalize_count < 0) {
+ // Don't use negative values; We use -1 to mark export_runtime()
+ cpu::current()->renormalize_count = 0;
+ }
+ _Rtt /= cmax / cinitial;
+ _renormalize_count = cpu::current()->renormalize_count;
+ for (auto &t : cpu::current()->runqueue) {
+ if (t._runtime._renormalize_count >= 0) {
+ t._runtime._Rtt /= cmax / cinitial;
+ t._runtime._renormalize_count++;
+ }
+ }
+ cpu::current()->c /= cmax / cinitial;
+ trace_ranfor_re(this, time, _Rtt, cpu::current()->c);
+}
+
+s64 thread_runtime::time_until(float target_local_runtime) const
+{
+ if (_priority == finf) {
+ return -1;
+ }
+ // TODO: use approximation instead of real log function.
+ float ret = tau * logf(1 +
+ (target_local_runtime - _Rtt) / _priority / cpu::current()->c);
+ if (ret > (float)std::numeric_limits<s64>::max())
+ return -1;
+ return (s64) ret;
+}
+
+thread_runtime::thread_runtime()
+{
+ _priority = 1.0;
+ _Rtt = 0.0;
+ // When _Rtt=0, multiplicative normalization doesn't matter, so it
+ // doesn't matter what we set it here. We can't set it properly here
+ // (the constructor doesn't run from the scheduler, and we don't know
+ // which CPU's counter to copy), so we'll fix it in ran_for().
+ _renormalize_count = -1;
+}
+
}
irq_lock_type irq_lock;
diff --git a/include/sched.hh b/include/sched.hh
index 7e72970..8552d78 100644
--- a/include/sched.hh
+++ b/include/sched.hh
@@ -19,6 +19,7 @@
#include <atomic>
#include "osv/lockless-queue.hh"
#include <list>
+#include <math.h>
extern "C" {
void smp_main();
@@ -173,6 +174,73 @@ public:
explicit timer(thread& t);
};
+// thread_runtime is used to maintain the scheduler's view of the thread's
+// priority relative to other threads. It knows about a static priority of the
+// thread (allowing a certain thread to get more runtime than other threads)
+// and is used to maintain the "runtime" of each thread, a number which the
+// scheduler uses to decide which thread to run next, and for how long.
+//
+// All methods of this class should be called only from within the scheduler.
+class thread_runtime {
+public:
+ // Get the thread's CPU-local runtime, a number used to sort the
+ // runqueue on this CPU (lowest runtime will be run first).
+ // local_runtime cannot be compared between threads on different
+ // CPUs - see also export_runtime().
+ inline float get_local() const
+ {
+ return _Rtt;
+ }
+
+ // Convert the thread's CPU-local runtime to a globally agreed scale which
+ // can be understood on any CPU. Use this function in preparing to migrate
+ // the thread to a different CPU; the destination CPU will run
+ // update_after_sleep() which will re-normalize the global measure to the
+ // destination CPU's local one.
+ void export_runtime();
+
+ // Update the thread's local runtime after a sleep, when we potentially
+ // missed one or more renormalization steps (which were only done to
+ // runnable threads), or need to convert global runtime to local runtime.
+ void update_after_sleep();
+
+ // Increase thread's local_runtime (and CPU's c) considering the amount of
+ // time that has passed and the thread's current priority (which we assume
+ // was in effect for the whole duration). "time" is in nanoseconds.
+ void ran_for(s64 time, bool renormalize=true);
+
+ // Given a target local_runtime higher than our own, calculate how much
+ // time (in nanoseconds) it would take until ran_for(time) would bring our
+ // own thread to the given target. Returns -1 if the time is too long to
+ // express in s64.
+ s64 time_until(float target_local_runtime) const;
+
+ void set_priority(float priority) {
+ _priority = priority;
+ }
+
+ float priority() const {
+ return _priority;
+ }
+
+ thread_runtime();
+
+private:
+ // The thread's static priority in (0,inf], with lower priority getting
+ // more runtime: If one thread has priority s and a second has s/2, the
+ // second thread will get twice as much runtime as the first.
+ // priority=inf means only run when no other thread wants to run (e.g.,
+ // the idle thread has this priority). The default priority is 1.
+ float _priority;
+ // R'' in the document.
+ float _Rtt;
+ // Renormalization counter, so that we don't need to maintain a linked
+ // list of all threads (not necessarily runnable) belonging to the CPU.
+ // If _renormalize_count == -1, it means the runtime is not normalized
+ // (i.e., export_runtime() was called, or this is a new thread).
+ int _renormalize_count;
+};
+
class thread : private timer_base::client {
public:
struct stack_info {
@@ -254,8 +322,6 @@ private:
arch_thread _arch;
arch_fpu _fpu;
unsigned long _id;
- s64 _vruntime;
- static const s64 max_vruntime = std::numeric_limits<s64>::max();
std::function<void ()> _cleanup;
// When _ref_counter reaches 0, the thread can be deleted.
// Starts with 1, decremented by complete() and also temporarily modified
@@ -281,6 +347,7 @@ public:
// for the debugger
bi::list_member_hook<> _thread_list_link;
static unsigned long _s_idgen;
+ thread_runtime _runtime;
private:
class reaper;
friend class reaper;
@@ -311,7 +378,7 @@ private:
class thread_runtime_compare {
public:
bool operator()(const thread& t1, const thread& t2) const {
- return t1._vruntime < t2._vruntime;
+ return t1._runtime.get_local() < t2._runtime.get_local();
}
};
@@ -354,11 +421,13 @@ struct cpu : private timer_base::client {
void load_balance();
unsigned load();
void reschedule_from_interrupt(bool preempt = false);
- void enqueue(thread& t, bool waking = false);
+ void enqueue(thread& t);
void init_idle_thread();
- void update_preemption_timer(thread* current, s64 now, s64 run);
virtual void timer_fired() override;
class notifier;
+ // For new scheduling
+ float c;
+ int renormalize_count;
};
class cpu::notifier {
diff --git a/scripts/loader.py b/scripts/loader.py
index d1d31aa..c7c4729 100644
--- a/scripts/loader.py
+++ b/scripts/loader.py
@@ -400,14 +400,14 @@ class osv_info_threads(gdb.Command):
else:
function = '??'
status = str(t['_status']['_M_i']).replace('sched::thread::', '')
- gdb.write('%4d (0x%x) cpu%s %-10s %s at %s:%s vruntime %12d\n' %
+ gdb.write('%4d (0x%x) cpu%s %-10s %s at %s:%s vruntime %g\n' %
(tid, ulong(t.address),
cpu['arch']['acpi_id'],
status,
function,
fname,
sal.line,
- t['_vruntime'],
+ t['_runtime']['_Rtt'],
)
)
show_thread_timers(t)
@@ -806,7 +806,7 @@ class osv_runqueue(gdb.Command):
for cpu in xrange(ncpus) :
gdb.write("CPU %d:\n" % cpu)
for thread in runqueue(cpu):
- print '%d 0x%x %d' % (thread['_id'], ulong(thread), thread['_vruntime'])
+ print '%d 0x%x %g' % (thread['_id'], ulong(thread), thread['_runtime']['_Rtt'])
osv()
--
1.8.3.1