[RFC PATCH 0/8] IO throttling

Pavel Emelyanov

<xemul@scylladb.com>
Oct 13, 2021, 8:26:00 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
There's a request to have hard limits for both bandwidth and IOPS.
Another potential benefit of IO throttling is mitigating the
effect write requests have on read requests. Under what we call
a "mixed workload", disks start to prefer writes over reads and
penalize the latter with extra latency. Rate-limiting writes seems
to improve read latency to some extent.

The rate limiter in this set is the leaky-bucket-based algo that's
wired directly into the fair-queue and, thus, operates on tickets.
The io-queue level converts human readable bytes/sec and ops/sec
into tickets. Sharing is off, rate-limiting is per-shard.

branch: https://github.com/xemul/seastar/tree/br-fq-rate-limiting
refs: #817

Pavel Emelyanov (8):
io_priority_class: Fix .update_shares() documentation
io_queue: Use io_logger everywhere
fair_queue: Add more arithmetics to tickets
fair_queue: Shuffle dispatching loop
fair_queue: Add rate limiter
io_queue: Configure rate-limiter on the fair-queue
io_tester: Split pclass info section in config
io_tester: Configure bps/ops

apps/io_tester/conf.yaml | 6 +-
apps/io_tester/io_tester.cc | 41 ++++++++--
include/seastar/core/fair_queue.hh | 68 +++++++++++++++-
include/seastar/core/io_priority_class.hh | 8 +-
include/seastar/core/io_queue.hh | 2 +
include/seastar/core/reactor.hh | 2 +
src/core/fair_queue.cc | 98 ++++++++++++++++++++---
src/core/io_queue.cc | 45 ++++++++++-
src/core/reactor.cc | 6 ++
9 files changed, 250 insertions(+), 26 deletions(-)

--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Oct 13, 2021, 8:26:01 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
There's an IO-specific logger already; all the messages that come
from the io_queue.cc code deserve to be sent to it.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
src/core/io_queue.cc | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 2f890de0..78d355da 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -298,7 +298,7 @@ io_queue::io_queue(io_group_ptr group, internal::io_sink& sink)
, _fq(_group->_fg, make_fair_queue_config(_group->_config))
, _sink(sink)
{
- seastar_logger.debug("Created io queue, multipliers {}:{}",
+ io_log.debug("Created io queue, multipliers {}:{}",
get_config().disk_req_write_to_read_multiplier,
get_config().disk_bytes_write_to_read_multiplier);
}
@@ -325,7 +325,7 @@ fair_group::config io_group::make_fair_group_config(io_queue::config qcfg) noexc
* request_fq_ticket() method.
*/
if (max_req_count < max_req_count_min) {
- seastar_logger.warn("The disk request rate is too low, configuring it to {}, but you may experience latency problems", max_req_count_min);
+ io_log.warn("The disk request rate is too low, configuring it to {}, but you may experience latency problems", max_req_count_min);
max_req_count = max_req_count_min;
}
return fair_group::config(max_req_count,
@@ -335,7 +335,7 @@ fair_group::config io_group::make_fair_group_config(io_queue::config qcfg) noexc
io_group::io_group(io_queue::config io_cfg) noexcept
: _fg(make_fair_group_config(io_cfg))
, _config(io_cfg) {
- seastar_logger.debug("Created io group, limits {}:{}", io_cfg.max_req_count, io_cfg.max_bytes_count);
+ io_log.debug("Created io group, limits {}:{}", io_cfg.max_req_count, io_cfg.max_bytes_count);
}

io_queue::~io_queue() {
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Oct 13, 2021, 8:26:02 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
Add the <= comparator, which checks that each component of one
ticket is less than or equal to the matching component of the other.

Add the multiplication operator to multiply a ticket by a floating
point number. If the result would overflow either of the ticket's
dimensions, the maximum allowed value is put into it.

Both will be used by rate-limiter code soon.
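
For illustration only, the two operations can be sketched outside of Seastar as below. The `ticket` struct and the `fits`, `saturating_mul` and `scale` helpers are hypothetical stand-ins, not the names used by the patch, and the factor is assumed to be a finite number.

```cpp
#include <cstdint>
#include <limits>

// Hypothetical stand-in for fair_queue_ticket: two 32-bit components.
struct ticket {
    uint32_t weight;
    uint32_t size;
};

// Component-wise "less or equal": true only if BOTH components are <=.
// This is a partial order: {1, 5} and {5, 1} do not fit into each other.
inline bool fits(const ticket& a, const ticket& b) noexcept {
    return a.weight <= b.weight && a.size <= b.size;
}

// Multiply one component by a factor, clamping the result to [0, UINT32_MAX].
inline uint32_t saturating_mul(uint32_t a, double k) noexcept {
    constexpr double max = std::numeric_limits<uint32_t>::max();
    const double r = static_cast<double>(a) * k;
    if (r <= 0.0) {
        return 0;
    }
    return r >= max ? std::numeric_limits<uint32_t>::max() : static_cast<uint32_t>(r);
}

// Scale both components independently.
inline ticket scale(const ticket& t, double k) noexcept {
    return ticket{saturating_mul(t.weight, k), saturating_mul(t.size, k)};
}
```

Because the comparison is component-wise, `!fits(a, b)` does not imply `fits(b, a)`; code built on such a comparator cannot treat it as a total order.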

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/fair_queue.hh | 8 ++++++++
src/core/fair_queue.cc | 13 +++++++++++++
2 files changed, 21 insertions(+)

diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
index 7c415553..a818e83a 100644
--- a/include/seastar/core/fair_queue.hh
+++ b/include/seastar/core/fair_queue.hh
@@ -65,6 +65,14 @@ class fair_queue_ticket {
/// Checks if the tickets fully equals to another one
/// \param desc another \ref fair_queue_ticket to compare with
bool operator==(const fair_queue_ticket& desc) const noexcept;
+ /// Checks if every component of the ticket is less than or equal to that of another one
+ /// \param desc another \ref fair_queue_ticket to compare with
+ bool operator<=(const fair_queue_ticket& desc) const noexcept;
+
+ /// Multiplies each component of the ticket by the provided factor
+ /// If the resulting value would overflow it's set to the maximum possible
+ /// \param k the factor value
+ fair_queue_ticket operator*(double k) const noexcept;

std::chrono::microseconds duration_at_pace(float weight_pace, float size_pace) const noexcept;

diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc
index 44c72ded..0290fa25 100644
--- a/src/core/fair_queue.cc
+++ b/src/core/fair_queue.cc
@@ -69,6 +69,15 @@ fair_queue_ticket& fair_queue_ticket::operator-=(fair_queue_ticket desc) noexcep
return *this;
}

+fair_queue_ticket fair_queue_ticket::operator*(double k) const noexcept {
+ auto multiply = [] (uint32_t a, double b) noexcept -> uint32_t {
+ return (b < 1.0 || a < std::numeric_limits<uint32_t>::max() / b) ?
+ a * b : std::numeric_limits<uint32_t>::max();
+ };
+
+ return fair_queue_ticket(multiply(_weight, k), multiply(_size, k));
+}
+
fair_queue_ticket::operator bool() const noexcept {
return (_weight > 0) || (_size > 0);
}
@@ -77,6 +86,10 @@ bool fair_queue_ticket::operator==(const fair_queue_ticket& o) const noexcept {
return _weight == o._weight && _size == o._size;
}

+bool fair_queue_ticket::operator<=(const fair_queue_ticket& o) const noexcept {
+ return _weight <= o._weight && _size <= o._size;
+}
+
std::ostream& operator<<(std::ostream& os, fair_queue_ticket t) {
return os << t._weight << ":" << t._size;
}
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Oct 13, 2021, 8:26:03 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
Right now it's a loop inside another loop: the inner one may withdraw
a prio class queue from the fair queue, the outer one does the dispatching.
There are two reasons for turning it into a single loop.

First, another reason for prio class withdrawal will appear soon,
and the current code doesn't allow for this easily.

Second, there's another branch that obsoletes some stats counters
in the fair-queue and, for that, reworks this place so that it doesn't
take stats into account when making the dispatching decision.
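
As a rough illustration of the single-loop shape, here is a self-contained sketch. The `pclass`, `handles` and `dispatch` names are hypothetical, the request budget stands in for the capacity check, and, unlike the real fair queue, the class is not re-prioritized after each dispatched request.

```cpp
#include <cstddef>
#include <deque>
#include <memory>
#include <queue>
#include <vector>

// Hypothetical priority class: a priority and a FIFO of request ids.
struct pclass {
    int priority = 0;
    std::deque<int> requests;
};
using pclass_ptr = std::shared_ptr<pclass>;

struct by_priority {
    bool operator()(const pclass_ptr& a, const pclass_ptr& b) const noexcept {
        return a->priority < b->priority;
    }
};
using handles = std::priority_queue<pclass_ptr, std::vector<pclass_ptr>, by_priority>;

// Single loop: every iteration re-reads the top class, so any branch can
// withdraw the class and `continue` without a nested peek loop.
inline std::vector<int> dispatch(handles& h, std::size_t budget) {
    std::vector<int> out;
    while (!h.empty()) {
        pclass_ptr top = h.top();
        if (top->requests.empty()) {
            h.pop();      // withdraw the drained class, then re-evaluate
            continue;
        }
        if (out.size() == budget) {
            break;        // out of capacity, stop dispatching
        }
        out.push_back(top->requests.front());
        top->requests.pop_front();
    }
    return out;
}
```

With this shape, adding one more reason to withdraw a class is a matter of adding one more `if (...) { ...; continue; }` branch before the capacity check.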

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/fair_queue.hh | 1 -
src/core/fair_queue.cc | 17 ++++-------------
2 files changed, 4 insertions(+), 14 deletions(-)

diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
index a818e83a..c508761d 100644
--- a/include/seastar/core/fair_queue.hh
+++ b/include/seastar/core/fair_queue.hh
@@ -283,7 +283,6 @@ class fair_queue {

std::optional<pending> _pending;

- priority_class_ptr peek_priority_class();
void push_priority_class(priority_class_ptr pc);
void pop_priority_class(priority_class_ptr pc);

diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc
index 0290fa25..ea17f6f8 100644
--- a/src/core/fair_queue.cc
+++ b/src/core/fair_queue.cc
@@ -154,11 +154,6 @@ void fair_queue::push_priority_class(priority_class_ptr pc) {
}
}

-priority_class_ptr fair_queue::peek_priority_class() {
- assert(!_handles.empty());
- return _handles.top();
-}
-
void fair_queue::pop_priority_class(priority_class_ptr pc) {
assert(pc->_queued);
pc->_queued = false;
@@ -268,15 +263,11 @@ void fair_queue::notify_request_cancelled(fair_queue_entry& ent) noexcept {
}

void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
- while (_requests_queued) {
- priority_class_ptr h;
-
- while (true) {
- h = peek_priority_class();
- if (!h->_queue.empty()) {
- break;
- }
+ while (!_handles.empty()) {
+ priority_class_ptr h = _handles.top();
+ if (h->_queue.empty()) {
pop_priority_class(h);
+ continue;
}

auto& req = h->_queue.front();
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Oct 13, 2021, 8:26:04 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
Based on a leaky bucket approach. Each priority class is equipped with
a limiter that carries a credit. The class tries to consume from the
credit every time it wants to dispatch an entry. When the credit
is exhausted, the priority class is "delayed", i.e. moved into a delayed
list. Classes (and the rate limiters owned by them) are refilled from time
to time at the configured rate. When the accumulated credit reaches a
certain limit, the accumulation stops.

The aforementioned credit, limit and rate are all fair queue tickets.
The next patch will introduce the conversion from human-readable
bytes/sec and ops/sec into tickets.

The "from time to time" refill can happen in two places. The first is when
the prio class is on the delayed list: this list gets refilled before the
fair-queue is polled for dispatch. Second, the credit can be refilled
at dispatch time for a not-yet-delayed class.

Also, when the io-queue enters interrupt mode, it takes delayed classes
into account to wake up the reactor at the time they will have a chance
to get refilled.
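
The credit/limit/rate mechanics described above can be sketched in one dimension as follows. This is a simplified illustration under stated assumptions, not the patch's code: the `bucket` class is hypothetical, it uses plain integers instead of two-dimensional tickets, it takes the current time as a parameter, and a cost larger than the limit never fits.

```cpp
#include <algorithm>
#include <chrono>
#include <cstdint>

// One-dimensional leaky-bucket sketch with lazy refill.
class bucket {
    using clock = std::chrono::steady_clock;
    uint64_t _credit;                // what may be consumed right now
    uint64_t _limit;                 // cap on the accumulated credit
    uint64_t _rate_ms;               // credit gained per millisecond
    clock::time_point _last_refill;  // when _credit was last replenished
public:
    bucket(uint64_t limit, uint64_t rate_ms, clock::time_point now) noexcept
        : _credit(limit), _limit(limit), _rate_ms(rate_ms), _last_refill(now) {}

    // True if `cost` fits. The refill is computed only when the current
    // credit is not enough, and applied only if it makes `cost` fit.
    bool check_credit(uint64_t cost, clock::time_point now) noexcept {
        if (cost <= _credit) {
            return true;
        }
        auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now - _last_refill).count();
        uint64_t refilled = std::min(_limit, _credit + _rate_ms * static_cast<uint64_t>(ms));
        if (cost > refilled) {
            return false;  // _last_refill is kept, elapsed time keeps accumulating
        }
        _credit = refilled;
        _last_refill = now;
        return true;
    }

    // Call only after check_credit() returned true for the same cost.
    void consume(uint64_t cost) noexcept { _credit -= cost; }

    // Estimate of when `cost` could fit, counted from `now`.
    clock::time_point next_refill(uint64_t cost, clock::time_point now) const noexcept {
        if (cost <= _credit) {
            return now;
        }
        uint64_t missing = cost - _credit;
        return now + std::chrono::milliseconds((missing + _rate_ms - 1) / _rate_ms);
    }
};
```

A caller would park the class on a delayed list when `check_credit()` returns false and use `next_refill()` to decide when to look at it again.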

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/fair_queue.hh | 48 ++++++++++++++++++++
src/core/fair_queue.cc | 72 ++++++++++++++++++++++++++++++
2 files changed, 120 insertions(+)

diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
index c508761d..446b0ca1 100644
--- a/include/seastar/core/fair_queue.hh
+++ b/include/seastar/core/fair_queue.hh
@@ -26,6 +26,7 @@
#include <seastar/core/circular_buffer.hh>
#include <atomic>
#include <queue>
+#include <list>
#include <chrono>
#include <unordered_set>
#include <optional>
@@ -75,6 +76,7 @@ class fair_queue_ticket {
fair_queue_ticket operator*(double k) const noexcept;

std::chrono::microseconds duration_at_pace(float weight_pace, float size_pace) const noexcept;
+ std::chrono::milliseconds duration_at_rate(fair_queue_ticket rate) const noexcept;

/// \returns true if the fair_queue_ticket represents a non-zero quantity.
///
@@ -138,6 +140,37 @@ class fair_queue_entry {
fair_queue_ticket ticket() const noexcept { return _ticket; }
};

+/// \cond internal
+// A leaky-bucket-based rate-limiter.
+class rate_limiter {
+ // The amount of tickets that are allowed to be consumed
+ // Call check_credit(ticket), then do whatever you want, then call
+ // consume(ticket) if the former returned true
+ fair_queue_ticket _credit;
+
+ // Maximum value for the _credit when being refilled
+ fair_queue_ticket _limit;
+ // The amount of tickets to return back to _credit per ms
+ fair_queue_ticket _rate_ms;
+ // Last time the _credit was replenished
+ std::chrono::steady_clock::time_point _last_refill;
+
+public:
+ rate_limiter() noexcept
+ : _credit(fair_queue_ticket(std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max()))
+ , _limit(fair_queue_ticket(std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max()))
+ , _rate_ms(fair_queue_ticket(std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max()))
+ , _last_refill(std::chrono::steady_clock::now())
+ {}
+
+ void consume(fair_queue_ticket desc) noexcept {
+ _credit -= desc;
+ }
+
+ bool check_credit(fair_queue_ticket desc) noexcept;
+ std::chrono::steady_clock::time_point next_refill(fair_queue_ticket desc) const noexcept;
+};
+
/// \cond internal
class priority_class {
friend class fair_queue;
@@ -145,6 +178,7 @@ class priority_class {
float _accumulated = 0;
fair_queue_entry::container_list_t _queue;
bool _queued = false;
+ rate_limiter _rl;

friend struct shared_ptr_no_esft<priority_class>;
explicit priority_class(uint32_t shares) noexcept : _shares(std::max(shares, 1u)) {}
@@ -261,6 +295,8 @@ class fair_queue {
clock_type _base;
using prioq = std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare>;
prioq _handles;
+ using delayq = std::list<priority_class_ptr>;
+ delayq _delayed;
std::unordered_set<priority_class_ptr> _all_classes;

/*
@@ -285,10 +321,12 @@ class fair_queue {

void push_priority_class(priority_class_ptr pc);
void pop_priority_class(priority_class_ptr pc);
+ void delay_priority_class(priority_class_ptr pc);

float normalize_factor() const;

void normalize_stats();
+ void refill_delayed();

// Estimated time to process the given ticket
std::chrono::microseconds duration(fair_queue_ticket desc) const noexcept {
@@ -357,6 +395,16 @@ class fair_queue {
fair_queue_ticket over = pending_head.maybe_ahead_of(_group.head());
return std::chrono::steady_clock::now() + duration(over);
}
+ if (!_delayed.empty()) {
+ clock_type next = std::chrono::steady_clock::time_point::max();
+ for (const auto& pc : _delayed) {
+ auto pc_next = pc->_rl.next_refill(pc->_queue.front()._ticket);
+ if (pc_next < next) {
+ next = pc_next;
+ }
+ }
+ return next;
+ }

return std::chrono::steady_clock::time_point::max();
}
diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc
index ea17f6f8..f20bbbb6 100644
--- a/src/core/fair_queue.cc
+++ b/src/core/fair_queue.cc
@@ -154,6 +154,12 @@ void fair_queue::push_priority_class(priority_class_ptr pc) {
}
}

+void fair_queue::delay_priority_class(priority_class_ptr pc) {
+ assert(pc->_queued);
+ _handles.pop();
+ _delayed.push_back(pc);
+}
+
void fair_queue::pop_priority_class(priority_class_ptr pc) {
assert(pc->_queued);
pc->_queued = false;
@@ -178,6 +184,14 @@ std::chrono::microseconds fair_queue_ticket::duration_at_pace(float weight_pace,
return std::chrono::microseconds(dur);
}

+std::chrono::milliseconds fair_queue_ticket::duration_at_rate(fair_queue_ticket rate_ms) const noexcept {
+ auto div_round_up = [] (uint32_t num, uint32_t den) noexcept -> uint32_t {
+ return num / den + (num % den != 0);
+ };
+
+ return std::chrono::milliseconds(std::max(div_round_up(_weight, rate_ms._weight), div_round_up(_size, rate_ms._size)));
+}
+
bool fair_queue::grab_pending_capacity(fair_queue_ticket cap) noexcept {
fair_group_rover pending_head = _pending->orig_tail + cap;
if (pending_head.maybe_ahead_of(_group.head())) {
@@ -262,7 +276,59 @@ void fair_queue::notify_request_cancelled(fair_queue_entry& ent) noexcept {
ent._ticket = fair_queue_ticket();
}

+std::chrono::steady_clock::time_point rate_limiter::next_refill(fair_queue_ticket desc) const noexcept {
+ return std::chrono::steady_clock::now() +
+ (_credit <= desc ? (desc - _credit).duration_at_rate(_rate_ms) : std::chrono::milliseconds(0));
+}
+
+bool rate_limiter::check_credit(fair_queue_ticket t) noexcept {
+ if (t <= _credit) {
+ return true;
+ }
+
+ auto now = std::chrono::steady_clock::now();
+ auto delta = _rate_ms * std::chrono::duration_cast<std::chrono::duration<double>>(now - _last_refill).count() * 1000.0;
+ if (_limit - _credit <= delta) {
+ // Too much time passed, the accumulated credit exceeds the limit.
+ _credit = _limit;
+ _last_refill = now;
+ return true;
+ }
+
+ if (t <= _credit + delta) {
+ _credit += delta;
+ // NB: This is not 100% accurate. Rounding errors result in fractions
+ // of ticket being lost and this accumulates over time. We can account
+ // for that by increasing the _last_refill on the 'delta / _rate_ms'
+ // value, but the trick is that ticket value is 2d and will result in
+ // different corrections for each component.
+ _last_refill = now;
+ return true;
+ }
+
+ return false;
+}
+
+void fair_queue::refill_delayed() {
+ auto i = _delayed.begin();
+
+ while (i != _delayed.end()) {
+ auto& pc = *i;
+
+ auto& req = pc->_queue.front();
+ if (pc->_rl.check_credit(req._ticket)) {
+ i = _delayed.erase(i);
+ _handles.push(pc);
+ assert(pc->_queued);
+ } else {
+ i++;
+ }
+ }
+}
+
void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
+ refill_delayed();
+
while (!_handles.empty()) {
priority_class_ptr h = _handles.top();
if (h->_queue.empty()) {
@@ -271,6 +337,11 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
}

auto& req = h->_queue.front();
+ if (!h->_rl.check_credit(req._ticket)) {
+ delay_priority_class(h);
+ continue;
+ }
+
if (!grab_capacity(req._ticket)) {
break;
}
@@ -278,6 +349,7 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
pop_priority_class(h);
h->_queue.pop_front();

+ h->_rl.consume(req._ticket);
_resources_executing += req._ticket;
_resources_queued -= req._ticket;
_requests_executing++;
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Oct 13, 2021, 8:26:05 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
Right now the priority class configuration comes together with
the request generation parameters. This is inconvenient, so keep
the class config in a separate section.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
apps/io_tester/conf.yaml | 6 ++++--
apps/io_tester/io_tester.cc | 29 ++++++++++++++++++++++-------
2 files changed, 26 insertions(+), 9 deletions(-)

diff --git a/apps/io_tester/conf.yaml b/apps/io_tester/conf.yaml
index bfc8c419..8942cf68 100644
--- a/apps/io_tester/conf.yaml
+++ b/apps/io_tester/conf.yaml
@@ -4,8 +4,9 @@
shard_info:
parallelism: 10
reqsize: 256kB
- shares: 10
think_time: 0
+ pclass_info:
+ shares: 10

- name: latency_reads
shards: [0]
@@ -14,8 +15,9 @@
shard_info:
parallelism: 1
reqsize: 512
- shares: 100
think_time: 1000us
+ pclass_info:
+ shares: 100

- name: cpu_hog
shards: [0]
diff --git a/apps/io_tester/io_tester.cc b/apps/io_tester/io_tester.cc
index 0580ec0d..d91f57da 100644
--- a/apps/io_tester/io_tester.cc
+++ b/apps/io_tester/io_tester.cc
@@ -92,10 +92,13 @@ class shard_config {
}
};

+struct pclass_info {
+ unsigned shares = 10;
+};
+
struct shard_info {
unsigned parallelism = 0;
unsigned rps = 0;
- unsigned shares = 10;
uint64_t request_size = 4 << 10;
std::chrono::duration<float> think_time = 0ms;
std::chrono::duration<float> execution_time = 1ms;
@@ -113,6 +116,7 @@ struct job_config {
request_type type;
shard_config shard_placement;
::shard_info shard_info;
+ ::pclass_info pclass_info;
::options options;
// size of each individual file. Every class and every shard have its file, so in a normal
// system with many shards we'll naturally have many files and that will push the data out
@@ -152,7 +156,7 @@ class class_data {
class_data(job_config cfg)
: _config(std::move(cfg))
, _alignment(_config.shard_info.request_size >= 4096 ? 4096 : 512)
- , _iop(io_priority_class::register_one(name(), _config.shard_info.shares))
+ , _iop(io_priority_class::register_one(name(), _config.pclass_info.shares))
, _sg(cfg.shard_info.scheduling_group)
, _latencies(extended_p_square_probabilities = quantiles)
, _pos_distribution(0, _config.file_size / _config.shard_info.request_size)
@@ -286,7 +290,7 @@ class class_data {
}

unsigned shares() const {
- return _config.shard_info.shares;
+ return _config.pclass_info.shares;
}

std::chrono::duration<float> total_duration() const {
@@ -617,9 +621,6 @@ struct convert<shard_info> {
return false;
}

- if (node["shares"]) {
- sl.shares = node["shares"].as<unsigned>();
- }
if (node["reqsize"]) {
sl.request_size = node["reqsize"].as<byte_size>().size;
}
@@ -633,6 +634,17 @@ struct convert<shard_info> {
}
};

+template<>
+struct convert<pclass_info> {
+ static bool decode(const Node& node, pclass_info& pl) {
+ if (node["shares"]) {
+ pl.shares = node["shares"].as<unsigned>();
+ }
+ return true;
+ }
+};
+
+
template<>
struct convert<options> {
static bool decode(const Node& node, options& op) {
@@ -660,6 +672,9 @@ struct convert<job_config> {
if (node["shard_info"]) {
cl.shard_info = node["shard_info"].as<shard_info>();
}
+ if (node["pclass_info"]) {
+ cl.pclass_info = node["pclass_info"].as<pclass_info>();
+ }
if (node["options"]) {
cl.options = node["options"].as<options>();
}
@@ -778,7 +793,7 @@ int main(int ac, char** av) {
auto reqs = doc.as<std::vector<job_config>>();

parallel_for_each(reqs, [] (auto& r) {
- return seastar::create_scheduling_group(r.name, r.shard_info.shares).then([&r] (seastar::scheduling_group sg) {
+ return seastar::create_scheduling_group(r.name, r.pclass_info.shares).then([&r] (seastar::scheduling_group sg) {
r.shard_info.scheduling_group = sg;
});
}).get();
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Oct 13, 2021, 8:26:05 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
This adds the set_rate_limit API call for the priority class that
propagates the given bytes/sec and ops/sec values down to the queues.
The numbers in question are then converted into fair queue tickets.
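
To make the conversion arithmetic concrete, here is a hedged sketch for the ops/sec dimension only. `limiter_params`, `ops_to_tickets` and `units_per_op` are hypothetical names (the latter plays the role of a per-request base cost), and the refill period is taken as an integer number of milliseconds here, whereas the patch keeps it as a floating-point number of seconds.

```cpp
#include <algorithm>
#include <cstdint>
#include <stdexcept>

struct limiter_params {
    uint64_t rate_per_ms;  // ticket units refilled per millisecond
    uint64_t burst;        // maximum accumulated credit
};

// Convert an ops/sec limit into a per-millisecond rate and a burst cap.
// units_per_op: ticket units charged for one operation (assumed value).
// period_ms:    how much credit, in time, may accumulate.
inline limiter_params ops_to_tickets(uint64_t ops_per_second, uint64_t units_per_op, uint64_t period_ms) {
    const uint64_t units_per_sec = ops_per_second * units_per_op;
    const uint64_t rate_per_ms = units_per_sec / 1000;  // integer division truncates
    if (rate_per_ms == 0) {
        throw std::invalid_argument("rate limit too low to express per millisecond");
    }
    const uint64_t burst = units_per_sec * period_ms / 1000;
    // At least one whole operation must fit into the bucket.
    return {rate_per_ms, std::max(burst, units_per_op)};
}
```

For example, with an illustrative 128 units per operation and a 10 ms period, 1000 ops/sec gives a rate of 128 units/ms and a burst of 1280 units. A limit of 10 ops/sec gives 1.28 units/ms, which truncates to 1, so very low limits lose precision in this scheme.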

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/fair_queue.hh | 11 +++++++
include/seastar/core/io_priority_class.hh | 7 ++++
include/seastar/core/io_queue.hh | 2 ++
include/seastar/core/reactor.hh | 2 ++
src/core/io_queue.cc | 39 +++++++++++++++++++++++
src/core/reactor.cc | 6 ++++
6 files changed, 67 insertions(+)

diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
index 446b0ca1..c2085340 100644
--- a/include/seastar/core/fair_queue.hh
+++ b/include/seastar/core/fair_queue.hh
@@ -167,6 +167,13 @@ class rate_limiter {
_credit -= desc;
}

+ void update(fair_queue_ticket limit, fair_queue_ticket rate) noexcept {
+ _limit = limit;
+ _rate_ms = rate;
+ _credit = limit;
+ _last_refill = std::chrono::steady_clock::now();
+ }
+
bool check_credit(fair_queue_ticket desc) noexcept;
std::chrono::steady_clock::time_point next_refill(fair_queue_ticket desc) const noexcept;
};
@@ -192,6 +199,10 @@ class priority_class {
void update_shares(uint32_t shares) noexcept {
_shares = (std::max(shares, 1u));
}
+
+ void set_rate_limit(fair_queue_ticket limit, fair_queue_ticket rate) noexcept {
+ _rl.update(limit, rate);
+ }
};
/// \endcond

diff --git a/include/seastar/core/io_priority_class.hh b/include/seastar/core/io_priority_class.hh
index e315839e..eeee792b 100644
--- a/include/seastar/core/io_priority_class.hh
+++ b/include/seastar/core/io_priority_class.hh
@@ -58,6 +58,13 @@ class io_priority_class {
/// \return a future that is ready when the share update is applied
future<> update_shares(uint32_t shares) const;

+ /// \brief (Re)Sets the throttling configuration for a given priority class
+ ///
+ /// \param bytes_per_second the throughput value. Zero means "no throttling"
+ /// \param ops_per_second the IOPS value. Zero means "no throttling"
+ /// \return a future that is ready when the throttling is applied
+ future<> set_rate_limit(size_t bytes_per_second, unsigned ops_per_second) const;
+
/// Renames an io priority class
///
/// Renames an `io_priority_class` previously created with register_one_priority_class().
diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
index 39c54390..10c0c9fa 100644
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -99,6 +99,7 @@ class io_queue {
size_t disk_read_saturation_length = std::numeric_limits<size_t>::max();
size_t disk_write_saturation_length = std::numeric_limits<size_t>::max();
sstring mountpoint = "undefined";
+ double rate_limit_period_sec = 0.01; // 10ms
};

io_queue(io_group_ptr group, internal::io_sink& sink);
@@ -138,6 +139,7 @@ class io_queue {
dev_t dev_id() const noexcept;

future<> update_shares_for_class(io_priority_class pc, size_t new_shares);
+ future<> set_rate_limit_for_class(io_priority_class pc, size_t bytes_per_second, unsigned ops_per_second);
void rename_priority_class(io_priority_class pc, sstring new_name);

struct request_limits {
diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index a832b0b2..87166a09 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -492,6 +492,8 @@ class reactor {
future<> update_shares_for_class(io_priority_class pc, uint32_t shares);
/// @private
future<> update_shares_for_queues(io_priority_class pc, uint32_t shares);
+ /// @private
+ future<> set_rate_limit_for_queues(io_priority_class pc, size_t bytes_per_second, unsigned ops_per_second);

[[deprecated("Use io_priority_class.rename")]]
static future<> rename_priority_class(io_priority_class pc, sstring new_name) noexcept;
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 78d355da..32ccf007 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -390,6 +390,10 @@ future<> io_priority_class::update_shares(uint32_t shares) const {
return engine().update_shares_for_queues(*this, shares);
}

+future<> io_priority_class::set_rate_limit(uint64_t bytes_per_second, uint32_t ops_per_second) const {
+ return engine().set_rate_limit_for_queues(*this, bytes_per_second, ops_per_second);
+}
+
bool io_priority_class::rename_registered(sstring new_name) {
std::lock_guard<std::mutex> guard(_register_lock);
for (unsigned i = 0; i < _max_classes; ++i) {
@@ -606,6 +610,41 @@ io_queue::update_shares_for_class(const io_priority_class pc, size_t new_shares)
});
}

+future<> io_queue::set_rate_limit_for_class(const io_priority_class pc, size_t bytes_per_second, unsigned ops_per_second) {
+ return futurize_invoke([this, pc, bytes_per_second, ops_per_second] {
+ unsigned weight_rate = std::numeric_limits<unsigned>::max();
+ unsigned max_weight = std::numeric_limits<unsigned>::max();
+
+ size_t size_rate = std::numeric_limits<size_t>::max();
+ size_t max_size = std::numeric_limits<size_t>::max();
+
+ if (ops_per_second != 0 && ops_per_second < weight_rate / read_request_base_count) {
+ weight_rate = ops_per_second * read_request_base_count / 1000;
+ if (weight_rate == 0) {
+ throw std::runtime_error("Extremely low IOPS limit");
+ }
+ // At least one request must fit
+ max_weight = std::max<uint32_t>(ops_per_second * read_request_base_count * get_config().rate_limit_period_sec, read_request_base_count);
+ }
+
+ if (bytes_per_second != 0 && bytes_per_second < size_rate / read_request_base_count) {
+ size_rate = bytes_per_second * read_request_base_count / 1000;
+ if (size_rate >> request_ticket_size_shift == 0) {
+ throw std::runtime_error("Extremely low throughput limit");
+ }
+ // The max_bytes_count sized request must fit
+ max_size = std::max<uint32_t>(bytes_per_second * read_request_base_count * get_config().rate_limit_period_sec, get_config().max_bytes_count);
+ }
+
+ fair_queue_ticket limit(max_weight, max_size >> request_ticket_size_shift);
+ fair_queue_ticket rate(weight_rate, size_rate >> request_ticket_size_shift);
+ io_log.debug("Adjust rate limit for {} pc: bps {} ops {} -> limit {} rate {}", pc.id(), bytes_per_second, ops_per_second, limit, rate);
+
+ auto& pclass = find_or_create_class(pc);
+ pclass.pclass()->set_rate_limit(limit, rate);
+ });
+}
+
void
io_queue::rename_priority_class(io_priority_class pc, sstring new_name) {
if (_priority_classes.size() > pc.id() &&
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 54e3e550..1af02899 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -205,6 +205,12 @@ future<> reactor::update_shares_for_queues(io_priority_class pc, uint32_t shares
});
}

+future<> reactor::set_rate_limit_for_queues(io_priority_class pc, size_t bytes_per_second, unsigned ops_per_second) {
+ return parallel_for_each(_io_queues, [pc, bytes_per_second, ops_per_second] (auto& queue) {
+ return queue.second->set_rate_limit_for_class(pc, bytes_per_second, ops_per_second);
+ });
+}
+
future<> reactor::rename_queues(io_priority_class pc, sstring new_name) noexcept {
return futurize_invoke([this, pc, new_name = std::move(new_name)] {
for (auto&& queue : _io_queues) {
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Oct 13, 2021, 8:26:06 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
Add the bytes/sec and ops/sec tunables and configure the rate limiter
before starting the parallelized workload.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
apps/io_tester/io_tester.cc | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/apps/io_tester/io_tester.cc b/apps/io_tester/io_tester.cc
index d91f57da..7d0ccda3 100644
--- a/apps/io_tester/io_tester.cc
+++ b/apps/io_tester/io_tester.cc
@@ -94,6 +94,8 @@ class shard_config {

struct pclass_info {
unsigned shares = 10;
+ size_t bytes_per_second = 0;
+ unsigned ops_per_second = 0;
};

struct shard_info {
@@ -211,7 +213,9 @@ class class_data {
_start = std::chrono::steady_clock::now();
return with_scheduling_group(_sg, [this, stop] {
if (parallelism() != 0) {
- return issue_requests_in_parallel(stop, parallelism());
+ return _iop.set_rate_limit(_config.pclass_info.bytes_per_second, _config.pclass_info.ops_per_second).then([this, stop] {
+ return issue_requests_in_parallel(stop, parallelism());
+ });
} else /* rps() != 0 */ {
assert(rps() != 0);
return issue_requests_at_rate(stop, rps());
@@ -640,6 +644,12 @@ struct convert<pclass_info> {
if (node["shares"]) {
pl.shares = node["shares"].as<unsigned>();
}
+ if (node["bytes_per_second"]) {
+ pl.bytes_per_second = node["bytes_per_second"].as<byte_size>().size;
+ }
+ if (node["ops_per_second"]) {
+ pl.ops_per_second = node["ops_per_second"].as<unsigned>();
+ }
return true;
}
};
--
2.20.1

Dor Laor

<dor@scylladb.com>
Oct 13, 2021, 10:35:23 AM
to Pavel Emelyanov, seastar-dev
On Wed, Oct 13, 2021 at 3:26 PM Pavel Emelyanov <xe...@scylladb.com> wrote:
There's a request to have hard limits for both bandwidth and IOPS.
Another potential benefit of IO throttling is mitigating the
effect write requests have on read requests. Under what we call
a "mixed workload", disks start to prefer writes over reads and
penalize the latter with extra latency. Rate-limiting writes seems
to improve read latency to some extent.

The rate limiter in this set is the leaky-bucket-based algo that's
wired directly into the fair-queue and, thus, operates on tickets.
The io-queue level converts human readable bytes/sec and ops/sec
into tickets. Sharing is off, rate-limiting is per-shard.

Pavel, do you have results to share, or is it still in RFC mode?
 


Pavel Emelyanov

<xemul@scylladb.com>
Oct 13, 2021, 2:13:35 PM
to Dor Laor, seastar-dev


On 13.10.2021 17:34, Dor Laor wrote:
> On Wed, Oct 13, 2021 at 3:26 PM Pavel Emelyanov <xe...@scylladb.com> wrote:
>
> There's a request to have hard limits for both bandwidth and IOPS.
> Another potential benefit of IO throttling is mitigating the
> effect write requests have on read requests. Under what we call
> a "mixed workload", disks start to prefer writes over reads and
> penalize the latter with extra latency. Rate-limiting writes seems
> to improve read latency to some extent.
>
> The rate limiter in this set is the leaky-bucket-based algo that's
> wired directly into the fair-queue and, thus, operates on tickets.
> The io-queue level converts human readable bytes/sec and ops/sec
> into tickets. Sharing is off, rate-limiting is per-shard.
>
>
> Pavel, do you have results to share, or is it still in RFC mode?

It's still RFC. I'm going to run a stress test with this patch this week
or next and see if it helps.
