[PATCH 0/9] io_queue: Fixes and reworks for shared fair-queue

2 views
Skip to first unread message

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jun 25, 2020, 12:57:10 PM6/25/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
The set includes preparatory patches and two accounting fixes found while
making a shared fair-queue prototype and preparing tests for what it can
improve.

Branch: https://github.com/xemul/seastar/tree/br-io-queue-fixlets-2
Tests: unit(dev), io_tester

Pavel Emelyanov (9):
fair_queue: Remove one level of indirection when updating class shares
io_queue: Turn _priority_classes into unique_ptr-s
io_queue: Localize fq_ticket evaluation for request in io_queue method
reactor: Move fsync_io_queue declaration out of a try-catch block
reactor: Make fsync_io_desc look the same as io_rw_desc
reactor: Move all io_request completion processing into submit_io
io_queue: Fix discrepancy of queued reqs vs their number
fair_queue: Fix _requests_executing accounting
io_queue: Remove process_completions

include/seastar/core/fair_queue.hh | 15 ++---
include/seastar/core/internal/io_desc.hh | 1 +
include/seastar/core/io_queue.hh | 9 +--
include/seastar/core/reactor.hh | 4 +-
src/core/fair_queue.cc | 10 +--
src/core/io_queue.cc | 83 +++++++++++-------------
src/core/reactor.cc | 67 ++++++++++---------
tests/perf/fair_queue_perf.cc | 4 +-
tests/unit/fair_queue_test.cc | 2 +-
9 files changed, 96 insertions(+), 99 deletions(-)

--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jun 25, 2020, 12:57:11 PM6/25/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
Just to avoid one more do-nothing helper method on fair_queue.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/fair_queue.hh | 13 ++++---------
src/core/fair_queue.cc | 4 ----
src/core/io_queue.cc | 2 +-
tests/unit/fair_queue_test.cc | 2 +-
4 files changed, 6 insertions(+), 15 deletions(-)

diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
index 6d9d30da..5f8889e1 100644
--- a/include/seastar/core/fair_queue.hh
+++ b/include/seastar/core/fair_queue.hh
@@ -105,15 +105,15 @@ class priority_class {

friend struct shared_ptr_no_esft<priority_class>;
explicit priority_class(uint32_t shares) : _shares(std::max(shares, 1u)) {}
-
- void update_shares(uint32_t shares) {
- _shares = (std::max(shares, 1u));
- }
public:
/// \brief return the current amount of shares for this priority class
uint32_t shares() const {
return _shares;
}
+
+ void update_shares(uint32_t shares) {
+ _shares = (std::max(shares, 1u));
+ }
};
/// \endcond

@@ -240,11 +240,6 @@ class fair_queue {

/// Try to execute new requests if there is capacity left in the queue.
void dispatch_requests();
-
- /// Updates the current shares of this priority class
- ///
- /// \param new_shares the new number of shares for this priority class
- static void update_shares(priority_class_ptr pc, uint32_t new_shares);
};
/// @}

diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc
index 45cf9425..24fa1bb7 100644
--- a/src/core/fair_queue.cc
+++ b/src/core/fair_queue.cc
@@ -192,8 +192,4 @@ void fair_queue::dispatch_requests() {
}
}

-void fair_queue::update_shares(priority_class_ptr pc, uint32_t new_shares) {
- pc->update_shares(new_shares);
-}
-
}
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 27f77145..578a138b 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -309,7 +309,7 @@ future<>
io_queue::update_shares_for_class(const io_priority_class pc, size_t new_shares) {
return smp::submit_to(coordinator(), [this, pc, owner = this_shard_id(), new_shares] {
auto& pclass = find_or_create_class(pc, owner);
- _fq.update_shares(pclass.ptr, new_shares);
+ pclass.ptr->update_shares(new_shares);
});
}

