This adds the set_rate_limit() API call to the I/O priority class. It
propagates the given bytes/sec and ops/sec limits down to the I/O queues,
where they are converted into fair queue tickets.
include/seastar/core/fair_queue.hh | 11 +++++++
include/seastar/core/io_priority_class.hh | 7 ++++
include/seastar/core/io_queue.hh | 2 ++
include/seastar/core/reactor.hh | 2 ++
src/core/io_queue.cc | 39 +++++++++++++++++++++++
src/core/reactor.cc | 6 ++++
6 files changed, 67 insertions(+)
diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
index 446b0ca1..c2085340 100644
--- a/include/seastar/core/fair_queue.hh
+++ b/include/seastar/core/fair_queue.hh
@@ -167,6 +167,17 @@ class rate_limiter {
_credit -= desc;
}
+ /// \brief Installs a new limit/rate pair and restarts the refill clock.
+ ///
+ /// \param limit the new `_limit`; the available credit is reset to this value too
+ /// \param rate the new `_rate_ms` (presumably credit added per millisecond -- TODO confirm)
+ void update(fair_queue_ticket limit, fair_queue_ticket rate) noexcept {
+ _last_refill = std::chrono::steady_clock::now();
+ _rate_ms = rate;
+ _limit = limit;
+ _credit = limit;
+ }
+
bool check_credit(fair_queue_ticket desc) noexcept;
std::chrono::steady_clock::time_point next_refill(fair_queue_ticket desc) const noexcept;
};
@@ -192,6 +199,14 @@ class priority_class {
void update_shares(uint32_t shares) noexcept {
_shares = (std::max(shares, 1u));
}
+
+ /// \brief Reconfigures the rate limiter (`_rl`) attached to this class.
+ ///
+ /// Both arguments are passed unchanged to `_rl.update()`.
+ void set_rate_limit(fair_queue_ticket limit, fair_queue_ticket rate) noexcept {
+ auto& limiter = _rl;
+ limiter.update(limit, rate);
+ }
};
/// \endcond
diff --git a/include/seastar/core/io_priority_class.hh b/include/seastar/core/io_priority_class.hh
index e315839e..eeee792b 100644
--- a/include/seastar/core/io_priority_class.hh
+++ b/include/seastar/core/io_priority_class.hh
@@ -58,6 +58,18 @@ class io_priority_class {
/// \return a future that is ready when the share update is applied
future<> update_shares(uint32_t shares) const;
+ /// \brief Sets (or replaces) the throttling limits of this priority class
+ ///
+ /// The two limits are independent of each other; a zero value disables
+ /// the corresponding limit.
+ ///
+ /// \param bytes_per_second throughput limit in bytes per second, 0 for "no throttling"
+ /// \param ops_per_second request-rate limit in operations per second, 0 for "no throttling"
+ /// \return a future that is ready when the throttling is applied; it fails with
+ /// std::runtime_error if a non-zero limit is too low to be enforced
+ future<> set_rate_limit(size_t bytes_per_second,
+ unsigned ops_per_second) const;
+
/// Renames an io priority class
///
/// Renames an `io_priority_class` previously created with register_one_priority_class().
diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
index 39c54390..10c0c9fa 100644
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -99,6 +99,9 @@ class io_queue {
size_t disk_read_saturation_length = std::numeric_limits<size_t>::max();
size_t disk_write_saturation_length = std::numeric_limits<size_t>::max();
sstring mountpoint = "undefined";
+ // Time window, in seconds, used to turn per-second rate limits into the rate
+ // limiter's `limit` ticket (see set_rate_limit_for_class); 10ms by default.
+ double rate_limit_period_sec = 10e-3;
};
io_queue(io_group_ptr group, internal::io_sink& sink);
@@ -138,6 +139,10 @@ class io_queue {
dev_t dev_id() const noexcept;
future<> update_shares_for_class(io_priority_class pc, size_t new_shares);
+ // Converts the given limits into fair_queue tickets and installs them on pc's
+ // class; zero for either limit means "unlimited".
+ future<> set_rate_limit_for_class(io_priority_class pc,
+ size_t bytes_per_second, unsigned ops_per_second);
void rename_priority_class(io_priority_class pc, sstring new_name);
struct request_limits {
diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index a832b0b2..87166a09 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -492,6 +492,10 @@ class reactor {
future<> update_shares_for_class(io_priority_class pc, uint32_t shares);
/// @private
future<> update_shares_for_queues(io_priority_class pc, uint32_t shares);
+ /// @private
+ /// Calls io_queue::set_rate_limit_for_class() on every queue in `_io_queues`.
+ future<> set_rate_limit_for_queues(io_priority_class pc,
+ size_t bytes_per_second, unsigned ops_per_second);
[[deprecated("Use io_priority_class.rename")]]
static future<> rename_priority_class(io_priority_class pc, sstring new_name) noexcept;
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 78d355da..32ccf007 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -390,6 +390,11 @@ future<> io_priority_class::update_shares(uint32_t shares) const {
return engine().update_shares_for_queues(*this, shares);
}
+// Delegates to the reactor, which forwards the limits to each of its I/O queues.
+future<> io_priority_class::set_rate_limit(size_t bytes_per_second, unsigned ops_per_second) const {
+ return engine().set_rate_limit_for_queues(*this, bytes_per_second, ops_per_second);
+}
+
bool io_priority_class::rename_registered(sstring new_name) {
std::lock_guard<std::mutex> guard(_register_lock);
for (unsigned i = 0; i < _max_classes; ++i) {
@@ -606,6 +610,59 @@ io_queue::update_shares_for_class(const io_priority_class pc, size_t new_shares)
});
}
+// Converts per-second limits into fair_queue tickets and installs them on pc's class.
+//
+// A ticket has a weight (request) and a size (byte) component, both scaled by
+// read_request_base_count; sizes are additionally shifted right by
+// request_ticket_size_shift. For each component:
+// - rate  = the per-second value / 1000, i.e. a per-millisecond amount;
+// - limit = the per-second value * config::rate_limit_period_sec, but never
+//   less than what a single request needs.
+// A zero (or too large to scale) limit leaves that component at its maximum,
+// i.e. unthrottled. A non-zero limit whose per-millisecond rate rounds down to
+// zero makes the returned future fail with std::runtime_error.
+future<> io_queue::set_rate_limit_for_class(const io_priority_class pc, size_t bytes_per_second, unsigned ops_per_second) {
+ return futurize_invoke([this, pc, bytes_per_second, ops_per_second] {
+ const auto& cfg = get_config();
+
+ unsigned weight_rate = std::numeric_limits<unsigned>::max();
+ unsigned max_weight = std::numeric_limits<unsigned>::max();
+
+ size_t size_rate = std::numeric_limits<size_t>::max();
+ size_t max_size = std::numeric_limits<size_t>::max();
+
+ // The upper bound keeps ops_per_second * read_request_base_count from overflowing.
+ if (ops_per_second != 0 && ops_per_second < weight_rate / read_request_base_count) {
+ weight_rate = ops_per_second * read_request_base_count / 1000;
+ if (weight_rate == 0) {
+ throw std::runtime_error("Extremely low IOPS limit");
+ }
+ // At least one request must fit
+ max_weight = std::max<unsigned>(static_cast<unsigned>(ops_per_second * read_request_base_count * cfg.rate_limit_period_sec), read_request_base_count);
+ }
+
+ // Same overflow guard for the byte component.
+ if (bytes_per_second != 0 && bytes_per_second < size_rate / read_request_base_count) {
+ size_rate = bytes_per_second * read_request_base_count / 1000;
+ if (size_rate >> request_ticket_size_shift == 0) {
+ throw std::runtime_error("Extremely low throughput limit");
+ }
+ // The max_bytes_count sized request must fit. Keep the result in size_t:
+ // for high byte rates the product does not fit in 32 bits, and converting
+ // an out-of-range double to uint32_t is undefined behavior.
+ max_size = std::max<size_t>(static_cast<size_t>(bytes_per_second * read_request_base_count * cfg.rate_limit_period_sec), cfg.max_bytes_count);
+ }
+
+ fair_queue_ticket limit(max_weight, max_size >> request_ticket_size_shift);
+ fair_queue_ticket rate(weight_rate, size_rate >> request_ticket_size_shift);
+ io_log.debug("Adjust rate limit for {} pc: bps {} ops {} -> limit {} rate {}",
+ pc.id(), bytes_per_second, ops_per_second, limit, rate);
+
+ auto& pclass = find_or_create_class(pc);
+ pclass.pclass()->set_rate_limit(limit, rate);
+ });
+}
+
void
io_queue::rename_priority_class(io_priority_class pc, sstring new_name) {
if (_priority_classes.size() >
pc.id() &&
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 54e3e550..1af02899 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -205,6 +205,16 @@ future<> reactor::update_shares_for_queues(io_priority_class pc, uint32_t shares
});
}
+// Applies the limits for this class to every queue in `_io_queues`.
+// All queues are updated via parallel_for_each; the returned future resolves
+// once every queue has finished (or fails if any of them failed).
+future<> reactor::set_rate_limit_for_queues(io_priority_class pc, size_t bytes_per_second, unsigned ops_per_second) {
+ return parallel_for_each(_io_queues, [pc, bytes_per_second, ops_per_second] (auto& entry) {
+ auto& q = *entry.second;
+ return q.set_rate_limit_for_class(pc, bytes_per_second, ops_per_second);
+ });
+}
+
future<> reactor::rename_queues(io_priority_class pc, sstring new_name) noexcept {
return futurize_invoke([this, pc, new_name = std::move(new_name)] {
for (auto&& queue : _io_queues) {
--
2.20.1