Right now the storage subsystem lives inside the reactor itself. This
works because we only ever do I/O through the Linux io_submit/io_getevents
(linux-aio) interface.
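For reference, that path boils down to three system calls: io_setup()
to create a context, io_submit() to dispatch, and io_getevents() to
reap completions. A minimal standalone sketch (raw syscalls rather than
Seastar's internal::linux_abi wrappers; the file path and queue depth
are placeholders, and error handling is omitted):

    #include <linux/aio_abi.h>
    #include <sys/syscall.h>
    #include <fcntl.h>
    #include <unistd.h>
    #include <time.h>

    int main() {
        aio_context_t ctx = 0;
        syscall(SYS_io_setup, 128, &ctx);              // the reactor keeps one of these

        int fd = open("/etc/hostname", O_RDONLY);      // placeholder file
        static char buf[4096];
        iocb cb = {};
        cb.aio_lio_opcode = IOCB_CMD_PREAD;
        cb.aio_fildes = fd;
        cb.aio_buf = reinterpret_cast<__u64>(buf);
        cb.aio_nbytes = sizeof(buf);
        cb.aio_offset = 0;
        iocb* cbs[] = { &cb };
        syscall(SYS_io_submit, ctx, 1, cbs);           // dispatch the request

        io_event ev[1];
        timespec ts = {0, 0};
        syscall(SYS_io_getevents, ctx, 1, 1, ev, &ts); // ev[0].res: bytes or -errno
        close(fd);
        syscall(SYS_io_destroy, ctx);
    }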
Linux now has a new, improved I/O interface, io_uring, which dispatches
I/O through a different set of system calls and a different set of data
structures.
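A hedged sketch of the same read over io_uring, going through liburing
for brevity (these calls are illustrative and not part of this patch;
the path is again a placeholder):

    #include <liburing.h>
    #include <fcntl.h>
    #include <unistd.h>

    int main() {
        io_uring ring;
        io_uring_queue_init(128, &ring, 0);          // SQ/CQ rings replace the aio context

        int fd = open("/etc/hostname", O_RDONLY);    // placeholder file
        static char buf[4096];
        iovec iov = { buf, sizeof(buf) };

        io_uring_sqe* sqe = io_uring_get_sqe(&ring); // grab a submission queue entry
        io_uring_prep_readv(sqe, fd, &iov, 1, 0);
        io_uring_sqe_set_data(sqe, buf);             // per-op cookie, like iocb user data
        io_uring_submit(&ring);                      // one syscall, many submissions

        io_uring_cqe* cqe;
        io_uring_wait_cqe(&ring, &cqe);              // completion arrives on the CQ ring
        io_uring_cqe_seen(&ring, cqe);
        close(fd);
        io_uring_queue_exit(&ring);
    }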
One possible approach would be to mirror the reactor_backend model with
a separate io_backend.
However, io_uring is the most delicious piece of awesomeness I've seen
in a long while, and it does much more than just I/O: the interface can
do pretty much anything, which means we should be able to replace the
other backend operations with it too.
Therefore, this patch lays the groundwork for that by moving the storage
functionality into the reactor backend.
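Concretely, the diff below adds two storage hooks to reactor_backend:

    virtual std::unique_ptr<pollfn> create_storage_poller() = 0;
    virtual void submit_io(io_desc* desc, io_request_builder builder) = 0;

and moves the existing linux-aio machinery (iocb pool, pending/retry
queues, io_getevents reaping) into an aio_storage_context that both the
epoll and aio backends embed. An io_uring-based backend can later
override these hooks without touching the reactor.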
include/seastar/core/internal/pollable_fd.hh | 1 +
include/seastar/core/reactor.hh | 18 +-
src/core/reactor_backend.hh | 57 +++++
src/core/reactor.cc | 156 +-------------
src/core/reactor_backend.cc | 215 ++++++++++++++++++-
5 files changed, 279 insertions(+), 168 deletions(-)
diff --git a/include/seastar/core/internal/pollable_fd.hh b/include/seastar/core/internal/pollable_fd.hh
index 20cbad2f..d8476151 100644
--- a/include/seastar/core/internal/pollable_fd.hh
+++ b/include/seastar/core/internal/pollable_fd.hh
@@ -151,6 +151,7 @@ class pollable_fd {
friend class reactor;
friend class readable_eventfd;
friend class writeable_eventfd;
+ friend class aio_storage_context;
private:
std::unique_ptr<pollable_fd_state> _s;
};
diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index 5e6d1a5e..0801a75b 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -182,6 +182,7 @@ class reactor {
friend class internal::reactor_stall_sampler;
friend class reactor_backend_epoll;
friend class reactor_backend_aio;
+ friend class aio_storage_context;
public:
class poller {
std::unique_ptr<pollfn> _pollfn;
@@ -242,20 +243,8 @@ class reactor {
static constexpr unsigned max_aio_per_queue = 128;
static constexpr unsigned max_queues = 8;
- static constexpr unsigned max_aio = max_aio_per_queue * max_queues;
friend disk_config_params;
-
- class iocb_pool {
- alignas(cache_line_size) std::array<internal::linux_abi::iocb, max_aio> _iocb_pool;
- std::stack<internal::linux_abi::iocb*, boost::container::static_vector<internal::linux_abi::iocb*, max_aio>> _free_iocbs;
- public:
- iocb_pool();
- internal::linux_abi::iocb* get_one();
- void put_one(internal::linux_abi::iocb* io);
- unsigned outstanding() const;
- };
-
// Not all reactors have IO queues. If the number of IO queues is less than the number of shards,
// some reactors will talk to foreign io_queues. If this reactor holds a valid IO queue, it will
// be stored here.
@@ -286,10 +275,6 @@ class reactor {
timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link>::timer_list_t _expired_lowres_timers;
timer_set<timer<manual_clock>, &timer<manual_clock>::_link> _manual_timers;
timer_set<timer<manual_clock>, &timer<manual_clock>::_link>::timer_list_t _expired_manual_timers;
- internal::linux_abi::aio_context_t _io_context;
- iocb_pool _iocb_pool;
- boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _pending_aio;
- boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _pending_aio_retry;
io_stats _io_stats;
uint64_t _fsyncs = 0;
uint64_t _cxx_exceptions = 0;
@@ -362,7 +347,6 @@ class reactor {
static std::chrono::nanoseconds calculate_poll_time();
static void block_notifier(int);
void wakeup();
- size_t handle_aio_error(internal::linux_abi::iocb* iocb, int ec);
bool flush_pending_aio();
bool flush_tcp_batches();
bool do_expire_lowres_timers();
diff --git a/src/core/reactor_backend.hh b/src/core/reactor_backend.hh
index 2cdc260d..533c329c 100644
--- a/src/core/reactor_backend.hh
+++ b/src/core/reactor_backend.hh
@@ -26,12 +26,15 @@
#include <seastar/core/internal/pollable_fd.hh>
#include <seastar/core/internal/poll.hh>
#include <seastar/core/linux-aio.hh>
+#include <seastar/core/cacheline.hh>
+#include <seastar/core/io_request_builder.hh>
#include <sys/time.h>
#include <signal.h>
#include <thread>
#include <stack>
#include <boost/any.hpp>
#include <boost/program_options.hpp>
+#include <boost/container/static_vector.hpp>
#ifdef HAVE_OSV
#include <osv/newpoll.hh>
@@ -40,6 +43,46 @@
namespace seastar {
class reactor;
+class io_desc;
+
+class aio_storage_context {
+public:
+ static constexpr unsigned max_aio = 1024;
+
+ class aio_storage_context_pollfn : public seastar::pollfn {
+ aio_storage_context* _ctx;
+ public:
+ virtual bool poll() override;
+ virtual bool pure_poll() override;
+ virtual bool try_enter_interrupt_mode() override;
+ virtual void exit_interrupt_mode() override;
+ explicit aio_storage_context_pollfn(aio_storage_context* ctx) : _ctx(ctx) {}
+ };
+
+ explicit aio_storage_context(reactor *r);
+ ~aio_storage_context();
+
+ void submit_io(io_desc* desc, io_request_builder builder);
+
+ std::unique_ptr<seastar::pollfn> create_poller();
+private:
+ reactor* _r;
+ internal::linux_abi::aio_context_t _io_context;
+
+ size_t handle_aio_error(internal::linux_abi::iocb* iocb, int ec);
+ bool flush_pending_aio();
+ bool process_io();
+
+ alignas(cache_line_size) std::array<internal::linux_abi::iocb, max_aio> _iocb_pool;
+ std::stack<internal::linux_abi::iocb*, boost::container::static_vector<internal::linux_abi::iocb*, max_aio>> _free_iocbs;
+
+ internal::linux_abi::iocb* get_one();
+ void put_one(internal::linux_abi::iocb* io);
+ unsigned outstanding() const;
+
+ boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _pending_aio;
+ boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _pending_aio_retry;
+};
// The "reactor_backend" interface provides a method of waiting for various
// basic events on one thread. We have one implementation based on epoll and
@@ -68,6 +111,9 @@ class reactor_backend {
virtual void reset_preemption_monitor() = 0;
virtual void request_preemption() = 0;
virtual void start_handling_signal() = 0;
+
+ virtual std::unique_ptr<pollfn> create_storage_poller() = 0;
+ virtual void submit_io(io_desc* desc, io_request_builder builder) = 0;
};
// reactor backend using file-descriptor & epoll, suitable for running on
@@ -84,6 +130,7 @@ class reactor_backend_epoll : public reactor_backend {
promise<> pollable_fd_state::* pr, int event);
void complete_epoll_event(pollable_fd_state& fd,
promise<> pollable_fd_state::* pr, int events, int event);
+ aio_storage_context _aio_storage_context;
public:
explicit reactor_backend_epoll(reactor* r);
virtual ~reactor_backend_epoll() override;
@@ -99,6 +146,9 @@ class reactor_backend_epoll : public reactor_backend {
virtual void reset_preemption_monitor() override;
virtual void request_preemption() override;
virtual void start_handling_signal() override;
+
+ virtual std::unique_ptr<pollfn> create_storage_poller() override;
+ virtual void submit_io(io_desc* desc, io_request_builder builder) override;
};
class reactor_backend_aio : public reactor_backend {
@@ -118,6 +168,8 @@ class reactor_backend_aio : public reactor_backend {
};
context _preempting_io{2}; // Used for the timer tick and the high resolution timer
context _polling_io{max_polls}; // FIXME: unify with disk aio_context
+ aio_storage_context _aio_storage_context;
+
file_desc _steady_clock_timer = make_timerfd();
internal::linux_abi::iocb _task_quota_timer_iocb;
internal::linux_abi::iocb _timerfd_iocb;
@@ -161,6 +213,9 @@ class reactor_backend_aio : public reactor_backend {
virtual void reset_preemption_monitor() override;
virtual void request_preemption() override;
virtual void start_handling_signal() override;
+
+ virtual std::unique_ptr<pollfn> create_storage_poller() override;
+ virtual void submit_io(io_desc* desc, io_request_builder builder) override;
};
#ifdef HAVE_OSV
@@ -181,6 +236,8 @@ class reactor_backend_osv : public reactor_backend {
virtual future<> writeable(pollable_fd_state& fd) override;
virtual void forget(pollable_fd_state& fd) override;
void enable_timer(steady_clock_type::time_point when);
+ virtual std::unique_ptr<pollfn> create_storage_poller() override;
+ virtual void submit_io(io_desc* desc, io_request_builder builder) override;
};
#endif /* HAVE_OSV */
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index dc07ab2d..cdb71031 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -164,34 +164,6 @@ namespace seastar {
seastar::logger seastar_logger("seastar");
seastar::logger sched_logger("scheduler");
-reactor::iocb_pool::iocb_pool() {
- for (unsigned i = 0; i != max_aio; ++i) {
- _free_iocbs.push(&_iocb_pool[i]);
- }
-}
-
-inline
-internal::linux_abi::iocb*
-reactor::iocb_pool::get_one() {
- auto r = _free_iocbs.size();
- assert(r > 0);
- auto io = _free_iocbs.top();
- _free_iocbs.pop();
- return io;
-}
-
-inline
-void
-reactor::iocb_pool::put_one(internal::linux_abi::iocb* io) {
- _free_iocbs.push(io);
-}
-
-inline
-unsigned
-reactor::iocb_pool::outstanding() const {
- return max_aio - _free_iocbs.size();
-}
-
io_priority_class
reactor::register_one_priority_class(sstring name, uint32_t shares) {
return io_queue::register_one_priority_class(std::move(name), shares);
@@ -519,7 +491,7 @@ constexpr unsigned reactor::max_queues;
constexpr unsigned reactor::max_aio_per_queue;
// Broken (returns spurious EIO). Cause/fix unknown.
-static bool aio_nowait_supported = false;
+bool aio_nowait_supported = false;
static bool sched_debug() {
return false;
@@ -867,7 +839,6 @@ reactor::reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg)
#endif
, _cpu_started(0)
, _cpu_stall_detector(std::make_unique<cpu_stall_detector>(this))
- , _io_context(0)
, _reuseport(posix_reuseport_detect())
, _thread_pool(std::make_unique<thread_pool>(this, seastar::format("syscall-{}", id))) {
/*
@@ -883,7 +854,6 @@ reactor::reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg)
seastar::thread_impl::init();
_backend->start_tick();
- setup_aio_context(max_aio, &_io_context);
#ifdef HAVE_OSV
_timer_thread.start();
#else
@@ -917,7 +887,6 @@ reactor::~reactor() {
eraser(_expired_timers);
eraser(_expired_lowres_timers);
eraser(_expired_manual_timers);
- io_destroy(_io_context);
for (auto&& tq : _task_queues) {
if (tq) {
// The following line will preserve the convention that constructor and destructor functions
@@ -1474,45 +1443,7 @@ sstring io_request_builder::operation() const {
void
reactor::submit_io(io_desc* desc, io_request_builder req) {
- // We can ignore the future returned here, because the submitted aio will be polled
- // for and completed in process_io().
- auto* iocb = _iocb_pool.get_one();
- auto& io = *iocb;
- req.prepare_iocb(io);
- if (_aio_eventfd) {
- set_eventfd_notification(io, _aio_eventfd->get_fd());
- }
- if (aio_nowait_supported) {
- set_nowait(io, true);
- }
- set_user_data(io, desc);
- _pending_aio.push_back(&io);
-}
-
-// Returns: number of iocbs consumed (0 or 1)
-size_t
-reactor::handle_aio_error(linux_abi::iocb* iocb, int ec) {
- switch (ec) {
- case EAGAIN:
- return 0;
- case EBADF: {
- auto desc = reinterpret_cast<io_desc*>(get_user_data(*iocb));
- _iocb_pool.put_one(iocb);
- try {
- throw std::system_error(EBADF, std::system_category());
- } catch (...) {
- desc->set_exception(std::current_exception());
- }
- delete desc;
- // if EBADF, it means that the first request has a bad fd, so
- // we will only remove it from _pending_aio and try again.
- return 1;
- }
- default:
- ++_io_stats.aio_errors;
- throw_system_error_on(true, "io_submit");
- abort();
- }
+ _backend->submit_io(desc, std::move(req));
}
bool
@@ -1523,42 +1454,6 @@ reactor::flush_pending_aio() {
did_work |= ioq->poll_io_queue();
}
- while (!_pending_aio.empty()) {
- auto nr = _pending_aio.size();
- auto iocbs = _pending_aio.data();
- auto r = io_submit(_io_context, nr, iocbs);
- size_t nr_consumed;
- if (r == -1) {
- nr_consumed = handle_aio_error(iocbs[0], errno);
- } else {
- nr_consumed = size_t(r);
- }
-
- did_work = true;
- if (nr_consumed == nr) {
- _pending_aio.clear();
- } else {
- _pending_aio.erase(_pending_aio.begin(), _pending_aio.begin() + nr_consumed);
- }
- }
- if (!_pending_aio_retry.empty()) {
- auto retries = std::exchange(_pending_aio_retry, {});
- // FIXME: future is discarded
- (void)_thread_pool->submit<syscall_result<int>>([this, retries] () mutable {
- auto r = io_submit(_io_context, retries.size(), retries.data());
- return wrap_syscall<int>(r);
- }).then([this, retries] (syscall_result<int> result) {
- auto iocbs = retries.data();
- size_t nr_consumed = 0;
- if (result.result == -1) {
- nr_consumed = handle_aio_error(iocbs[0], result.error);
- } else {
- nr_consumed = result.result;
- }
- std::copy(retries.begin() + nr_consumed, retries.end(), std::back_inserter(_pending_aio_retry));
- });
- did_work = true;
- }
return did_work;
}
@@ -1583,37 +1478,6 @@ reactor::submit_io_write(io_queue* ioq, const io_priority_class& pc, size_t len,
return ioq->queue_request(pc, len, std::move(req));
}
-bool reactor::process_io()
-{
- io_event ev[max_aio];
- struct timespec timeout = {0, 0};
- auto n = io_getevents(_io_context, 1, max_aio, ev, &timeout, _force_io_getevents_syscall);
- if (n == -1 && errno == EINTR) {
- n = 0;
- }
- assert(n >= 0);
- unsigned nr_retry = 0;
- for (size_t i = 0; i < size_t(n); ++i) {
- auto iocb = get_iocb(ev[i]);
- if (ev[i].res == -EAGAIN) {
- ++nr_retry;
- set_nowait(*iocb, false);
- _pending_aio_retry.push_back(iocb);
- continue;
- }
- _iocb_pool.put_one(iocb);
- auto desc = reinterpret_cast<io_desc*>(ev[i].data);
- try {
- this->handle_io_result(ev[i]);
- desc->set_value(size_t(ev[i].res));
- } catch (...) {
- desc->set_exception(std::current_exception());
- }
- delete desc;
- }
- return n;
-}
-
namespace internal {
size_t sanitize_iovecs(std::vector<iovec>& iov, size_t disk_alignment) noexcept {
@@ -2252,18 +2116,13 @@ class reactor::io_pollfn final : public reactor::pollfn {
public:
io_pollfn(reactor& r) : _r(r) {}
virtual bool poll() override final {
- return _r.process_io() | _r.flush_pending_aio();
+ return _r.flush_pending_aio();
}
virtual bool pure_poll() override final {
return poll(); // actually performs work, but triggers no user continuations, so okay
}
virtual bool try_enter_interrupt_mode() override {
- // Because aio depends on polling, it cannot generate events to wake us up, Therefore, sleep
- // is only possible if there are no in-flight aios. If there are, we need to keep polling.
- //
- // Alternatively, if we enabled _aio_eventfd, we can always enter
- unsigned executing = _r._iocb_pool.outstanding();
- return executing == 0 || _r._aio_eventfd;
+ return true;
}
virtual void exit_interrupt_mode() override {
// nothing to do
@@ -2618,7 +2477,6 @@ int reactor::run() {
register_metrics();
- compat::optional<poller> io_poller = {};
compat::optional<poller> smp_poller = {};
// I/O Performance greatly increases if the smp poller runs before the I/O poller. This is
@@ -2627,9 +2485,9 @@ int reactor::run() {
if (smp::count > 1) {
smp_poller = poller(std::make_unique<smp_pollfn>(*this));
}
-#ifndef HAVE_OSV
- io_poller = poller(std::make_unique<io_pollfn>(*this));
-#endif
+
+ poller io_poller(std::make_unique<io_pollfn>(*this));
+ poller storage_poller(_backend->create_storage_poller());
poller batch_flush_poller(std::make_unique<batch_flush_pollfn>(*this));
poller execution_stage_poller(std::make_unique<execution_stage_pollfn>());
diff --git a/src/core/reactor_backend.cc b/src/core/reactor_backend.cc
index a798d34a..7eb7e24c 100644
--- a/src/core/reactor_backend.cc
+++ b/src/core/reactor_backend.cc
@@ -19,6 +19,9 @@
* Copyright 2019 ScyllaDB
*/
#include "core/reactor_backend.hh"
+#include "core/io_desc.hh"
+#include "core/thread_pool.hh"
+#include "core/syscall_result.hh"
#include <seastar/core/print.hh>
#include <seastar/core/reactor.hh>
#include <seastar/util/defer.hh>
@@ -36,6 +39,183 @@ using namespace std::chrono_literals;
using namespace internal;
using namespace internal::linux_abi;
+aio_storage_context::aio_storage_context(reactor *r)
+ : _r(r)
+ , _io_context(0)
+{
+ for (unsigned i = 0; i != max_aio; ++i) {
+ _free_iocbs.push(&_iocb_pool[i]);
+ }
+
+ setup_aio_context(max_aio, &_io_context);
+}
+
+aio_storage_context::~aio_storage_context() {
+ io_destroy(_io_context);
+}
+
+internal::linux_abi::iocb*
+aio_storage_context::get_one() {
+ auto r = _free_iocbs.size();
+ assert(r > 0);
+ auto io = _free_iocbs.top();
+ _free_iocbs.pop();
+ return io;
+}
+
+void
+aio_storage_context::put_one(internal::linux_abi::iocb* io) {
+ _free_iocbs.push(io);
+}
+
+unsigned
+aio_storage_context::outstanding() const {
+ return max_aio - _free_iocbs.size();
+}
+
+extern bool aio_nowait_supported;
+
+void
+aio_storage_context::submit_io(io_desc* desc, io_request_builder req) {
+ // We can ignore the future returned here, because the submitted aio will be polled
+ // for and completed in process_io().
+ auto* iocb = get_one();
+ auto& io = *iocb;
+ req.prepare_iocb(io);
+ if (_r->_aio_eventfd) {
+ set_eventfd_notification(io, _r->_aio_eventfd->get_fd());
+ }
+ if (aio_nowait_supported) {
+ set_nowait(io, true);
+ }
+ set_user_data(io, desc);
+ _pending_aio.push_back(&io);
+}
+
+bool aio_storage_context::process_io()
+{
+ io_event ev[max_aio];
+ struct timespec timeout = {0, 0};
+ auto n = io_getevents(_io_context, 1, max_aio, ev, &timeout, _r->_force_io_getevents_syscall);
+ if (n == -1 && errno == EINTR) {
+ n = 0;
+ }
+
+ assert(n >= 0);
+ unsigned nr_retry = 0;
+ for (size_t i = 0; i < size_t(n); ++i) {
+ auto iocb = get_iocb(ev[i]);
+ if (ev[i].res == -EAGAIN) {
+ ++nr_retry;
+ set_nowait(*iocb, false);
+ _pending_aio_retry.push_back(iocb);
+ continue;
+ }
+ put_one(iocb);
+ auto desc = reinterpret_cast<io_desc*>(ev[i].data);
+ try {
+ _r->handle_io_result(ev[i]);
+ desc->set_value(size_t(ev[i].res));
+ } catch (...) {
+ desc->set_exception(std::current_exception());
+ }
+ delete desc;
+ }
+ return n;
+}
+
+// Returns: number of iocbs consumed (0 or 1)
+size_t
+aio_storage_context::handle_aio_error(linux_abi::iocb* iocb, int ec) {
+ switch (ec) {
+ case EAGAIN:
+ return 0;
+ case EBADF: {
+ auto desc = reinterpret_cast<io_desc*>(get_user_data(*iocb));
+ put_one(iocb);
+ try {
+ throw std::system_error(EBADF, std::system_category());
+ } catch (...) {
+ desc->set_exception(std::current_exception());
+ }
+ delete desc;
+ // if EBADF, it means that the first request has a bad fd, so
+ // we will only remove it from _pending_aio and try again.
+ return 1;
+ }
+ default:
+ ++_r->_io_stats.aio_errors;
+ throw_system_error_on(true, "io_submit");
+ abort();
+ }
+}
+
+bool aio_storage_context::flush_pending_aio() {
+ bool did_work = false;
+ while (!_pending_aio.empty()) {
+ auto nr = _pending_aio.size();
+ auto iocbs = _pending_aio.data();
+ auto r = io_submit(_io_context, nr, iocbs);
+ size_t nr_consumed;
+ if (r == -1) {
+ nr_consumed = handle_aio_error(iocbs[0], errno);
+ } else {
+ nr_consumed = size_t(r);
+ }
+
+ did_work = true;
+ if (nr_consumed == nr) {
+ _pending_aio.clear();
+ } else {
+ _pending_aio.erase(_pending_aio.begin(), _pending_aio.begin() + nr_consumed);
+ }
+ }
+ if (!_pending_aio_retry.empty()) {
+ auto retries = std::exchange(_pending_aio_retry, {});
+ // FIXME: future is discarded
+ (void)engine()._thread_pool->submit<syscall_result<int>>([this, retries] () mutable {
+ auto r = io_submit(_io_context, retries.size(), retries.data());
+ return wrap_syscall<int>(r);
+ }).then([this, retries] (syscall_result<int> result) {
+ auto iocbs = retries.data();
+ size_t nr_consumed = 0;
+ if (result.result == -1) {
+ nr_consumed = handle_aio_error(iocbs[0], result.error);
+ } else {
+ nr_consumed = result.result;
+ }
+ std::copy(retries.begin() + nr_consumed, retries.end(), std::back_inserter(_pending_aio_retry));
+ });
+ did_work = true;
+ }
+ did_work |= process_io();
+ return did_work;
+}
+
+bool aio_storage_context::aio_storage_context_pollfn::poll() {
+ return _ctx->flush_pending_aio();
+}
+
+bool aio_storage_context::aio_storage_context_pollfn::pure_poll() {
+ return poll(); // actually performs work, but triggers no user continuations, so okay
+}
+
+bool aio_storage_context::aio_storage_context_pollfn::try_enter_interrupt_mode() {
+ // Because aio depends on polling, it cannot generate events to wake us up, Therefore, sleep
+ // is only possible if there are no in-flight aios. If there are, we need to keep polling.
+ //
+ // Alternatively, if we enabled _aio_eventfd, we can always enter
+ unsigned executing = _ctx->outstanding();
+ return executing == 0 || _ctx->_r->_aio_eventfd;
+}
+void aio_storage_context::aio_storage_context_pollfn::exit_interrupt_mode() {
+ // nothing to do
+}
+
+std::unique_ptr<pollfn> aio_storage_context::create_poller() {
+ return std::make_unique<aio_storage_context_pollfn>(this);
+}
+
reactor_backend_aio::context::context(size_t nr) : iocbs(new iocb*[nr]) {
setup_aio_context(nr, &io_context);
}
@@ -184,7 +364,10 @@ void reactor_backend_aio::signal_received(int signo, siginfo_t* siginfo, void* i
engine()._signals.action(signo, siginfo, ignore);
}
-reactor_backend_aio::reactor_backend_aio(reactor* r) : _r(r) {
+reactor_backend_aio::reactor_backend_aio(reactor* r)
+ : _r(r)
+ , _aio_storage_context(_r)
+{
_task_quota_timer_iocb = make_poll_iocb(_r->_task_quota_timer.get(), POLLIN);
_timerfd_iocb = make_poll_iocb(_steady_clock_timer.get(), POLLIN);
_smp_wakeup_iocb = make_poll_iocb(_r->_notify_eventfd.get(), POLLIN);
@@ -292,8 +475,19 @@ void reactor_backend_aio::start_handling_signal() {
// implementation of request_preemption is not signal safe, so do nothing.
}
+std::unique_ptr<pollfn> reactor_backend_aio::create_storage_poller() {
+ return _aio_storage_context.create_poller();
+}
+
+void reactor_backend_aio::submit_io(io_desc* desc, io_request_builder irb) {
+ return _aio_storage_context.submit_io(desc, std::move(irb));
+}
+
reactor_backend_epoll::reactor_backend_epoll(reactor* r)
- : _r(r), _epollfd(file_desc::epoll_create(EPOLL_CLOEXEC)) {
+ : _r(r)
+ , _epollfd(file_desc::epoll_create(EPOLL_CLOEXEC))
+ , _aio_storage_context(_r)
+{
::epoll_event event;
event.events = EPOLLIN;
event.data.ptr = nullptr;
@@ -455,6 +649,14 @@ void reactor_backend_epoll::reset_preemption_monitor() {
_r->_preemption_monitor.head.store(0, std::memory_order_relaxed);
}
+std::unique_ptr<pollfn> reactor_backend_epoll::create_storage_poller() {
+ return _aio_storage_context.create_poller();
+}
+
+void reactor_backend_epoll::submit_io(io_desc* desc, io_request_builder irb) {
+ return _aio_storage_context.submit_io(desc, std::move(irb));
+}
+
#ifdef HAVE_OSV
reactor_backend_osv::reactor_backend_osv() {
}
@@ -495,6 +697,15 @@ reactor_backend_osv::enable_timer(steady_clock_type::time_point when) {
_poller.set_timer(when);
}
+std::unique_ptr<pollfn> reactor_backend_osv::create_storage_poller() {
+ std::cerr << "reactor_backend_osv does not support file descriptors - create_storage_poller() shouldn't have been called!\n";
+ abort();
+}
+
+void reactor_backend_osv::submit_io(io_desc* desc, io_request_builder irb) {
+ std::cerr << "reactor_backend_osv does not support file descriptors - submit_io() shouldn't have been called!\n";
+ abort();
+}
#endif
static bool detect_aio_poll() {
--
2.20.1