diff --git a/tests/unit/fair_queue_test.cc b/tests/unit/fair_queue_test.cc
index 7bd0b209..7b1c23a4 100644
--- a/tests/unit/fair_queue_test.cc
+++ b/tests/unit/fair_queue_test.cc
@@ -119,7 +119,7 @@ class test_env {

void update_shares(unsigned index, uint32_t shares) {
auto cl = _classes[index];
- _fq.update_shares(cl, shares);
+ cl->update_shares(shares);
}

void reset_results(unsigned index) {
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jun 25, 2020, 12:57:13 PM6/25/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
They used to be shared for real in the past, but nowadays these are shard-local
objects that are used via references, so a shared pointer is excessive here.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/io_queue.hh | 3 +--
src/core/io_queue.cc | 4 ++--
2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
index e4f8cc8f..81384865 100644
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -24,7 +24,6 @@
#include <seastar/core/sstring.hh>
#include <seastar/core/fair_queue.hh>
#include <seastar/core/metrics_registration.hh>
-#include <seastar/core/shared_ptr.hh>
#include <seastar/core/future.hh>
#include <seastar/core/internal/io_request.hh>
#include <mutex>
@@ -75,7 +74,7 @@ class io_queue {
void register_stats(sstring name, sstring mountpoint, shard_id owner);
};

- std::vector<std::vector<lw_shared_ptr<priority_class_data>>> _priority_classes;
+ std::vector<std::vector<std::unique_ptr<priority_class_data>>> _priority_classes;
fair_queue _fq;

static constexpr unsigned _max_classes = 2048;
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 578a138b..7a7fb741 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -256,9 +256,9 @@ io_queue::priority_class_data& io_queue::find_or_create_class(const io_priority_
// This conveys all the information we need and allows one to easily group all classes from
// the same I/O queue (by filtering by shard)
auto pc_ptr = _fq.register_priority_class(shares);
- auto pc_data = make_lw_shared<priority_class_data>(name, mountpoint(), pc_ptr, owner);
+ auto pc_data = std::make_unique<priority_class_data>(name, mountpoint(), pc_ptr, owner);

- _priority_classes[owner][id] = pc_data;
+ _priority_classes[owner][id] = std::move(pc_data);
}
return *_priority_classes[owner][id];
}
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jun 25, 2020, 12:57:14 PM6/25/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
This makes it obvious and relaxes the io_desc_read_write interface.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/io_queue.hh | 2 ++
src/core/io_queue.cc | 39 ++++++++++++++++----------------
2 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
index 81384865..3023811f 100644
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -90,6 +90,8 @@ class io_queue {
priority_class_data& find_or_create_class(const io_priority_class& pc, shard_id owner);
fair_queue_ticket _completed_accumulator = { 0, 0 };

+ fair_queue_ticket request_fq_ticket(const internal::io_request& req, size_t len);
+
// The fields below are going away, they are just here so we can implement deprecated
// functions that used to be provided by the fair_queue and are going away (from both
// the fair_queue and the io_queue). Double-accounting for now will allow for easier
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 7a7fb741..028a2cbc 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -48,15 +48,11 @@ class io_desc_read_write final : public kernel_completion {
_ioq_ptr->notify_requests_finished(_fq_ticket);
}
public:
- io_desc_read_write(io_queue* ioq, unsigned weight, unsigned size)
+ io_desc_read_write(io_queue* ioq, fair_queue_ticket ticket)
: _ioq_ptr(ioq)
- , _fq_ticket(fair_queue_ticket{weight, size})
+ , _fq_ticket(ticket)
{}

- fair_queue_ticket& fq_ticket() {
- return _fq_ticket;
- }
-
void set_exception(std::exception_ptr eptr) {
notify_requests_finished();
_pr.set_exception(eptr);
@@ -263,6 +259,22 @@ io_queue::priority_class_data& io_queue::find_or_create_class(const io_priority_
return *_priority_classes[owner][id];
}

+fair_queue_ticket io_queue::request_fq_ticket(const internal::io_request& req, size_t len) {
+ unsigned weight;
+ size_t size;
+ if (req.is_write()) {
+ weight = _config.disk_req_write_to_read_multiplier;
+ size = _config.disk_bytes_write_to_read_multiplier * len;
+ } else if (req.is_read()) {
+ weight = io_queue::read_request_base_count;
+ size = io_queue::read_request_base_count * len;
+ } else {
+ throw std::runtime_error(fmt::format("Unrecognized request passing through I/O queue {}", req.opname()));
+ }
+
+ return fair_queue_ticket(weight, size);
+}
+
future<size_t>
io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req) noexcept {
auto start = std::chrono::steady_clock::now();
@@ -273,19 +285,8 @@ io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_re
// that we create the shared pointer in the same shard it will be used at later.
auto& pclass = find_or_create_class(pc, owner);
pclass.nr_queued++;
- unsigned weight;
- size_t size;
- if (req.is_write()) {
- weight = _config.disk_req_write_to_read_multiplier;
- size = _config.disk_bytes_write_to_read_multiplier * len;
- } else if (req.is_read()) {
- weight = io_queue::read_request_base_count;
- size = io_queue::read_request_base_count * len;
- } else {
- throw std::runtime_error(fmt::format("Unrecognized request passing through I/O queue {}", req.opname()));
- }
- auto desc = std::make_unique<io_desc_read_write>(this, weight, size);
- auto fq_ticket = desc->fq_ticket();
+ fair_queue_ticket fq_ticket = request_fq_ticket(req, len);
+ auto desc = std::make_unique<io_desc_read_write>(this, fq_ticket);
auto fut = desc->get_future();
_fq.queue(pclass.ptr, std::move(fq_ticket), [&pclass, start, req = std::move(req), d = std::move(desc), len, this] () mutable noexcept {
io_desc_read_write* desc = d.release();
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jun 25, 2020, 12:57:15 PM6/25/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
No functional changes, just to facilitate the next patch.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
src/core/reactor.cc | 34 +++++++++++++++++-----------------
1 file changed, 17 insertions(+), 17 deletions(-)

diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 832a58c8..1b15a16b 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1880,26 +1880,26 @@ reactor::fdatasync(int fd) noexcept {
return make_ready_future<>();
}
if (_have_aio_fsync) {
- try {
- // Does not go through the I/O queue, but has to be deleted
- struct fsync_io_desc final : public kernel_completion {
- promise<> _pr;
- public:
- virtual void complete_with(ssize_t res) {
- try {
- engine().handle_io_result(res);
- _pr.set_value();
- } catch (...) {
- _pr.set_exception(std::current_exception());
- }
- delete this;
+ // Does not go through the I/O queue, but has to be deleted
+ struct fsync_io_desc final : public kernel_completion {
+ promise<> _pr;
+ public:
+ virtual void complete_with(ssize_t res) {
+ try {
+ engine().handle_io_result(res);
+ _pr.set_value();
+ } catch (...) {
+ _pr.set_exception(std::current_exception());
}
+ delete this;
+ }

- future<> get_future() {
- return _pr.get_future();
- }
- };
+ future<> get_future() {
+ return _pr.get_future();
+ }
+ };

+ try {
auto desc = std::make_unique<fsync_io_desc>();
auto fut = desc->get_future();

--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jun 25, 2020, 12:57:17 PM6/25/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
The goal is to make reactor::submit_io do all the request completion
processing (and mark it noexcept). To make further patching simpler,
change fsync_io_desc's submission code to look like
io_desc_read_write's.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
src/core/reactor.cc | 22 +++++++++++++---------
1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 1b15a16b..65655c53 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1888,27 +1888,31 @@ reactor::fdatasync(int fd) noexcept {
try {
engine().handle_io_result(res);
_pr.set_value();
+ delete this;
} catch (...) {
- _pr.set_exception(std::current_exception());
+ set_exception(std::current_exception());
}
- delete this;
}

future<> get_future() {
return _pr.get_future();
}
+
+ void set_exception(std::exception_ptr eptr) {
+ _pr.set_exception(eptr);
+ delete this;
+ }
};

+ auto desc = new fsync_io_desc();
+ auto fut = desc->get_future();
+ auto req = io_request::make_fdatasync(fd);
try {
- auto desc = std::make_unique<fsync_io_desc>();
- auto fut = desc->get_future();
-
- auto req = io_request::make_fdatasync(fd);
- submit_io(desc.release(), std::move(req));
- return fut;
+ submit_io(desc, std::move(req));
} catch (...) {
- return make_exception_future<>(std::current_exception());
+ desc->set_exception(std::current_exception());
}
+ return fut;
}
return _thread_pool->submit<syscall_result<int>>([fd] {
return wrap_syscall<int>(::fdatasync(fd));
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jun 25, 2020, 12:57:19 PM6/25/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
Now all callers of reactor::submit_io just pass it the descriptor and
the request and forget about them. As a side effect, this allows
marking submit_io as noexcept.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/internal/io_desc.hh | 1 +
include/seastar/core/reactor.hh | 4 ++--
src/core/io_queue.cc | 30 ++++++++++++------------
src/core/reactor.cc | 19 +++++++++------
4 files changed, 30 insertions(+), 24 deletions(-)

diff --git a/include/seastar/core/internal/io_desc.hh b/include/seastar/core/internal/io_desc.hh
index 74d1cf90..0dc2d4bc 100644
--- a/include/seastar/core/internal/io_desc.hh
+++ b/include/seastar/core/internal/io_desc.hh
@@ -31,5 +31,6 @@ class kernel_completion {
~kernel_completion() = default;
public:
virtual void complete_with(ssize_t res) = 0;
+ virtual void complete_with_exception(std::exception_ptr eptr) { throw eptr; }
};
}
diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index c3757070..4ad116b6 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -515,11 +515,11 @@ class reactor {
future<> chmod(sstring name, file_permissions permissions) noexcept;

future<int> inotify_add_watch(int fd, const sstring& path, uint32_t flags);
-
+
// In the following three methods, prepare_io is not guaranteed to execute in the same processor
// in which it was generated. Therefore, care must be taken to avoid the use of objects that could
// be destroyed within or at exit of prepare_io.
- void submit_io(kernel_completion* desc, internal::io_request req);
+ void submit_io(kernel_completion* desc, internal::io_request req) noexcept;
future<size_t> submit_io_read(io_queue* ioq,
const io_priority_class& priority_class,
size_t len,
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 028a2cbc..a020bf18 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -47,11 +47,6 @@ class io_desc_read_write final : public kernel_completion {
void notify_requests_finished() {
_ioq_ptr->notify_requests_finished(_fq_ticket);
}
-public:
- io_desc_read_write(io_queue* ioq, fair_queue_ticket ticket)
- : _ioq_ptr(ioq)
- , _fq_ticket(ticket)
- {}

void set_exception(std::exception_ptr eptr) {
notify_requests_finished();
@@ -59,6 +54,12 @@ class io_desc_read_write final : public kernel_completion {
delete this;
}

+public:
+ io_desc_read_write(io_queue* ioq, fair_queue_ticket ticket)
+ : _ioq_ptr(ioq)
+ , _fq_ticket(ticket)
+ {}
+
virtual void complete_with(ssize_t ret) override {
try {
engine().handle_io_result(ret);
@@ -70,6 +71,10 @@ class io_desc_read_write final : public kernel_completion {
}
}

+ virtual void complete_with_exception(std::exception_ptr eptr) override {
+ set_exception(eptr);
+ }
+
future<size_t> get_future() {
return _pr.get_future();
}
@@ -289,18 +294,13 @@ io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_re
auto desc = std::make_unique<io_desc_read_write>(this, fq_ticket);
auto fut = desc->get_future();
_fq.queue(pclass.ptr, std::move(fq_ticket), [&pclass, start, req = std::move(req), d = std::move(desc), len, this] () mutable noexcept {
- io_desc_read_write* desc = d.release();
_queued_requests--;
_requests_executing++;
- try {
- pclass.nr_queued--;
- pclass.ops++;
- pclass.bytes += len;
- pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
- engine().submit_io(desc, std::move(req));
- } catch (...) {
- desc->set_exception(std::current_exception());
- }
+ pclass.nr_queued--;
+ pclass.ops++;
+ pclass.bytes += len;
+ pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
+ engine().submit_io(d.release(), std::move(req));
});
return fut;
});
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 65655c53..8d65bf3c 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1490,9 +1490,13 @@ sstring io_request::opname() const {
}

void
-reactor::submit_io(kernel_completion* desc, io_request req) {
+reactor::submit_io(kernel_completion* desc, io_request req) noexcept {
req.attach_kernel_completion(desc);
- _pending_io.push_back(std::move(req));
+ try {
+ _pending_io.push_back(std::move(req));
+ } catch (...) {
+ desc->complete_with_exception(std::current_exception());
+ }
}

bool
@@ -1894,10 +1898,15 @@ reactor::fdatasync(int fd) noexcept {
}
}

+ virtual void complete_with_exception(std::exception_ptr eptr) override {
+ set_exception(eptr);
+ }
+
future<> get_future() {
return _pr.get_future();
}

+ private:
void set_exception(std::exception_ptr eptr) {
_pr.set_exception(eptr);
delete this;
@@ -1907,11 +1916,7 @@ reactor::fdatasync(int fd) noexcept {
auto desc = new fsync_io_desc();
auto fut = desc->get_future();
auto req = io_request::make_fdatasync(fd);
- try {
- submit_io(desc, std::move(req));
- } catch (...) {
- desc->set_exception(std::current_exception());
- }
+ submit_io(desc, std::move(req));
return fut;
}
return _thread_pool->submit<syscall_result<int>>([fd] {
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jun 25, 2020, 12:57:20 PM6/25/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
When counting the requests queued in the fair queue, the counters on
io_queue and priority_class_data are incremented first, and only then
is fair_queue::queue called. The counters are decremented in queue()'s
callback, so if queue() itself throws, the callback never runs and the
counters remain incremented.

This is not a problem for io_queue's counter, since nobody uses it,
but priority_class_data's counter is reported in stats and is
thus used.

Fix by incrementing the counters after the queue() call.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
src/core/io_queue.cc | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index a020bf18..158794f8 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -284,12 +284,9 @@ future<size_t>
io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req) noexcept {
auto start = std::chrono::steady_clock::now();
return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = this_shard_id(), this] () mutable {
- _queued_requests++;
-
// First time will hit here, and then we create the class. It is important
// that we create the shared pointer in the same shard it will be used at later.
auto& pclass = find_or_create_class(pc, owner);
- pclass.nr_queued++;
fair_queue_ticket fq_ticket = request_fq_ticket(req, len);
auto desc = std::make_unique<io_desc_read_write>(this, fq_ticket);
auto fut = desc->get_future();
@@ -302,6 +299,8 @@ io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_re
pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
engine().submit_io(d.release(), std::move(req));
});
+ pclass.nr_queued++;
+ _queued_requests++;
return fut;
});
}
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jun 25, 2020, 12:57:21 PM6/25/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
The _requests_executing counter is only ever incremented in the current
code, but since nobody uses it, this goes unnoticed. The shared fair
queue will need it (the draft already does), so keep it correct. To do
so, kill the batch processing, which saves nothing but makes the
io_queue class bigger, and notify the fq about each completed request
right away.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/fair_queue.hh | 2 ++
include/seastar/core/io_queue.hh | 3 +--
src/core/fair_queue.cc | 6 ++++++
src/core/io_queue.cc | 7 +------
tests/perf/fair_queue_perf.cc | 4 ++--
5 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
index 5f8889e1..24eb2942 100644
--- a/include/seastar/core/fair_queue.hh
+++ b/include/seastar/core/fair_queue.hh
@@ -237,6 +237,8 @@ class fair_queue {
/// Notifies that ont request finished
/// \param desc an instance of \c fair_queue_ticket structure describing the request that just finished.
void notify_requests_finished(fair_queue_ticket desc);
+ /// And the version for tests' batch processing
+ void notify_requests_finished(fair_queue_ticket desc, unsigned nr);

/// Try to execute new requests if there is capacity left in the queue.
void dispatch_requests();
diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
index 3023811f..7181dfcd 100644
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -88,7 +88,6 @@ class io_queue {

private:
priority_class_data& find_or_create_class(const io_priority_class& pc, shard_id owner);
- fair_queue_ticket _completed_accumulator = { 0, 0 };

fair_queue_ticket request_fq_ticket(const internal::io_request& req, size_t len);

@@ -143,7 +142,7 @@ class io_queue {
void notify_requests_finished(fair_queue_ticket& desc);

// Inform the underlying queue about the fact that some of our requests finished
- void process_completions();
+ void process_completions() {}

// Dispatch requests that are pending in the I/O queue
void poll_io_queue() {
diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc
index 24fa1bb7..3b18dbbc 100644
--- a/src/core/fair_queue.cc
+++ b/src/core/fair_queue.cc
@@ -156,6 +156,12 @@ void fair_queue::queue(priority_class_ptr pc, fair_queue_ticket desc, noncopyabl

void fair_queue::notify_requests_finished(fair_queue_ticket desc) {
_resources_executing -= desc;
+ _requests_executing--;
+}
+
+void fair_queue::notify_requests_finished(fair_queue_ticket desc, unsigned nr) {
+ _resources_executing -= desc;
+ _requests_executing -= nr;
}

void fair_queue::dispatch_requests() {
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 158794f8..a43b02a3 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -83,12 +83,7 @@ class io_desc_read_write final : public kernel_completion {
void
io_queue::notify_requests_finished(fair_queue_ticket& desc) {
_requests_executing--;
- _completed_accumulator += desc;
-}
-
-void
-io_queue::process_completions() {
- _fq.notify_requests_finished(std::exchange(_completed_accumulator, {}));
+ _fq.notify_requests_finished(desc);
}

fair_queue::config io_queue::make_fair_queue_config(config iocfg) {
diff --git a/tests/perf/fair_queue_perf.cc b/tests/perf/fair_queue_perf.cc
index 27a78dfb..77934de4 100644
--- a/tests/perf/fair_queue_perf.cc
+++ b/tests/perf/fair_queue_perf.cc
@@ -116,7 +116,7 @@ PERF_TEST_F(perf_fair_queue, contended_shared)
shared_fq.dispatch_requests();
uint32_t pending_ack = shared_executed - shared_acked;
shared_acked = shared_executed;
- shared_fq.notify_requests_finished(seastar::fair_queue_ticket{pending_ack, pending_ack});
+ shared_fq.notify_requests_finished(seastar::fair_queue_ticket{pending_ack, pending_ack}, pending_ack);
return make_ready_future<>();
});
return when_all_succeed(std::move(invokers), std::move(collectors)).discard_result();
@@ -140,7 +140,7 @@ PERF_TEST_F(perf_fair_queue, contended_shared_amortized)
shared_fq.dispatch_requests();
uint32_t pending_ack = shared_executed - shared_acked;
shared_acked = shared_executed;
- shared_fq.notify_requests_finished(seastar::fair_queue_ticket{pending_ack, pending_ack});
+ shared_fq.notify_requests_finished(seastar::fair_queue_ticket{pending_ack, pending_ack}, pending_ack);
return make_ready_future<>();
});
return when_all_succeed(std::move(invokers), std::move(collectors)).discard_result();
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jun 25, 2020, 12:57:23 PM6/25/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
After the previous patch this is nothing but an empty loop, so drop it.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/io_queue.hh | 3 ---
src/core/reactor.cc | 6 +-----
2 files changed, 1 insertion(+), 8 deletions(-)

diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
index 7181dfcd..c93f9bed 100644
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -141,9 +141,6 @@ class io_queue {

void notify_requests_finished(fair_queue_ticket& desc);

- // Inform the underlying queue about the fact that some of our requests finished
- void process_completions() {}
-
// Dispatch requests that are pending in the I/O queue
void poll_io_queue() {
_fq.dispatch_requests();
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 8d65bf3c..b89a71d8 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1509,11 +1509,7 @@ reactor::flush_pending_aio() {

bool
reactor::reap_kernel_completions() {
- auto reaped = _backend->reap_kernel_completions();
- for (auto& ioq : my_io_queues) {
- ioq->process_completions();
- }
- return reaped;
+ return _backend->reap_kernel_completions();
}

const io_priority_class& default_priority_class() {
--
2.20.1

Benny Halevy

<bhalevy@scylladb.com>
unread,
Jun 28, 2020, 11:52:20 AM6/28/20
to Pavel Emelyanov, seastar-dev@googlegroups.com
nit: please split re-indentation out of this patch
for review clarity.

Benny Halevy

<bhalevy@scylladb.com>
unread,
Jun 28, 2020, 11:52:58 AM6/28/20
to Pavel Emelyanov, seastar-dev@googlegroups.com
On Thu, 2020-06-25 at 19:56 +0300, Pavel Emelyanov wrote:
As a follow-up, please mark the priority_class methods noexcept.

Benny Halevy

<bhalevy@scylladb.com>
unread,
Jun 28, 2020, 11:58:14 AM6/28/20
to Pavel Emelyanov, seastar-dev@googlegroups.com
On Thu, 2020-06-25 at 19:56 +0300, Pavel Emelyanov wrote:
why are the above statements out of the try block?
They aren't noexcept, are they?

Benny Halevy

<bhalevy@scylladb.com>
unread,
Jun 28, 2020, 12:03:00 PM6/28/20
to Pavel Emelyanov, seastar-dev@googlegroups.com
On Thu, 2020-06-25 at 19:56 +0300, Pavel Emelyanov wrote:
> Now all callers of reactor::submit_io just push the descriptor and
> the request into it and forget about everything. As a side-effect
> this lets marking submit_io as noexcept.
>
> Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
> ---
> include/seastar/core/internal/io_desc.hh | 1 +
> include/seastar/core/reactor.hh | 4 ++--
> src/core/io_queue.cc | 30 ++++++++++++------------
> src/core/reactor.cc | 19 +++++++++------
> 4 files changed, 30 insertions(+), 24 deletions(-)
>
> diff --git a/include/seastar/core/internal/io_desc.hh b/include/seastar/core/internal/io_desc.hh
> index 74d1cf90..0dc2d4bc 100644
> --- a/include/seastar/core/internal/io_desc.hh
> +++ b/include/seastar/core/internal/io_desc.hh
> @@ -31,5 +31,6 @@ class kernel_completion {
> ~kernel_completion() = default;
> public:
> virtual void complete_with(ssize_t res) = 0;
> + virtual void complete_with_exception(std::exception_ptr eptr) { throw eptr; }

Do we have to implement it at the base class?
I'd rather have this pure virtual and noexcept.

> };
> }
> diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
> index c3757070..4ad116b6 100644
> --- a/include/seastar/core/reactor.hh
> +++ b/include/seastar/core/reactor.hh
> @@ -515,11 +515,11 @@ class reactor {
> future<> chmod(sstring name, file_permissions permissions) noexcept;
>
> future<int> inotify_add_watch(int fd, const sstring& path, uint32_t flags);
> -
> +

nit: unrelated cosmetic fix
should be made noexcept too

Benny Halevy

<bhalevy@scylladb.com>
unread,
Jun 28, 2020, 12:03:57 PM6/28/20
to Pavel Emelyanov, seastar-dev@googlegroups.com
ack

On Thu, 2020-06-25 at 19:56 +0300, Pavel Emelyanov wrote:

Benny Halevy

<bhalevy@scylladb.com>
unread,
Jun 28, 2020, 12:07:25 PM6/28/20
to Pavel Emelyanov, seastar-dev@googlegroups.com
On Thu, 2020-06-25 at 19:57 +0300, Pavel Emelyanov wrote:
> The counter is inc-only in current code, but since nobody uses it, it
> comes unnoticed. Shared fair queue will need it (in draft it needs) so
> keep it correct. For this kill the batch processing, as it saves nothing
> but keeps the io_queue class bigger, and notify the fq about completed
> request right at once.
>
> Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
> ---
> include/seastar/core/fair_queue.hh | 2 ++
> include/seastar/core/io_queue.hh | 3 +--
> src/core/fair_queue.cc | 6 ++++++
> src/core/io_queue.cc | 7 +------
> tests/perf/fair_queue_perf.cc | 4 ++--
> 5 files changed, 12 insertions(+), 10 deletions(-)
>
> diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
> index 5f8889e1..24eb2942 100644
> --- a/include/seastar/core/fair_queue.hh
> +++ b/include/seastar/core/fair_queue.hh
> @@ -237,6 +237,8 @@ class fair_queue {
> /// Notifies that ont request finished
> /// \param desc an instance of \c fair_queue_ticket structure describing the request that just finished.
> void notify_requests_finished(fair_queue_ticket desc);
> + /// And the version for tests' batch processing
> + void notify_requests_finished(fair_queue_ticket desc, unsigned nr);

why not just add an optional parameter, like this?
notify_requests_finished(fair_queue_ticket desc, unsigned nr = 1);

Benny Halevy

<bhalevy@scylladb.com>
unread,
Jun 28, 2020, 12:07:55 PM6/28/20
to Pavel Emelyanov, seastar-dev@googlegroups.com
nice :)

On Thu, 2020-06-25 at 19:57 +0300, Pavel Emelyanov wrote:

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jun 29, 2020, 9:14:30 AM6/29/20
to Benny Halevy, seastar-dev@googlegroups.com


On 28.06.2020 18:52, Benny Halevy wrote:
> nit: please split re-indentation out of this patch
> for review clarity.

Actually this whole patch is 25 lines of indentation fix plus 1 line of moving
the "try {" below.

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jun 29, 2020, 9:17:04 AM6/29/20
to Benny Halevy, seastar-dev@googlegroups.com
Ouch, the new fsync_io_desc() isn't (the rest is de-facto noexcept).

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jun 29, 2020, 9:20:10 AM6/29/20
to Benny Halevy, seastar-dev@googlegroups.com


On 28.06.2020 19:02, Benny Halevy wrote:
> On Thu, 2020-06-25 at 19:56 +0300, Pavel Emelyanov wrote:
>> Now all callers of reactor::submit_io just push the descriptor and
>> the request into it and forget about everything. As a side-effect
>> this lets marking submit_io as noexcept.
>>
>> Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
>> ---
>> include/seastar/core/internal/io_desc.hh | 1 +
>> include/seastar/core/reactor.hh | 4 ++--
>> src/core/io_queue.cc | 30 ++++++++++++------------
>> src/core/reactor.cc | 19 +++++++++------
>> 4 files changed, 30 insertions(+), 24 deletions(-)
>>
>> diff --git a/include/seastar/core/internal/io_desc.hh b/include/seastar/core/internal/io_desc.hh
>> index 74d1cf90..0dc2d4bc 100644
>> --- a/include/seastar/core/internal/io_desc.hh
>> +++ b/include/seastar/core/internal/io_desc.hh
>> @@ -31,5 +31,6 @@ class kernel_completion {
>> ~kernel_completion() = default;
>> public:
>> virtual void complete_with(ssize_t res) = 0;
>> + virtual void complete_with_exception(std::exception_ptr eptr) { throw eptr; }
>
> Do we have to implement it at the base class?

There are a few more classes that inherit from it and don't deal with exceptional
completion at all -- fd_kernel_completion and pollable_fd_state_completion -- and both
would need an identical (and pointless) implementation of it.

-- Pavel

Benny Halevy

<bhalevy@scylladb.com>
unread,
Jun 30, 2020, 5:07:52 AM6/30/20
to Pavel Emelyanov, seastar-dev@googlegroups.com
ok, fair enough.

Avi Kivity

<avi@scylladb.com>
unread,
Jun 30, 2020, 5:16:49 AM6/30/20
to Pavel Emelyanov, Benny Halevy, seastar-dev@googlegroups.com

On 29/06/2020 16.14, Pavel Emelyanov wrote:
>
>
> On 28.06.2020 18:52, Benny Halevy wrote:
>> nit: please split re-indentation out of this patch
>> for review clarity.
>
> Actually this whole patch is 25 lines of indentation fix plus 1 line
> of moving
> the "try {" below.
>

This patch basically is a re-indentation patch. I agree it does not need
to be split.

Avi Kivity

<avi@scylladb.com>
unread,
Jun 30, 2020, 5:21:02 AM6/30/20
to Pavel Emelyanov, seastar-dev@googlegroups.com
The original code looks safer.


> - submit_io(desc.release(), std::move(req));
> - return fut;
> + submit_io(desc, std::move(req));
> } catch (...) {
> - return make_exception_future<>(std::current_exception());
> + desc->set_exception(std::current_exception());


What if we failed to allocate desc?

Avi Kivity

<avi@scylladb.com>
unread,
Jun 30, 2020, 5:33:15 AM6/30/20
to Pavel Emelyanov, seastar-dev@googlegroups.com

On 25/06/2020 19.56, Pavel Emelyanov wrote:
> Now all callers of reactor::submit_io just push the descriptor and
> the request into it and forget about everything. As a side-effect
> this lets marking submit_io as noexcept.
>
> Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
> ---
> include/seastar/core/internal/io_desc.hh | 1 +
> include/seastar/core/reactor.hh | 4 ++--
> src/core/io_queue.cc | 30 ++++++++++++------------
> src/core/reactor.cc | 19 +++++++++------
> 4 files changed, 30 insertions(+), 24 deletions(-)
>
> diff --git a/include/seastar/core/internal/io_desc.hh b/include/seastar/core/internal/io_desc.hh
> index 74d1cf90..0dc2d4bc 100644
> --- a/include/seastar/core/internal/io_desc.hh
> +++ b/include/seastar/core/internal/io_desc.hh
> @@ -31,5 +31,6 @@ class kernel_completion {
> ~kernel_completion() = default;
> public:
> virtual void complete_with(ssize_t res) = 0;
> + virtual void complete_with_exception(std::exception_ptr eptr) { throw eptr; }


This is surprising, complete_with() passes the exception onward. The
completion site wants to get rid of the exception, not get it back.


To avoid duplication, we should make complete_with() accept a size_t and
adjust all callers to extract the exception. Otherwise both
complete_with and complete_with_exception() will have code paths for the
exceptional case. We can add a non-virtual
complete_with_system_error(int) that accepts an errno, constructs a
system_error, and calls complete_with_exception() (so
complete_with(-EBADF) becomes complete_with_system_error(EBADF)).
How does this work? If not overridden, complete_with_exception() will
throw the exception right back.


> }
>
> bool
> @@ -1894,10 +1898,15 @@ reactor::fdatasync(int fd) noexcept {
> }
> }
>
> + virtual void complete_with_exception(std::exception_ptr eptr) override {
> + set_exception(eptr);


std::move(), not that it matters greatly

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jun 30, 2020, 5:40:39 AM6/30/20
to Avi Kivity, seastar-dev@googlegroups.com


On 30.06.2020 12:33, Avi Kivity wrote:
>
> On 25/06/2020 19.56, Pavel Emelyanov wrote:
>> Now all callers of reactor::submit_io just push the descriptor and
>> the request into it and forget about everything. As a side-effect
>> this lets marking submit_io as noexcept.
>>
>> Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
>> ---
>>   include/seastar/core/internal/io_desc.hh |  1 +
>>   include/seastar/core/reactor.hh          |  4 ++--
>>   src/core/io_queue.cc                     | 30 ++++++++++++------------
>>   src/core/reactor.cc                      | 19 +++++++++------
>>   4 files changed, 30 insertions(+), 24 deletions(-)
>>
>> diff --git a/include/seastar/core/internal/io_desc.hh b/include/seastar/core/internal/io_desc.hh
>> index 74d1cf90..0dc2d4bc 100644
>> --- a/include/seastar/core/internal/io_desc.hh
>> +++ b/include/seastar/core/internal/io_desc.hh
>> @@ -31,5 +31,6 @@ class kernel_completion {
>>       ~kernel_completion() = default;
>>   public:
>>       virtual void complete_with(ssize_t res) = 0;
>> +    virtual void complete_with_exception(std::exception_ptr eptr) { throw eptr; }
>
>
> This is surprising, complete_with() passes the exception onward. The completion site wants to get rid of the exception, not get it back.
>
>
> To avoid duplication, we should make complete_with() accept a size_t and adjust all callers to extract the exception. Otherwise both complete_with and complete_with_exception() will have code paths for the exceptional case. We can add a non-virtual complete_with_system_error(int) that accepts an errno, constructs an system_error, and calls complete_with_exception() (so complete_with(-EBADF) becomes complete_with_system_error(EBADF)).

Agree. Will reshuffle it this way.
The thing is that only io_desc_read_write and fsync_io_desc get here and both have
complete_with_exception overridden.

-- Pavel

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jun 30, 2020, 5:41:28 AM6/30/20
to Avi Kivity, seastar-dev@googlegroups.com
Yes, I missed that, will rework.

Avi Kivity

<avi@scylladb.com>
unread,
Jun 30, 2020, 6:08:38 AM6/30/20
to Pavel Emelyanov, seastar-dev@googlegroups.com
It's a bug waiting to happen.


Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jul 1, 2020, 8:52:04 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
They are all trivially non-throwing, including the constructor.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/fair_queue.hh | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
index 6d9d30da..c7b12589 100644
--- a/include/seastar/core/fair_queue.hh
+++ b/include/seastar/core/fair_queue.hh
@@ -104,14 +104,14 @@ class priority_class {
bool _queued = false;

friend struct shared_ptr_no_esft<priority_class>;
- explicit priority_class(uint32_t shares) : _shares(std::max(shares, 1u)) {}
+ explicit priority_class(uint32_t shares) noexcept : _shares(std::max(shares, 1u)) {}

- void update_shares(uint32_t shares) {
+ void update_shares(uint32_t shares) noexcept {
_shares = (std::max(shares, 1u));
}
public:
/// \brief return the current amount of shares for this priority class
- uint32_t shares() const {
+ uint32_t shares() const noexcept {
return _shares;
}
};
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jul 1, 2020, 8:52:04 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
The set includes preparations and two fixes for accounting found while
making shared fair-queue prototype and preparing tests for what it can
improve.

Branch: https://github.com/xemul/seastar/tree/br-io-queue-fixlets-2-2
Tests: unit(debug), io_tester

update in v2:
- mark related stuff with noexcept
- use default fn argument instead of overloading
- rework submit_io completions handling

Pavel Emelyanov (13):
priority_class: Mark methods noexcept
queues: Mark notify_requests_finished noexcept
fair_queue: Remove one level of indirection when updating class shares
io_queue: Turn _priority_classes into unique_ptr-s
io_queue: Localize fq_ticket evaluation for request in io_queue method
io_queue: Fix discrepancy of queuer reqs vs their number
fair_queue: Fix _requests_executing accounting
io_queue: Remove process_completions
reactor: Introduce io_completion
reactor: Handle submission exception itself
io_completion: Handle complete_with() itself
reactor, io_queue: Do not try-catch submit_io
reactor, io_queue: Fix indentation after previous patch

include/seastar/core/fair_queue.hh | 18 +++---
include/seastar/core/io_queue.hh | 11 ++--
include/seastar/core/reactor.hh | 19 +++---
src/core/fair_queue.cc | 7 +--
src/core/io_queue.cc | 93 +++++++++++++-----------------
src/core/reactor.cc | 73 +++++++++++++----------
tests/perf/fair_queue_perf.cc | 4 +-
tests/unit/fair_queue_test.cc | 2 +-
8 files changed, 108 insertions(+), 119 deletions(-)

--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jul 1, 2020, 8:52:06 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
In both io_queue and fair_queue this method is trivially non-throwing.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/fair_queue.hh | 2 +-
include/seastar/core/io_queue.hh | 2 +-
src/core/fair_queue.cc | 2 +-
src/core/io_queue.cc | 4 ++--
4 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
index c7b12589..449c70bc 100644
--- a/include/seastar/core/fair_queue.hh
+++ b/include/seastar/core/fair_queue.hh
@@ -236,7 +236,7 @@ class fair_queue {

/// Notifies that ont request finished
/// \param desc an instance of \c fair_queue_ticket structure describing the request that just finished.
- void notify_requests_finished(fair_queue_ticket desc);
+ void notify_requests_finished(fair_queue_ticket desc) noexcept;

/// Try to execute new requests if there is capacity left in the queue.
void dispatch_requests();
diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
index e4f8cc8f..1347df3b 100644
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -139,7 +139,7 @@ class io_queue {
return _requests_executing;
}

- void notify_requests_finished(fair_queue_ticket& desc);
+ void notify_requests_finished(fair_queue_ticket& desc) noexcept;

// Inform the underlying queue about the fact that some of our requests finished
void process_completions();
diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc
index 45cf9425..3dc215b8 100644
--- a/src/core/fair_queue.cc
+++ b/src/core/fair_queue.cc
@@ -154,7 +154,7 @@ void fair_queue::queue(priority_class_ptr pc, fair_queue_ticket desc, noncopyabl
_requests_queued++;
}

-void fair_queue::notify_requests_finished(fair_queue_ticket desc) {
+void fair_queue::notify_requests_finished(fair_queue_ticket desc) noexcept {
_resources_executing -= desc;
}

diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 27f77145..e94cffef 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -44,7 +44,7 @@ class io_desc_read_write final : public kernel_completion {
fair_queue_ticket _fq_ticket;
promise<size_t> _pr;
private:
- void notify_requests_finished() {
+ void notify_requests_finished() noexcept {
_ioq_ptr->notify_requests_finished(_fq_ticket);
}
public:
@@ -80,7 +80,7 @@ class io_desc_read_write final : public kernel_completion {
};

void
-io_queue::notify_requests_finished(fair_queue_ticket& desc) {
+io_queue::notify_requests_finished(fair_queue_ticket& desc) noexcept {
_requests_executing--;
_completed_accumulator += desc;
}
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jul 1, 2020, 8:52:07 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
Just to avoid one more do-nothing helper method on fair_queue.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/fair_queue.hh | 12 ++++--------
src/core/fair_queue.cc | 4 ----
src/core/io_queue.cc | 2 +-
tests/unit/fair_queue_test.cc | 2 +-
4 files changed, 6 insertions(+), 14 deletions(-)

diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
index 449c70bc..1cdc2108 100644
--- a/include/seastar/core/fair_queue.hh
+++ b/include/seastar/core/fair_queue.hh
@@ -106,14 +106,15 @@ class priority_class {
friend struct shared_ptr_no_esft<priority_class>;
explicit priority_class(uint32_t shares) noexcept : _shares(std::max(shares, 1u)) {}

- void update_shares(uint32_t shares) noexcept {
- _shares = (std::max(shares, 1u));
- }
public:
/// \brief return the current amount of shares for this priority class
uint32_t shares() const noexcept {
return _shares;
}
+
+ void update_shares(uint32_t shares) noexcept {
+ _shares = (std::max(shares, 1u));
+ }
};
/// \endcond

@@ -240,11 +241,6 @@ class fair_queue {

/// Try to execute new requests if there is capacity left in the queue.
void dispatch_requests();
-
- /// Updates the current shares of this priority class
- ///
- /// \param new_shares the new number of shares for this priority class
- static void update_shares(priority_class_ptr pc, uint32_t new_shares);
};
/// @}

diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc
index 3dc215b8..2c09dc96 100644
--- a/src/core/fair_queue.cc
+++ b/src/core/fair_queue.cc
@@ -192,8 +192,4 @@ void fair_queue::dispatch_requests() {
}
}

-void fair_queue::update_shares(priority_class_ptr pc, uint32_t new_shares) {
- pc->update_shares(new_shares);
-}
-
}
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index e94cffef..a40804a0 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -309,7 +309,7 @@ future<>
io_queue::update_shares_for_class(const io_priority_class pc, size_t new_shares) {
return smp::submit_to(coordinator(), [this, pc, owner = this_shard_id(), new_shares] {
auto& pclass = find_or_create_class(pc, owner);

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jul 1, 2020, 8:52:08 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
They used to be shared for real in the past, but nowadays these are shard-local
objects that are accessed via references, so a shared pointer is excessive here.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/io_queue.hh | 3 +--
src/core/io_queue.cc | 4 ++--
2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
index 1347df3b..aff90350 100644
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -24,7 +24,6 @@
#include <seastar/core/sstring.hh>
#include <seastar/core/fair_queue.hh>
#include <seastar/core/metrics_registration.hh>
-#include <seastar/core/shared_ptr.hh>
#include <seastar/core/future.hh>
#include <seastar/core/internal/io_request.hh>
#include <mutex>
@@ -75,7 +74,7 @@ class io_queue {
void register_stats(sstring name, sstring mountpoint, shard_id owner);
};

- std::vector<std::vector<lw_shared_ptr<priority_class_data>>> _priority_classes;
+ std::vector<std::vector<std::unique_ptr<priority_class_data>>> _priority_classes;
fair_queue _fq;

static constexpr unsigned _max_classes = 2048;
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index a40804a0..0187c7cf 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -256,9 +256,9 @@ io_queue::priority_class_data& io_queue::find_or_create_class(const io_priority_
// This conveys all the information we need and allows one to easily group all classes from
// the same I/O queue (by filtering by shard)
auto pc_ptr = _fq.register_priority_class(shares);
- auto pc_data = make_lw_shared<priority_class_data>(name, mountpoint(), pc_ptr, owner);
+ auto pc_data = std::make_unique<priority_class_data>(name, mountpoint(), pc_ptr, owner);

- _priority_classes[owner][id] = pc_data;
+ _priority_classes[owner][id] = std::move(pc_data);
}
return *_priority_classes[owner][id];
}
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jul 1, 2020, 8:52:10 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
This makes it obvious and relaxes the io_desc_read_write interface.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/io_queue.hh | 2 ++
src/core/io_queue.cc | 39 ++++++++++++++++----------------
2 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
index aff90350..4e295011 100644
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -90,6 +90,8 @@ class io_queue {
priority_class_data& find_or_create_class(const io_priority_class& pc, shard_id owner);
fair_queue_ticket _completed_accumulator = { 0, 0 };

+ fair_queue_ticket request_fq_ticket(const internal::io_request& req, size_t len) const;
+
// The fields below are going away, they are just here so we can implement deprecated
// functions that used to be provided by the fair_queue and are going away (from both
// the fair_queue and the io_queue). Double-accounting for now will allow for easier
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 0187c7cf..86bcf195 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -48,15 +48,11 @@ class io_desc_read_write final : public kernel_completion {
_ioq_ptr->notify_requests_finished(_fq_ticket);
}
public:
- io_desc_read_write(io_queue* ioq, unsigned weight, unsigned size)
+ io_desc_read_write(io_queue* ioq, fair_queue_ticket ticket)
: _ioq_ptr(ioq)
- , _fq_ticket(fair_queue_ticket{weight, size})
+ , _fq_ticket(ticket)
{}

- fair_queue_ticket& fq_ticket() {
- return _fq_ticket;
- }
-
void set_exception(std::exception_ptr eptr) {
notify_requests_finished();
_pr.set_exception(eptr);
@@ -263,6 +259,22 @@ io_queue::priority_class_data& io_queue::find_or_create_class(const io_priority_
return *_priority_classes[owner][id];
}

+fair_queue_ticket io_queue::request_fq_ticket(const internal::io_request& req, size_t len) const {
+ unsigned weight;
+ size_t size;
+ if (req.is_write()) {
+ weight = _config.disk_req_write_to_read_multiplier;
+ size = _config.disk_bytes_write_to_read_multiplier * len;
+ } else if (req.is_read()) {
+ weight = io_queue::read_request_base_count;
+ size = io_queue::read_request_base_count * len;
+ } else {
+ throw std::runtime_error(fmt::format("Unrecognized request passing through I/O queue {}", req.opname()));
+ }
+
+ return fair_queue_ticket(weight, size);
+}
+
future<size_t>
io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req) noexcept {
auto start = std::chrono::steady_clock::now();
@@ -273,19 +285,8 @@ io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_re
// that we create the shared pointer in the same shard it will be used at later.
auto& pclass = find_or_create_class(pc, owner);
pclass.nr_queued++;
- unsigned weight;
- size_t size;
- if (req.is_write()) {
- weight = _config.disk_req_write_to_read_multiplier;
- size = _config.disk_bytes_write_to_read_multiplier * len;
- } else if (req.is_read()) {
- weight = io_queue::read_request_base_count;
- size = io_queue::read_request_base_count * len;
- } else {
- throw std::runtime_error(fmt::format("Unrecognized request passing through I/O queue {}", req.opname()));
- }
- auto desc = std::make_unique<io_desc_read_write>(this, weight, size);
- auto fq_ticket = desc->fq_ticket();
+ fair_queue_ticket fq_ticket = request_fq_ticket(req, len);
+ auto desc = std::make_unique<io_desc_read_write>(this, fq_ticket);
auto fut = desc->get_future();
_fq.queue(pclass.ptr, std::move(fq_ticket), [&pclass, start, req = std::move(req), d = std::move(desc), len, this] () mutable noexcept {
io_desc_read_write* desc = d.release();
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jul 1, 2020, 8:52:11 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
When counting the requests queued in the fair queue, the counters on
io_queue and priority_class_data are incremented first, and then
fair_queue::queue() is called. The counters are decremented in
queue()'s callback, but since queue() itself may throw, the counters
may remain incremented.

This is not a problem for the io_queue counter, since nobody uses it,
but the priority_class_data counter is reported in the stats and
thus matters.

Fix by incrementing the counters after the queue() call.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
src/core/io_queue.cc | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 86bcf195..308f481c 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -279,12 +279,9 @@ future<size_t>
io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req) noexcept {
auto start = std::chrono::steady_clock::now();
return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = this_shard_id(), this] () mutable {
- _queued_requests++;
-
// First time will hit here, and then we create the class. It is important
// that we create the shared pointer in the same shard it will be used at later.
auto& pclass = find_or_create_class(pc, owner);
- pclass.nr_queued++;
fair_queue_ticket fq_ticket = request_fq_ticket(req, len);
auto desc = std::make_unique<io_desc_read_write>(this, fq_ticket);
auto fut = desc->get_future();
@@ -302,6 +299,8 @@ io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_re
desc->set_exception(std::current_exception());

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jul 1, 2020, 8:52:12 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
The counter is only ever incremented in the current code, but since
nobody uses it, this goes unnoticed. The shared fair queue will need it
(the draft implementation already does), so keep it correct. To do so,
drop the batch processing -- it saves nothing but makes the io_queue
class bigger -- and notify the fair queue about each completed request
right away.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/fair_queue.hh | 2 +-
include/seastar/core/io_queue.hh | 3 +--
src/core/fair_queue.cc | 3 ++-
src/core/io_queue.cc | 7 +------
tests/perf/fair_queue_perf.cc | 4 ++--
5 files changed, 7 insertions(+), 12 deletions(-)

diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
index 1cdc2108..a5db816d 100644
--- a/include/seastar/core/fair_queue.hh
+++ b/include/seastar/core/fair_queue.hh
@@ -237,7 +237,7 @@ class fair_queue {

/// Notifies that ont request finished
/// \param desc an instance of \c fair_queue_ticket structure describing the request that just finished.
- void notify_requests_finished(fair_queue_ticket desc) noexcept;
+ void notify_requests_finished(fair_queue_ticket desc, unsigned nr = 1) noexcept;

/// Try to execute new requests if there is capacity left in the queue.
void dispatch_requests();
diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
index 4e295011..59b4825f 100644
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -88,7 +88,6 @@ class io_queue {

private:
priority_class_data& find_or_create_class(const io_priority_class& pc, shard_id owner);
- fair_queue_ticket _completed_accumulator = { 0, 0 };

fair_queue_ticket request_fq_ticket(const internal::io_request& req, size_t len) const;

@@ -143,7 +142,7 @@ class io_queue {
void notify_requests_finished(fair_queue_ticket& desc) noexcept;

// Inform the underlying queue about the fact that some of our requests finished
- void process_completions();
+ void process_completions() {}

// Dispatch requests that are pending in the I/O queue
void poll_io_queue() {
diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc
index 2c09dc96..226a61e1 100644
--- a/src/core/fair_queue.cc
+++ b/src/core/fair_queue.cc
@@ -154,8 +154,9 @@ void fair_queue::queue(priority_class_ptr pc, fair_queue_ticket desc, noncopyabl
_requests_queued++;
}

-void fair_queue::notify_requests_finished(fair_queue_ticket desc) noexcept {
+void fair_queue::notify_requests_finished(fair_queue_ticket desc, unsigned nr) noexcept {
_resources_executing -= desc;
+ _requests_executing -= nr;
}

void fair_queue::dispatch_requests() {
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 308f481c..bc12e03d 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -78,12 +78,7 @@ class io_desc_read_write final : public kernel_completion {
void
io_queue::notify_requests_finished(fair_queue_ticket& desc) noexcept {

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jul 1, 2020, 8:52:13 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov, Benny Halevy
After the previous patch this is nothing but an empty loop, so drop it.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
Marked-nice-by: Benny Halevy <bha...@scylladb.com>
---
include/seastar/core/io_queue.hh | 3 ---
src/core/reactor.cc | 6 +-----
2 files changed, 1 insertion(+), 8 deletions(-)

diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
index 59b4825f..0e3a44c0 100644
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -141,9 +141,6 @@ class io_queue {

void notify_requests_finished(fair_queue_ticket& desc) noexcept;

- // Inform the underlying queue about the fact that some of our requests finished
- void process_completions() {}
-
// Dispatch requests that are pending in the I/O queue
void poll_io_queue() {
_fq.dispatch_requests();
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 832a58c8..5c3bdf13 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1505,11 +1505,7 @@ reactor::flush_pending_aio() {

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jul 1, 2020, 8:52:14 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
The goal is to make reactor::submit_io do all the request completion
processing (and mark it noexcept). For this it will need a more
sophisticated completion API than kernel_completion provides, so
here's a placeholder for it.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/reactor.hh | 5 ++++-
src/core/io_queue.cc | 2 +-
src/core/reactor.cc | 4 ++--
3 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index 9387a640..d2c686ef 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -176,6 +176,9 @@ class kernel_completion;
class io_queue;
class disk_config_params;

+class io_completion : public kernel_completion {
+};
+
class reactor {
using sched_clock = std::chrono::steady_clock;
private:
@@ -519,7 +522,7 @@ class reactor {
// In the following three methods, prepare_io is not guaranteed to execute in the same processor
// in which it was generated. Therefore, care must be taken to avoid the use of objects that could
// be destroyed within or at exit of prepare_io.
- void submit_io(kernel_completion* desc, internal::io_request req);
+ void submit_io(io_completion* desc, internal::io_request req);
future<size_t> submit_io_read(io_queue* ioq,
const io_priority_class& priority_class,
size_t len,
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index bc12e03d..7e583528 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -39,7 +39,7 @@ namespace seastar {
using namespace std::chrono_literals;
using namespace internal::linux_abi;

-class io_desc_read_write final : public kernel_completion {
+class io_desc_read_write final : public io_completion {
io_queue* _ioq_ptr;
fair_queue_ticket _fq_ticket;
promise<size_t> _pr;
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 5c3bdf13..fb6f9033 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1490,7 +1490,7 @@ sstring io_request::opname() const {
}

void
-reactor::submit_io(kernel_completion* desc, io_request req) {
+reactor::submit_io(io_completion* desc, io_request req) {
req.attach_kernel_completion(desc);
_pending_io.push_back(std::move(req));
}
@@ -1878,7 +1878,7 @@ reactor::fdatasync(int fd) noexcept {
if (_have_aio_fsync) {
try {
// Does not go through the I/O queue, but has to be deleted
- struct fsync_io_desc final : public kernel_completion {
+ struct fsync_io_desc final : public io_completion {
promise<> _pr;
public:
virtual void complete_with(ssize_t res) {
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jul 1, 2020, 8:52:15 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
The IO request may be completed with an exception during the
submission itself, so handle it there, and mark submit_io
noexcept.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/reactor.hh | 4 +++-
src/core/io_queue.cc | 2 +-
src/core/reactor.cc | 15 ++++++++++++---
3 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index d2c686ef..6f1e433e 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -177,6 +177,8 @@ class io_queue;
class disk_config_params;

class io_completion : public kernel_completion {
+public:
+ virtual void set_exception(std::exception_ptr eptr) noexcept = 0;
};

class reactor {
@@ -522,7 +524,7 @@ class reactor {
// In the following three methods, prepare_io is not guaranteed to execute in the same processor
// in which it was generated. Therefore, care must be taken to avoid the use of objects that could
// be destroyed within or at exit of prepare_io.
- void submit_io(io_completion* desc, internal::io_request req);
+ void submit_io(io_completion* desc, internal::io_request req) noexcept;
future<size_t> submit_io_read(io_queue* ioq,
const io_priority_class& priority_class,
size_t len,
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 7e583528..a13b8bec 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -53,7 +53,7 @@ class io_desc_read_write final : public io_completion {
, _fq_ticket(ticket)
{}

- void set_exception(std::exception_ptr eptr) {
+ virtual void set_exception(std::exception_ptr eptr) noexcept override {
notify_requests_finished();
_pr.set_exception(eptr);
delete this;
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index fb6f9033..f65f33a4 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1490,9 +1490,13 @@ sstring io_request::opname() const {
}

void
-reactor::submit_io(io_completion* desc, io_request req) {
+reactor::submit_io(io_completion* desc, io_request req) noexcept {
req.attach_kernel_completion(desc);
- _pending_io.push_back(std::move(req));
+ try {
+ _pending_io.push_back(std::move(req));
+ } catch (...) {
+ desc->set_exception(std::current_exception());
+ }
}

bool
@@ -1885,9 +1889,14 @@ reactor::fdatasync(int fd) noexcept {
try {
engine().handle_io_result(res);
_pr.set_value();
+ delete this;
} catch (...) {
- _pr.set_exception(std::current_exception());
+ set_exception(std::current_exception());
}
+ }
+
+ virtual void set_exception(std::exception_ptr eptr) noexcept override {
+ _pr.set_exception(std::move(eptr));
delete this;
}

--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jul 1, 2020, 8:52:16 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
The handling of aio completion can now be generalized across io_completion
implementations with the help of the .complete(size_t) method.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/reactor.hh | 12 +++++-------
src/core/io_queue.cc | 13 ++++---------
src/core/reactor.cc | 25 +++++++++++++++++--------
3 files changed, 26 insertions(+), 24 deletions(-)

diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index 6f1e433e..113c5be1 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -178,6 +178,9 @@ class disk_config_params;

class io_completion : public kernel_completion {
public:
+ virtual void complete_with(ssize_t res) final override;
+
+ virtual void complete(size_t res) noexcept = 0;
virtual void set_exception(std::exception_ptr eptr) noexcept = 0;
};

@@ -240,6 +243,8 @@ class reactor {
uint64_t fstream_read_aheads_discarded = 0;
uint64_t fstream_read_ahead_discarded_bytes = 0;
};
+ friend void io_completion::complete_with(ssize_t);
+
private:
reactor_config _cfg;
file_desc _notify_eventfd;
@@ -534,13 +539,6 @@ class reactor {
size_t len,
internal::io_request req) noexcept;

- inline void handle_io_result(ssize_t res) {
- if (res < 0) {
- ++_io_stats.aio_errors;
- throw_kernel_error(res);
- }
- }
-
int run();
void exit(int ret);
future<> when_started() { return _start_promise.get_future(); }
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index a13b8bec..877d7b11 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -59,15 +59,10 @@ class io_desc_read_write final : public io_completion {
delete this;
}

- virtual void complete_with(ssize_t ret) override {
- try {
- engine().handle_io_result(ret);
- notify_requests_finished();
- _pr.set_value(ret);
- delete this;
- } catch (...) {
- set_exception(std::current_exception());
- }
+ virtual void complete(size_t res) noexcept override {
+ notify_requests_finished();
+ _pr.set_value(res);
+ delete this;
}

future<size_t> get_future() {
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index f65f33a4..166e7916 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1489,6 +1489,20 @@ sstring io_request::opname() const {
std::abort();
}

+void io_completion::complete_with(ssize_t res) {
+ if (res >= 0) {
+ complete(res);
+ return;
+ }
+
+ ++engine()._io_stats.aio_errors;
+ try {
+ throw_kernel_error(res);
+ } catch (...) {
+ set_exception(std::current_exception());
+ }
+}
+
void
reactor::submit_io(io_completion* desc, io_request req) noexcept {
req.attach_kernel_completion(desc);
@@ -1885,14 +1899,9 @@ reactor::fdatasync(int fd) noexcept {
struct fsync_io_desc final : public io_completion {
promise<> _pr;
public:
- virtual void complete_with(ssize_t res) {
- try {
- engine().handle_io_result(res);
- _pr.set_value();
- delete this;
- } catch (...) {
- set_exception(std::current_exception());
- }
+ virtual void complete(size_t res) noexcept override {
+ _pr.set_value();
+ delete this;
}

virtual void set_exception(std::exception_ptr eptr) noexcept override {
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jul 1, 2020, 8:52:18 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
The latter is already noexcept, so there is no need for extra catching around it.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
src/core/io_queue.cc | 7 +------
src/core/reactor.cc | 11 ++++-------
2 files changed, 5 insertions(+), 13 deletions(-)

diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 877d7b11..0db1db1d 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -276,18 +276,13 @@ io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_re
auto desc = std::make_unique<io_desc_read_write>(this, fq_ticket);
auto fut = desc->get_future();
_fq.queue(pclass.ptr, std::move(fq_ticket), [&pclass, start, req = std::move(req), d = std::move(desc), len, this] () mutable noexcept {
- io_desc_read_write* desc = d.release();
_queued_requests--;
_requests_executing++;
- try {
pclass.nr_queued--;
pclass.ops++;
pclass.bytes += len;
pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
- engine().submit_io(desc, std::move(req));
- } catch (...) {
- desc->set_exception(std::current_exception());
- }
+ engine().submit_io(d.release(), std::move(req));
});
pclass.nr_queued++;
_queued_requests++;
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 166e7916..703bc319 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1894,7 +1894,6 @@ reactor::fdatasync(int fd) noexcept {
return make_ready_future<>();
}
if (_have_aio_fsync) {
- try {
// Does not go through the I/O queue, but has to be deleted
struct fsync_io_desc final : public io_completion {
promise<> _pr;
@@ -1914,15 +1913,13 @@ reactor::fdatasync(int fd) noexcept {
}
};

- auto desc = std::make_unique<fsync_io_desc>();
+ return futurize_invoke([this, fd] {
+ auto desc = new fsync_io_desc;
auto fut = desc->get_future();
-
auto req = io_request::make_fdatasync(fd);
- submit_io(desc.release(), std::move(req));
+ submit_io(desc, std::move(req));
return fut;
- } catch (...) {
- return make_exception_future<>(std::current_exception());
- }
+ });
}
return _thread_pool->submit<syscall_result<int>>([fd] {
return wrap_syscall<int>(::fdatasync(fd));
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Jul 1, 2020, 8:52:19 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
Just shifting the code left, no functional changes.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
src/core/io_queue.cc | 10 +++++-----
src/core/reactor.cc | 32 ++++++++++++++++----------------
2 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 0db1db1d..019a23f5 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -278,11 +278,11 @@ io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_re
_fq.queue(pclass.ptr, std::move(fq_ticket), [&pclass, start, req = std::move(req), d = std::move(desc), len, this] () mutable noexcept {
_queued_requests--;
_requests_executing++;
- pclass.nr_queued--;
- pclass.ops++;
- pclass.bytes += len;
- pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
- engine().submit_io(d.release(), std::move(req));
+ pclass.nr_queued--;
+ pclass.ops++;
+ pclass.bytes += len;
+ pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
+ engine().submit_io(d.release(), std::move(req));
});
pclass.nr_queued++;
_queued_requests++;
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 703bc319..f46515fa 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1894,24 +1894,24 @@ reactor::fdatasync(int fd) noexcept {
return make_ready_future<>();
}
if (_have_aio_fsync) {
- // Does not go through the I/O queue, but has to be deleted
- struct fsync_io_desc final : public io_completion {
- promise<> _pr;
- public:
- virtual void complete(size_t res) noexcept override {
- _pr.set_value();
- delete this;
- }
+ // Does not go through the I/O queue, but has to be deleted
+ struct fsync_io_desc final : public io_completion {
+ promise<> _pr;
+ public:
+ virtual void complete(size_t res) noexcept override {
+ _pr.set_value();
+ delete this;
+ }

- virtual void set_exception(std::exception_ptr eptr) noexcept override {
- _pr.set_exception(std::move(eptr));
- delete this;
- }
+ virtual void set_exception(std::exception_ptr eptr) noexcept override {
+ _pr.set_exception(std::move(eptr));
+ delete this;
+ }

- future<> get_future() {
- return _pr.get_future();
- }
- };
+ future<> get_future() {
+ return _pr.get_future();
+ }
+ };

return futurize_invoke([this, fd] {
auto desc = new fsync_io_desc;
--
2.20.1

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 1, 2020, 9:01:54 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

queues: Mark notify_requests_finished noexcept

In both io_queue and fair_queue this method is trivially non-throwing.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
--- a/include/seastar/core/fair_queue.hh
+++ b/include/seastar/core/fair_queue.hh
@@ -236,7 +236,7 @@ public:

/// Notifies that ont request finished
/// \param desc an instance of \c fair_queue_ticket structure describing the request that just finished.
- void notify_requests_finished(fair_queue_ticket desc);
+ void notify_requests_finished(fair_queue_ticket desc) noexcept;

/// Try to execute new requests if there is capacity left in the queue.
void dispatch_requests();
diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -139,7 +139,7 @@ public:
return _requests_executing;
}

- void notify_requests_finished(fair_queue_ticket& desc);
+ void notify_requests_finished(fair_queue_ticket& desc) noexcept;

// Inform the underlying queue about the fact that some of our requests finished
void process_completions();
diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc
--- a/src/core/fair_queue.cc
+++ b/src/core/fair_queue.cc
@@ -154,7 +154,7 @@ void fair_queue::queue(priority_class_ptr pc, fair_queue_ticket desc, noncopyabl
_requests_queued++;
}

-void fair_queue::notify_requests_finished(fair_queue_ticket desc) {
+void fair_queue::notify_requests_finished(fair_queue_ticket desc) noexcept {
_resources_executing -= desc;
}

diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -44,7 +44,7 @@ class io_desc_read_write final : public kernel_completion {
fair_queue_ticket _fq_ticket;
promise<size_t> _pr;

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 1, 2020, 9:01:55 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

priority_class: Mark methods noexcept

They are trivially such, including the constructor.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
--- a/include/seastar/core/fair_queue.hh
+++ b/include/seastar/core/fair_queue.hh
@@ -104,14 +104,14 @@ class priority_class {
bool _queued = false;

friend struct shared_ptr_no_esft<priority_class>;
- explicit priority_class(uint32_t shares) : _shares(std::max(shares, 1u)) {}
+ explicit priority_class(uint32_t shares) noexcept : _shares(std::max(shares, 1u)) {}

- void update_shares(uint32_t shares) {
+ void update_shares(uint32_t shares) noexcept {
_shares = (std::max(shares, 1u));
}
public:
/// \brief return the current amount of shares for this priority class
- uint32_t shares() const {
+ uint32_t shares() const noexcept {
return _shares;
}
};

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 1, 2020, 9:01:55 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

fair_queue: Remove one level of indirection when updating class shares

Just to avoid one more do-nothing helper method on fair_queue.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
--- a/include/seastar/core/fair_queue.hh
+++ b/include/seastar/core/fair_queue.hh
@@ -106,14 +106,15 @@ class priority_class {
friend struct shared_ptr_no_esft<priority_class>;
explicit priority_class(uint32_t shares) noexcept : _shares(std::max(shares, 1u)) {}

- void update_shares(uint32_t shares) noexcept {
- _shares = (std::max(shares, 1u));
- }
public:
/// \brief return the current amount of shares for this priority class
uint32_t shares() const noexcept {
return _shares;
}
+
+ void update_shares(uint32_t shares) noexcept {
+ _shares = (std::max(shares, 1u));
+ }
};
/// \endcond

@@ -240,11 +241,6 @@ public:

/// Try to execute new requests if there is capacity left in the queue.
void dispatch_requests();
-
- /// Updates the current shares of this priority class
- ///
- /// \param new_shares the new number of shares for this priority class
- static void update_shares(priority_class_ptr pc, uint32_t new_shares);
};
/// @}

diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc
--- a/src/core/fair_queue.cc
+++ b/src/core/fair_queue.cc
@@ -192,8 +192,4 @@ void fair_queue::dispatch_requests() {
}
}

-void fair_queue::update_shares(priority_class_ptr pc, uint32_t new_shares) {
- pc->update_shares(new_shares);
-}
-
}
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -309,7 +309,7 @@ future<>
io_queue::update_shares_for_class(const io_priority_class pc, size_t new_shares) {
return smp::submit_to(coordinator(), [this, pc, owner = this_shard_id(), new_shares] {
auto& pclass = find_or_create_class(pc, owner);
- _fq.update_shares(pclass.ptr, new_shares);
+ pclass.ptr->update_shares(new_shares);
});
}

diff --git a/tests/unit/fair_queue_test.cc b/tests/unit/fair_queue_test.cc

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 1, 2020, 9:01:56 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

io_queue: Turn _priority_classes into unique_ptr-s

They used to be shared for real in the past, but nowadays these are shard-local
objects that are used via references, so a shared pointer is excessive here.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -24,7 +24,6 @@
#include <seastar/core/sstring.hh>
#include <seastar/core/fair_queue.hh>
#include <seastar/core/metrics_registration.hh>
-#include <seastar/core/shared_ptr.hh>
#include <seastar/core/future.hh>
#include <seastar/core/internal/io_request.hh>
#include <mutex>
@@ -75,7 +74,7 @@ private:
void register_stats(sstring name, sstring mountpoint, shard_id owner);
};

- std::vector<std::vector<lw_shared_ptr<priority_class_data>>> _priority_classes;
+ std::vector<std::vector<std::unique_ptr<priority_class_data>>> _priority_classes;
fair_queue _fq;

static constexpr unsigned _max_classes = 2048;
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 1, 2020, 9:01:59 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

io_queue: Localize fq_ticket evaluation for request in io_queue method

This makes it obvious and relaxes the io_desc_read_write interface.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -90,6 +90,8 @@ private:
priority_class_data& find_or_create_class(const io_priority_class& pc, shard_id owner);
fair_queue_ticket _completed_accumulator = { 0, 0 };

+ fair_queue_ticket request_fq_ticket(const internal::io_request& req, size_t len) const;
+
// The fields below are going away, they are just here so we can implement deprecated
// functions that used to be provided by the fair_queue and are going away (from both
// the fair_queue and the io_queue). Double-accounting for now will allow for easier
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -48,15 +48,11 @@ class io_desc_read_write final : public kernel_completion {
_ioq_ptr->notify_requests_finished(_fq_ticket);
}
public:
- io_desc_read_write(io_queue* ioq, unsigned weight, unsigned size)
+ io_desc_read_write(io_queue* ioq, fair_queue_ticket ticket)
: _ioq_ptr(ioq)
- , _fq_ticket(fair_queue_ticket{weight, size})
+ , _fq_ticket(ticket)
{}

- fair_queue_ticket& fq_ticket() {
- return _fq_ticket;
- }
-
void set_exception(std::exception_ptr eptr) {
notify_requests_finished();
_pr.set_exception(eptr);
@@ -263,6 +259,22 @@ io_queue::priority_class_data& io_queue::find_or_create_class(const io_priority_
return *_priority_classes[owner][id];
}

+fair_queue_ticket io_queue::request_fq_ticket(const internal::io_request& req, size_t len) const {
+ unsigned weight;
+ size_t size;
+ if (req.is_write()) {
+ weight = _config.disk_req_write_to_read_multiplier;
+ size = _config.disk_bytes_write_to_read_multiplier * len;
+ } else if (req.is_read()) {
+ weight = io_queue::read_request_base_count;
+ size = io_queue::read_request_base_count * len;
+ } else {
+ throw std::runtime_error(fmt::format("Unrecognized request passing through I/O queue {}", req.opname()));
+ }
+
+ return fair_queue_ticket(weight, size);
+}
+
future<size_t>
io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req) noexcept {
auto start = std::chrono::steady_clock::now();
@@ -273,19 +285,8 @@ io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_re
// that we create the shared pointer in the same shard it will be used at later.
auto& pclass = find_or_create_class(pc, owner);
pclass.nr_queued++;
- unsigned weight;
- size_t size;
- if (req.is_write()) {
- weight = _config.disk_req_write_to_read_multiplier;
- size = _config.disk_bytes_write_to_read_multiplier * len;
- } else if (req.is_read()) {
- weight = io_queue::read_request_base_count;
- size = io_queue::read_request_base_count * len;
- } else {
- throw std::runtime_error(fmt::format("Unrecognized request passing through I/O queue {}", req.opname()));
- }
- auto desc = std::make_unique<io_desc_read_write>(this, weight, size);
- auto fq_ticket = desc->fq_ticket();
+ fair_queue_ticket fq_ticket = request_fq_ticket(req, len);
+ auto desc = std::make_unique<io_desc_read_write>(this, fq_ticket);
auto fut = desc->get_future();
_fq.queue(pclass.ptr, std::move(fq_ticket), [&pclass, start, req = std::move(req), d = std::move(desc), len, this] () mutable noexcept {
io_desc_read_write* desc = d.release();

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 1, 2020, 9:01:59 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

io_queue: Fix discrepancy of queued reqs vs their number

When counting the requests queued in the fair queue, the counters on
io_queue and priority_class_data are incremented first, and then
fair_queue::queue is called. The counters are decremented in
queue()'s callback, so if queue() throws, the counters
remain incremented.

This is not a problem for io_queue's counter, since nobody uses it,
but priority_class_data's counter is reported in the stats and is
therefore used.

Fix by incrementing the counters after the queue() call.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -279,12 +279,9 @@ future<size_t>
io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req) noexcept {
auto start = std::chrono::steady_clock::now();
return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = this_shard_id(), this] () mutable {
- _queued_requests++;
-
// First time will hit here, and then we create the class. It is important
// that we create the shared pointer in the same shard it will be used at later.
auto& pclass = find_or_create_class(pc, owner);
- pclass.nr_queued++;
fair_queue_ticket fq_ticket = request_fq_ticket(req, len);
auto desc = std::make_unique<io_desc_read_write>(this, fq_ticket);
auto fut = desc->get_future();

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 1, 2020, 9:02:02 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

io_queue: Remove process_completions

After the previous patch this is nothing but an empty loop, so drop it.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
Marked-nice-by: Benny Halevy <bha...@scylladb.com>

---
diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -141,9 +141,6 @@ public:

void notify_requests_finished(fair_queue_ticket& desc) noexcept;

- // Inform the underlying queue about the fact that some of our requests finished
- void process_completions() {}
-
// Dispatch requests that are pending in the I/O queue
void poll_io_queue() {
_fq.dispatch_requests();
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 1, 2020, 9:02:03 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

fair_queue: Fix _requests_executing accounting

The counter is only ever incremented in the current code, but since nobody
uses it, this went unnoticed. The shared fair queue will need it (the draft
already does), so keep it correct. To do so, kill the batch processing, since
it saves nothing but makes the io_queue class bigger, and notify the fair
queue about each completed request immediately.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
--- a/include/seastar/core/fair_queue.hh
+++ b/include/seastar/core/fair_queue.hh
@@ -237,7 +237,7 @@ public:

/// Notifies that ont request finished
/// \param desc an instance of \c fair_queue_ticket structure describing the request that just finished.
- void notify_requests_finished(fair_queue_ticket desc) noexcept;
+ void notify_requests_finished(fair_queue_ticket desc, unsigned nr = 1) noexcept;

/// Try to execute new requests if there is capacity left in the queue.
void dispatch_requests();
diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -88,7 +88,6 @@ public:

private:
priority_class_data& find_or_create_class(const io_priority_class& pc, shard_id owner);
- fair_queue_ticket _completed_accumulator = { 0, 0 };

fair_queue_ticket request_fq_ticket(const internal::io_request& req, size_t len) const;

@@ -143,7 +142,7 @@ public:
void notify_requests_finished(fair_queue_ticket& desc) noexcept;

// Inform the underlying queue about the fact that some of our requests finished
- void process_completions();
+ void process_completions() {}

// Dispatch requests that are pending in the I/O queue
void poll_io_queue() {
diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc
--- a/src/core/fair_queue.cc
+++ b/src/core/fair_queue.cc
@@ -154,8 +154,9 @@ void fair_queue::queue(priority_class_ptr pc, fair_queue_ticket desc, noncopyabl
_requests_queued++;
}

-void fair_queue::notify_requests_finished(fair_queue_ticket desc) noexcept {
+void fair_queue::notify_requests_finished(fair_queue_ticket desc, unsigned nr) noexcept {
_resources_executing -= desc;
+ _requests_executing -= nr;
}

void fair_queue::dispatch_requests() {
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -78,12 +78,7 @@ class io_desc_read_write final : public kernel_completion {
void
io_queue::notify_requests_finished(fair_queue_ticket& desc) noexcept {
_requests_executing--;
- _completed_accumulator += desc;
-}
-
-void
-io_queue::process_completions() {
- _fq.notify_requests_finished(std::exchange(_completed_accumulator, {}));
+ _fq.notify_requests_finished(desc);
}

fair_queue::config io_queue::make_fair_queue_config(config iocfg) {
diff --git a/tests/perf/fair_queue_perf.cc b/tests/perf/fair_queue_perf.cc

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 1, 2020, 9:02:03 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

reactor: Introduce io_completion

The goal is to make reactor::submit_io do all the request completion
processing (and mark it noexcept). For this it will need to work with a
more sophisticated completion API than kernel_completion provides,
so here's a placeholder for it.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -176,6 +176,9 @@ class kernel_completion;
class io_queue;
class disk_config_params;

+class io_completion : public kernel_completion {
+};
+
class reactor {
using sched_clock = std::chrono::steady_clock;
private:
@@ -519,7 +522,7 @@ public:
// In the following three methods, prepare_io is not guaranteed to execute in the same processor
// in which it was generated. Therefore, care must be taken to avoid the use of objects that could
// be destroyed within or at exit of prepare_io.
- void submit_io(kernel_completion* desc, internal::io_request req);
+ void submit_io(io_completion* desc, internal::io_request req);
future<size_t> submit_io_read(io_queue* ioq,
const io_priority_class& priority_class,
size_t len,
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -39,7 +39,7 @@ namespace seastar {
using namespace std::chrono_literals;
using namespace internal::linux_abi;

-class io_desc_read_write final : public kernel_completion {
+class io_desc_read_write final : public io_completion {
io_queue* _ioq_ptr;
fair_queue_ticket _fq_ticket;
promise<size_t> _pr;
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1490,7 +1490,7 @@ sstring io_request::opname() const {
}

void
-reactor::submit_io(kernel_completion* desc, io_request req) {
+reactor::submit_io(io_completion* desc, io_request req) {
req.attach_kernel_completion(desc);
_pending_io.push_back(std::move(req));
}
@@ -1878,7 +1878,7 @@ reactor::fdatasync(int fd) noexcept {
if (_have_aio_fsync) {
try {
// Does not go through the I/O queue, but has to be deleted
- struct fsync_io_desc final : public kernel_completion {
+ struct fsync_io_desc final : public io_completion {

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 1, 2020, 9:02:05 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

reactor: Handle submission exception itself

The IO request may be completed with an exception during the
submission itself, so handle it there, and mark submit_io
noexcept.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -177,6 +177,8 @@ class io_queue;
class disk_config_params;

class io_completion : public kernel_completion {
+public:
+ virtual void set_exception(std::exception_ptr eptr) noexcept = 0;
};

class reactor {
@@ -522,7 +524,7 @@ public:
// In the following three methods, prepare_io is not guaranteed to execute in the same processor
// in which it was generated. Therefore, care must be taken to avoid the use of objects that could
// be destroyed within or at exit of prepare_io.
- void submit_io(io_completion* desc, internal::io_request req);
+ void submit_io(io_completion* desc, internal::io_request req) noexcept;
future<size_t> submit_io_read(io_queue* ioq,
const io_priority_class& priority_class,
size_t len,
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -53,7 +53,7 @@ class io_desc_read_write final : public io_completion {
, _fq_ticket(ticket)
{}

- void set_exception(std::exception_ptr eptr) {
+ virtual void set_exception(std::exception_ptr eptr) noexcept override {
notify_requests_finished();
_pr.set_exception(eptr);
delete this;
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1490,9 +1490,13 @@ sstring io_request::opname() const {
}

void
-reactor::submit_io(io_completion* desc, io_request req) {
+reactor::submit_io(io_completion* desc, io_request req) noexcept {
req.attach_kernel_completion(desc);
- _pending_io.push_back(std::move(req));
+ try {
+ _pending_io.push_back(std::move(req));
+ } catch (...) {
+ desc->set_exception(std::current_exception());
+ }
}

bool
@@ -1885,9 +1889,14 @@ reactor::fdatasync(int fd) noexcept {
try {
engine().handle_io_result(res);
_pr.set_value();
+ delete this;
} catch (...) {
- _pr.set_exception(std::current_exception());
+ set_exception(std::current_exception());
}
+ }
+
+ virtual void set_exception(std::exception_ptr eptr) noexcept override {
+ _pr.set_exception(std::move(eptr));
delete this;
}

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 1, 2020, 9:02:06 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

io_completion: Handle complete_with() itself

The handling of aio completion can now be generalized across io_completion
implementations with the help of the .complete(size_t) method.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -178,6 +178,9 @@ class disk_config_params;

class io_completion : public kernel_completion {
public:
+ virtual void complete_with(ssize_t res) final override;
+
+ virtual void complete(size_t res) noexcept = 0;
virtual void set_exception(std::exception_ptr eptr) noexcept = 0;
};

@@ -240,6 +243,8 @@ public:
uint64_t fstream_read_aheads_discarded = 0;
uint64_t fstream_read_ahead_discarded_bytes = 0;
};
+ friend void io_completion::complete_with(ssize_t);
+
private:
reactor_config _cfg;
file_desc _notify_eventfd;
@@ -534,13 +539,6 @@ public:
size_t len,
internal::io_request req) noexcept;

- inline void handle_io_result(ssize_t res) {
- if (res < 0) {
- ++_io_stats.aio_errors;
- throw_kernel_error(res);
- }
- }
-
int run();
void exit(int ret);
future<> when_started() { return _start_promise.get_future(); }
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -59,15 +59,10 @@ class io_desc_read_write final : public io_completion {
delete this;
}

- virtual void complete_with(ssize_t ret) override {
- try {
- engine().handle_io_result(ret);
- notify_requests_finished();
- _pr.set_value(ret);
- delete this;
- } catch (...) {
- set_exception(std::current_exception());
- }
+ virtual void complete(size_t res) noexcept override {
+ notify_requests_finished();
+ _pr.set_value(res);
+ delete this;
}

future<size_t> get_future() {
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1489,6 +1489,20 @@ sstring io_request::opname() const {
std::abort();
}

+void io_completion::complete_with(ssize_t res) {
+ if (res >= 0) {
+ complete(res);
+ return;
+ }
+
+ ++engine()._io_stats.aio_errors;
+ try {
+ throw_kernel_error(res);
+ } catch (...) {
+ set_exception(std::current_exception());
+ }
+}
+
void
reactor::submit_io(io_completion* desc, io_request req) noexcept {
req.attach_kernel_completion(desc);
@@ -1885,14 +1899,9 @@ reactor::fdatasync(int fd) noexcept {
struct fsync_io_desc final : public io_completion {
promise<> _pr;
public:
- virtual void complete_with(ssize_t res) {
- try {
- engine().handle_io_result(res);
- _pr.set_value();
- delete this;
- } catch (...) {
- set_exception(std::current_exception());
- }
+ virtual void complete(size_t res) noexcept override {
+ _pr.set_value();
+ delete this;
}

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 1, 2020, 9:02:07 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

reactor, io_queue: Do not try-catch submit_io

submit_io is already noexcept, so there is no need for an extra try-catch around it.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -276,18 +276,13 @@ io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_re
auto desc = std::make_unique<io_desc_read_write>(this, fq_ticket);
auto fut = desc->get_future();
_fq.queue(pclass.ptr, std::move(fq_ticket), [&pclass, start, req = std::move(req), d = std::move(desc), len, this] () mutable noexcept {
- io_desc_read_write* desc = d.release();
_queued_requests--;
_requests_executing++;
- try {
pclass.nr_queued--;
pclass.ops++;
pclass.bytes += len;
pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
- engine().submit_io(desc, std::move(req));
- } catch (...) {
- desc->set_exception(std::current_exception());
- }
+ engine().submit_io(d.release(), std::move(req));
});
pclass.nr_queued++;
_queued_requests++;
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1894,7 +1894,6 @@ reactor::fdatasync(int fd) noexcept {
return make_ready_future<>();
}
if (_have_aio_fsync) {
- try {
// Does not go through the I/O queue, but has to be deleted
struct fsync_io_desc final : public io_completion {
promise<> _pr;
@@ -1914,15 +1913,13 @@ reactor::fdatasync(int fd) noexcept {
}
};

- auto desc = std::make_unique<fsync_io_desc>();
+ return futurize_invoke([this, fd] {
+ auto desc = new fsync_io_desc;
auto fut = desc->get_future();

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 1, 2020, 9:02:09 AM7/1/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

reactor, io_queue: Fix indentation after previous patch

Just shifting the code left, no functional changes.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -278,11 +278,11 @@ io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_re
_fq.queue(pclass.ptr, std::move(fq_ticket), [&pclass, start, req = std::move(req), d = std::move(desc), len, this] () mutable noexcept {
_queued_requests--;
_requests_executing++;
- pclass.nr_queued--;
- pclass.ops++;
- pclass.bytes += len;
- pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
- engine().submit_io(d.release(), std::move(req));
+ pclass.nr_queued--;
+ pclass.ops++;
+ pclass.bytes += len;
+ pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
+ engine().submit_io(d.release(), std::move(req));
});
pclass.nr_queued++;
_queued_requests++;
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1894,24 +1894,24 @@ reactor::fdatasync(int fd) noexcept {
return make_ready_future<>();
}
if (_have_aio_fsync) {
- // Does not go through the I/O queue, but has to be deleted
- struct fsync_io_desc final : public io_completion {
- promise<> _pr;
- public:
- virtual void complete(size_t res) noexcept override {
- _pr.set_value();
- delete this;
- }
+ // Does not go through the I/O queue, but has to be deleted
+ struct fsync_io_desc final : public io_completion {
+ promise<> _pr;
+ public:
+ virtual void complete(size_t res) noexcept override {
+ _pr.set_value();
+ delete this;
+ }

- virtual void set_exception(std::exception_ptr eptr) noexcept override {
- _pr.set_exception(std::move(eptr));
- delete this;
- }
+ virtual void set_exception(std::exception_ptr eptr) noexcept override {
+ _pr.set_exception(std::move(eptr));
+ delete this;
+ }

- future<> get_future() {
- return _pr.get_future();
- }
- };
+ future<> get_future() {
+ return _pr.get_future();
+ }
+ };

return futurize_invoke([this, fd] {
Reply all
Reply to author
Forward
0 new messages