Now all callers of reactor::submit_io just push the descriptor and
the request into it and forget about everything. As a side-effect
this lets marking submit_io as noexcept.
include/seastar/core/internal/io_desc.hh | 1 +
include/seastar/core/reactor.hh | 4 ++--
src/core/io_queue.cc | 30 ++++++++++++------------
src/core/reactor.cc | 19 +++++++++------
4 files changed, 30 insertions(+), 24 deletions(-)
diff --git a/include/seastar/core/internal/io_desc.hh b/include/seastar/core/internal/io_desc.hh
index 74d1cf90..0dc2d4bc 100644
--- a/include/seastar/core/internal/io_desc.hh
+++ b/include/seastar/core/internal/io_desc.hh
@@ -31,5 +31,6 @@ class kernel_completion {
~kernel_completion() = default;
public:
virtual void complete_with(ssize_t res) = 0;
+ virtual void complete_with_exception(std::exception_ptr eptr) { throw eptr; }
};
}
diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index c3757070..4ad116b6 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -515,11 +515,11 @@ class reactor {
future<> chmod(sstring name, file_permissions permissions) noexcept;
future<int> inotify_add_watch(int fd, const sstring& path, uint32_t flags);
-
+
// In the following three methods, prepare_io is not guaranteed to execute in the same processor
// in which it was generated. Therefore, care must be taken to avoid the use of objects that could
// be destroyed within or at exit of prepare_io.
- void submit_io(kernel_completion* desc, internal::io_request req);
+ void submit_io(kernel_completion* desc, internal::io_request req) noexcept;
future<size_t> submit_io_read(io_queue* ioq,
const io_priority_class& priority_class,
size_t len,
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 028a2cbc..a020bf18 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -47,11 +47,6 @@ class io_desc_read_write final : public kernel_completion {
void notify_requests_finished() {
_ioq_ptr->notify_requests_finished(_fq_ticket);
}
-public:
- io_desc_read_write(io_queue* ioq, fair_queue_ticket ticket)
- : _ioq_ptr(ioq)
- , _fq_ticket(ticket)
- {}
void set_exception(std::exception_ptr eptr) {
notify_requests_finished();
@@ -59,6 +54,12 @@ class io_desc_read_write final : public kernel_completion {
delete this;
}
+public:
+ io_desc_read_write(io_queue* ioq, fair_queue_ticket ticket)
+ : _ioq_ptr(ioq)
+ , _fq_ticket(ticket)
+ {}
+
virtual void complete_with(ssize_t ret) override {
try {
engine().handle_io_result(ret);
@@ -70,6 +71,10 @@ class io_desc_read_write final : public kernel_completion {
}
}
+ virtual void complete_with_exception(std::exception_ptr eptr) override {
+ set_exception(eptr);
+ }
+
future<size_t> get_future() {
return _pr.get_future();
}
@@ -289,18 +294,13 @@ io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_re
auto desc = std::make_unique<io_desc_read_write>(this, fq_ticket);
auto fut = desc->get_future();
_fq.queue(pclass.ptr, std::move(fq_ticket), [&pclass, start, req = std::move(req), d = std::move(desc), len, this] () mutable noexcept {
- io_desc_read_write* desc = d.release();
_queued_requests--;
_requests_executing++;
- try {
- pclass.nr_queued--;
- pclass.ops++;
- pclass.bytes += len;
- pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
- engine().submit_io(desc, std::move(req));
- } catch (...) {
- desc->set_exception(std::current_exception());
- }
+ pclass.nr_queued--;
+ pclass.ops++;
+ pclass.bytes += len;
+ pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
+ engine().submit_io(d.release(), std::move(req));
});
return fut;
});
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 65655c53..8d65bf3c 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1490,9 +1490,13 @@ sstring io_request::opname() const {
}
void
-reactor::submit_io(kernel_completion* desc, io_request req) {
+reactor::submit_io(kernel_completion* desc, io_request req) noexcept {
req.attach_kernel_completion(desc);
- _pending_io.push_back(std::move(req));
+ try {
+ _pending_io.push_back(std::move(req));
+ } catch (...) {
+ desc->complete_with_exception(std::current_exception());
+ }
}
bool
@@ -1894,10 +1898,15 @@ reactor::fdatasync(int fd) noexcept {
}
}
+ virtual void complete_with_exception(std::exception_ptr eptr) override {
+ set_exception(eptr);
+ }
+
future<> get_future() {
return _pr.get_future();
}
+ private:
void set_exception(std::exception_ptr eptr) {
_pr.set_exception(eptr);
delete this;
@@ -1907,11 +1916,7 @@ reactor::fdatasync(int fd) noexcept {
auto desc = new fsync_io_desc();
auto fut = desc->get_future();
auto req = io_request::make_fdatasync(fd);
- try {
- submit_io(desc, std::move(req));
- } catch (...) {
- desc->set_exception(std::current_exception());
- }
+ submit_io(desc, std::move(req));
return fut;
}
return _thread_pool->submit<syscall_result<int>>([fd] {
--
2.20.1