| [PATCH 0/6] io_uring groundwork: make seastar agnostic of linux async I/O | Glauber Costa | 22/01/20 07:36 AM | This series provides the groundwork for moving our I/O subsystem towards
io_uring. I am not including any io_uring patches yet; instead, I decided to collect early feedback on the general approach.

The main seastar-specific problem we face is that we make assumptions about iocb, io_event and the presence of io_submit/io_getevents pretty much everywhere. The approach I have chosen is to move all of that machinery inside the already existing reactor_backend infrastructure. io_uring, when implemented, will be a new reactor backend capable of replacing all the polling and storage I/O at once.

Comments welcome.

Glauber Costa (6):
  core: remove iocb references from I/O dispatchers
  core: remove io_event mentions from upper layer interfaces
  core: unify I/O pollers
  reactor: remove semaphore from iocb acquisition
  io_queue: return number of requests dispatched.
  core: move storage aio to reactor backend

 include/seastar/core/fair_queue.hh           |   3 +-
 include/seastar/core/internal/pollable_fd.hh |   1 +
 include/seastar/core/io_queue.hh             |  11 +-
 .../seastar/core/io_request_builder.hh       |  40 ++-
 include/seastar/core/reactor.hh              |  33 +--
 src/core/io_desc.hh                          |   8 +-
 src/core/reactor_backend.hh                  |  57 +++++
 src/core/fair_queue.cc                       |   5 +-
 src/core/file.cc                             |  32 +--
 src/core/io_queue.cc                         |  23 +-
 src/core/reactor.cc                          | 240 +++++-------------
 src/core/reactor_backend.cc                  | 215 +++++++++++++++-
 12 files changed, 400 insertions(+), 268 deletions(-)
 copy src/core/io_desc.hh => include/seastar/core/io_request_builder.hh (50%)

--
2.20.1 |
| [PATCH 2/6] core: remove io_event mentions from upper layer interfaces | Glauber Costa | 22/01/20 07:36 AM | We currently expose an io_event, the return value of
the Linux async API at layers as high as read/write_dma. There are now other mechanisms to initiate Async I/O in Linux like io_uring that works with different structures. This patch changes the interfaces so we work with size_t types for as long as possible, and leave io_event as an implementation detail for the I/O subsystem. Signed-off-by: Glauber Costa <gla...@scylladb.com> --- include/seastar/core/io_queue.hh | 2 +- include/seastar/core/reactor.hh | 4 ++-- src/core/io_desc.hh | 8 ++++---- src/core/file.cc | 20 ++++---------------- src/core/io_queue.cc | 6 +++--- src/core/reactor.cc | 15 +++++++++------ 6 files changed, 23 insertions(+), 32 deletions(-) diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh index d3c002ae..e092ced8 100644 --- a/include/seastar/core/io_queue.hh +++ b/include/seastar/core/io_queue.hh @@ -110,7 +110,7 @@ class io_queue { io_queue(config cfg); ~io_queue(); - future<internal::linux_abi::io_event> + future<size_t> queue_request(const io_priority_class& pc, size_t len, io_request_builder req); size_t capacity() const { diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh index 70d45568..84ff9a98 100644 --- a/include/seastar/core/reactor.hh +++ b/include/seastar/core/reactor.hh @@ -530,11 +530,11 @@ class reactor { // in which it was generated. Therefore, care must be taken to avoid the use of objects that could // be destroyed within or at exit of prepare_io. 
void submit_io(io_desc* desc, io_request_builder builder); - future<internal::linux_abi::io_event> submit_io_read(io_queue* ioq, + future<size_t> submit_io_read(io_queue* ioq, const io_priority_class& priority_class, size_t len, io_request_builder builder); - future<internal::linux_abi::io_event> submit_io_write(io_queue* ioq, + future<size_t> submit_io_write(io_queue* ioq, const io_priority_class& priority_class, size_t len, io_request_builder builder); diff --git a/src/core/io_desc.hh b/src/core/io_desc.hh index 1a980e92..af3225b6 100644 --- a/src/core/io_desc.hh +++ b/src/core/io_desc.hh @@ -27,18 +27,18 @@ namespace seastar { class io_desc { - promise<seastar::internal::linux_abi::io_event> _pr; + promise<size_t> _pr; public: virtual ~io_desc() = default; virtual void set_exception(std::exception_ptr eptr) { _pr.set_exception(std::move(eptr)); } - virtual void set_value(seastar::internal::linux_abi::io_event& ev) { - _pr.set_value(ev); + virtual void set_value(size_t res) { + _pr.set_value(res); } - future<seastar::internal::linux_abi::io_event> get_future() { + future<size_t> get_future() { return _pr.get_future(); } }; diff --git a/src/core/file.cc b/src/core/file.cc index c2cdb35a..4de8ab16 100644 --- a/src/core/file.cc +++ b/src/core/file.cc @@ -322,10 +322,7 @@ posix_file_impl::list_directory(std::function<future<> (directory_entry de)> nex future<size_t> posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len, const io_priority_class& io_priority_class) { io_request_builder req(io_request_builder::operation::write, _fd, pos, const_cast<void*>(buffer), len); - return engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req)).then([] (io_event ev) { - engine().handle_io_result(ev); - return make_ready_future<size_t>(size_t(ev.res)); - }); + return engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req)); } future<size_t> @@ -334,19 +331,13 @@ posix_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov, 
const io_priori auto size = iov.size(); auto data = iov.data(); io_request_builder req(io_request_builder::operation::writev, _fd, pos, data, size); - return engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req)).then([iov = std::move(iov)] (io_event ev) { - engine().handle_io_result(ev); - return make_ready_future<size_t>(size_t(ev.res)); - }); + return engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req)).finally([iov = std::move(iov)] () {}); } future<size_t> posix_file_impl::read_dma(uint64_t pos, void* buffer, size_t len, const io_priority_class& io_priority_class) { io_request_builder req(io_request_builder::operation::read, _fd, pos, buffer, len); - return engine().submit_io_read(_io_queue, io_priority_class, len, std::move(req)).then([] (io_event ev) { - engine().handle_io_result(ev); - return make_ready_future<size_t>(size_t(ev.res)); - }); + return engine().submit_io_read(_io_queue, io_priority_class, len, std::move(req)); } future<size_t> @@ -355,10 +346,7 @@ posix_file_impl::read_dma(uint64_t pos, std::vector<iovec> iov, const io_priorit auto size = iov.size(); auto data = iov.data(); io_request_builder req(io_request_builder::operation::readv, _fd, pos, data, size); - return engine().submit_io_read(_io_queue, io_priority_class, len, std::move(req)).then([iov = std::move(iov)] (io_event ev) { - engine().handle_io_result(ev); - return make_ready_future<size_t>(size_t(ev.res)); - }); + return engine().submit_io_read(_io_queue, io_priority_class, len, std::move(req)).finally([iov = std::move(iov)] () {}); } future<temporary_buffer<uint8_t>> diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc index 3369d9f4..b8124a8c 100644 --- a/src/core/io_queue.cc +++ b/src/core/io_queue.cc @@ -61,9 +61,9 @@ class io_desc_read_write final : public io_desc { io_desc::set_exception(std::move(eptr)); } - void set_value(io_event& ev) { + void set_value(size_t ret) { notify_requests_finished(); - io_desc::set_value(ev); + 
io_desc::set_value(ret); } }; @@ -223,7 +223,7 @@ io_queue::priority_class_data& io_queue::find_or_create_class(const io_priority_ return *_priority_classes[owner][id]; } -future<io_event> +future<size_t> io_queue::queue_request(const io_priority_class& pc, size_t len, io_request_builder req) { auto start = std::chrono::steady_clock::now(); return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = engine().cpu_id(), this] () mutable { diff --git a/src/core/reactor.cc b/src/core/reactor.cc index 226017f0..4d668c7a 100644 --- a/src/core/reactor.cc +++ b/src/core/reactor.cc @@ -1571,14 +1571,14 @@ const io_priority_class& default_priority_class() { return shard_default_class; } -future<io_event> +future<size_t> reactor::submit_io_read(io_queue* ioq, const io_priority_class& pc, size_t len, io_request_builder req) { ++_io_stats.aio_reads; _io_stats.aio_read_bytes += len; return ioq->queue_request(pc, len, std::move(req)); } -future<io_event> +future<size_t> reactor::submit_io_write(io_queue* ioq, const io_priority_class& pc, size_t len, io_request_builder req) { ++_io_stats.aio_writes; _io_stats.aio_write_bytes += len; @@ -1605,7 +1605,12 @@ bool reactor::process_io() } _iocb_pool.put_one(iocb); auto desc = reinterpret_cast<io_desc*>(ev[i].data); - desc->set_value(ev[i]); + try { + this->handle_io_result(ev[i]); + desc->set_value(size_t(ev[i].res)); + } catch (...) { + desc->set_exception(std::current_exception()); + } delete desc; } return n; @@ -1913,9 +1918,7 @@ reactor::fdatasync(int fd) { io_request_builder req(io_request_builder::operation::fdatasync, fd); submit_io(desc.release(), std::move(req)); - return fut.then([] (linux_abi::io_event event) { - throw_kernel_error(event.res); - }); + return fut.discard_result(); } catch (...) { return make_exception_future<>(std::current_exception()); } -- 2.20.1 |
| [PATCH 1/6] core: remove iocb references from I/O dispatchers | Glauber Costa | 22/01/20 07:36 AM | Our async I/O mechanism relies on the Linux async I/O interface for now,
which in turn relies on sending iocbs down to the Kernel. Because iocbs are so ingrained in the interface, they go all the way up and the write_dma and read_dma functions are already aware of it. However, there are now other interfaces, like io_uring, that rely on different data structures. To account for that, we will encapsulate the I/O information like file descriptor, addresses and sizes into a seastar structure and only transform this into iocbs down at the lowest level, where we already know which interface to use. In a follow up patch, we will have to do the same thing with the io_event structure. include/seastar/core/io_queue.hh | 5 +- include/seastar/core/io_request_builder.hh | 58 +++++++++++++++++++++ include/seastar/core/reactor.hh | 8 +-- src/core/file.cc | 20 +++----- src/core/io_queue.cc | 17 ++++--- src/core/reactor.cc | 59 ++++++++++++++++++---- 6 files changed, 132 insertions(+), 35 deletions(-) create mode 100644 include/seastar/core/io_request_builder.hh diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh index e1998073..d3c002ae 100644 --- a/include/seastar/core/io_queue.hh +++ b/include/seastar/core/io_queue.hh @@ -26,6 +26,7 @@ #include <seastar/core/metrics_registration.hh> #include <seastar/core/shared_ptr.hh> #include <seastar/core/future.hh> +#include <seastar/core/io_request_builder.hh> #include <mutex> #include <array> @@ -85,8 +86,6 @@ class io_queue { priority_class_data& find_or_create_class(const io_priority_class& pc, shard_id owner); friend class smp; public: - enum class request_type { read, write }; - // We want to represent the fact that write requests are (maybe) more expensive // than read requests. To avoid dealing with floating point math we will scale one // read request to be counted by this amount. 
@@ -112,7 +111,7 @@ class io_queue { ~io_queue(); future<internal::linux_abi::io_event> - queue_request(const io_priority_class& pc, size_t len, request_type req_type, noncopyable_function<void (internal::linux_abi::iocb&)> do_io); + queue_request(const io_priority_class& pc, size_t len, io_request_builder req); size_t capacity() const { return _config.capacity; diff --git a/include/seastar/core/io_request_builder.hh b/include/seastar/core/io_request_builder.hh new file mode 100644 index 00000000..d82e3b7e --- /dev/null +++ b/include/seastar/core/io_request_builder.hh @@ -0,0 +1,58 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Copyright 2020 ScyllaDB + */ + +#pragma once + +#include <seastar/core/sstring.hh> +#include <seastar/core/linux-aio.hh> + +namespace seastar { +class io_request_builder { +public: + enum class operation { read, readv, write, writev, fdatasync }; + operation _op; + int _fd; + uint64_t _pos; + void *_address; + size_t _size; +public: + io_request_builder(operation op, int fd, uint64_t pos, void* address, size_t size) + : _op(op) + , _fd(fd) + , _pos(pos) + , _address(address) + , _size(size) + {} + io_request_builder(operation op, int fd) : io_request_builder(op, fd, 0, nullptr, 0) {} + + bool is_read() const { + return ((_op == operation::read) || (_op == operation::readv)); + } + + bool is_write() const { + return ((_op == operation::write) || (_op == operation::writev)); + } + + sstring operation() const; + + void prepare_iocb(internal::linux_abi::iocb& iocb); +}; +} diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh index 13070d7d..70d45568 100644 --- a/include/seastar/core/reactor.hh +++ b/include/seastar/core/reactor.hh @@ -80,6 +80,7 @@ #include <seastar/core/metrics_registration.hh> #include <seastar/core/scheduling.hh> #include <seastar/core/smp.hh> +#include <seastar/core/io_request_builder.hh> #include "internal/pollable_fd.hh" #include "internal/poll.hh" @@ -528,16 +529,15 @@ class reactor { // In the following three methods, prepare_io is not guaranteed to execute in the same processor // in which it was generated. 
Therefore, care must be taken to avoid the use of objects that could- void submit_io(io_desc* desc, - noncopyable_function<void (internal::linux_abi::iocb&)> prepare_io); + void submit_io(io_desc* desc, io_request_builder builder); future<internal::linux_abi::io_event> submit_io_read(io_queue* ioq, const io_priority_class& priority_class,- noncopyable_function<void (internal::linux_abi::iocb&)> prepare_io); + io_request_builder builder); future<internal::linux_abi::io_event> submit_io_write(io_queue* ioq, const io_priority_class& priority_class,- noncopyable_function<void (internal::linux_abi::iocb&)> prepare_io); + io_request_builder builder); inline void handle_io_result(const internal::linux_abi::io_event& ev) { auto res = long(ev.res); diff --git a/src/core/file.cc b/src/core/file.cc index 5fa556b8..c2cdb35a 100644 --- a/src/core/file.cc +++ b/src/core/file.cc @@ -321,9 +321,8 @@ posix_file_impl::list_directory(std::function<future<> (directory_entry de)> nex - return engine().submit_io_write(_io_queue, io_priority_class, len, [fd = _fd, pos, buffer, len] (iocb& io) { - io = make_write_iocb(fd, pos, const_cast<void*>(buffer), len); - }).then([] (io_event ev) { + io_request_builder req(io_request_builder::operation::write, _fd, pos, const_cast<void*>(buffer), len); + return engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req)).then([] (io_event ev) { engine().handle_io_result(ev); return make_ready_future<size_t>(size_t(ev.res)); }); @@ -334,9 +333,8 @@ posix_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov, const io_priori auto len = internal::sanitize_iovecs(iov, _disk_write_dma_alignment); auto size = iov.size();- return engine().submit_io_write(_io_queue, io_priority_class, len, [fd = _fd, pos, data, size] (iocb& io) { - io = make_writev_iocb(fd, pos, data, size); - }).then([iov = std::move(iov)] (io_event ev) { + io_request_builder req(io_request_builder::operation::writev, _fd, pos, data, size); + return 
engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req)).then([iov = std::move(iov)] (io_event ev) { engine().handle_io_result(ev); return make_ready_future<size_t>(size_t(ev.res)); }); @@ -344,9 +342,8 @@ posix_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov, const io_priori - return engine().submit_io_read(_io_queue, io_priority_class, len, [fd = _fd, pos, buffer, len] (iocb& io) { - io = make_read_iocb(fd, pos, buffer, len); - }).then([] (io_event ev) { + io_request_builder req(io_request_builder::operation::read, _fd, pos, buffer, len); + return engine().submit_io_read(_io_queue, io_priority_class, len, std::move(req)).then([] (io_event ev) { engine().handle_io_result(ev); return make_ready_future<size_t>(size_t(ev.res)); }); @@ -357,9 +354,8 @@ posix_file_impl::read_dma(uint64_t pos, std::vector<iovec> iov, const io_priorit auto len = internal::sanitize_iovecs(iov, _disk_read_dma_alignment); auto size = iov.size();- return engine().submit_io_read(_io_queue, io_priority_class, len, [fd = _fd, pos, data, size] (iocb& io) { - io = make_readv_iocb(fd, pos, data, size); - }).then([iov = std::move(iov)] (io_event ev) { + io_request_builder req(io_request_builder::operation::readv, _fd, pos, data, size); + return engine().submit_io_read(_io_queue, io_priority_class, len, std::move(req)).then([iov = std::move(iov)] (io_event ev) { engine().handle_io_result(ev); return make_ready_future<size_t>(size_t(ev.res)); }); diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc index db9c5b0d..3369d9f4 100644 --- a/src/core/io_queue.cc +++ b/src/core/io_queue.cc @@ -31,6 +31,8 @@ #include <mutex> #include <array> #include "core/io_desc.hh" +#include <fmt/format.h> +#include <fmt/ostream.h> namespace seastar { @@ -222,32 +224,35 @@ io_queue::priority_class_data& io_queue::find_or_create_class(const io_priority_ } future<io_event> -io_queue::queue_request(const io_priority_class& pc, size_t len, io_queue::request_type req_type, 
noncopyable_function<void (internal::linux_abi::iocb&)> prepare_io) { +io_queue::queue_request(const io_priority_class& pc, size_t len, io_request_builder req) { auto start = std::chrono::steady_clock::now();- return smp::submit_to(coordinator(), [start, &pc, len, req_type, prepare_io = std::move(prepare_io), owner = engine().cpu_id(), this] () mutable { + return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = engine().cpu_id(), this] () mutable { // First time will hit here, and then we create the class. It is important // that we create the shared pointer in the same shard it will be used at later. auto& pclass = find_or_create_class(pc, owner); pclass.nr_queued++; unsigned weight; size_t size; - if (req_type == io_queue::request_type::write) { + if (req.is_write()) { weight = _config.disk_req_write_to_read_multiplier; size = _config.disk_bytes_write_to_read_multiplier * len; - } else { + } else if (req.is_read()) { weight = io_queue::read_request_base_count; size = io_queue::read_request_base_count * len; + } else { + throw std::runtime_error(fmt::format("Unrecognized request passing through I/O queue {} ", req.operation())); } + auto desc = std::make_unique<io_desc_read_write>(this, weight, size); auto fq_desc = desc->fq_descriptor(); auto fut = desc->get_future(); - _fq.queue(pclass.ptr, std::move(fq_desc), [&pclass, start, prepare_io = std::move(prepare_io), desc = std::move(desc), len, this] () mutable noexcept { + _fq.queue(pclass.ptr, std::move(fq_desc), [&pclass, start, req = std::move(req), desc = std::move(desc), len, this] () mutable noexcept { try { pclass.nr_queued--; pclass.ops++; pclass.bytes += len; pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start); - engine().submit_io(desc.get(), std::move(prepare_io)); + engine().submit_io(desc.get(), std::move(req)); desc.release(); } catch (...) 
{ desc->set_exception(std::current_exception()); diff --git a/src/core/reactor.cc b/src/core/reactor.cc index ee16f109..226017f0 100644 --- a/src/core/reactor.cc +++ b/src/core/reactor.cc @@ -1435,13 +1435,52 @@ reactor::connect(socket_address sa, socket_address local, transport proto) { return _network_stack->connect(sa, local, proto); } +void io_request_builder::prepare_iocb(iocb& iocb) { + switch (_op) { + case io_request_builder::operation::fdatasync: + iocb = make_fdsync_iocb(_fd); + break; + case io_request_builder::operation::write: + iocb = make_write_iocb(_fd, _pos, _address, _size); + break; + case io_request_builder::operation::writev: + iocb = make_writev_iocb(_fd, _pos, reinterpret_cast<const iovec*>(_address), _size); + break; + case io_request_builder::operation::read: + iocb = make_read_iocb(_fd, _pos, _address, _size); + break; + case io_request_builder::operation::readv: + iocb = make_readv_iocb(_fd, _pos, reinterpret_cast<const iovec*>(_address), _size); + break; + default: + throw std::runtime_error("Can't prepare IOCB for unrecognized operation"); + } +} + +sstring io_request_builder::operation() const { + switch (_op) { + case io_request_builder::operation::fdatasync: + return "fdatasync"; + case io_request_builder::operation::write: + return "write"; + case io_request_builder::operation::writev: + return "vectored write"; + case io_request_builder::operation::read: + return "read"; + case io_request_builder::operation::readv: + return "vectored read"; + default: + return fmt::format("unknown operation {}", unsigned(_op)); + } +} + void -reactor::submit_io(io_desc* desc, noncopyable_function<void (linux_abi::iocb&)> prepare_io) { +reactor::submit_io(io_desc* desc, io_request_builder req) { // We can ignore the future returned here, because the submitted aio will be polled // for and completed in process_io(). 
- (void)_iocb_pool.get_one().then([this, desc, prepare_io = std::move(prepare_io)] (linux_abi::iocb* iocb) mutable { + (void)_iocb_pool.get_one().then([this, desc, req = std::move(req)] (linux_abi::iocb* iocb) mutable { auto& io = *iocb; - prepare_io(io); + req.prepare_iocb(io); if (_aio_eventfd) { set_eventfd_notification(io, _aio_eventfd->get_fd()); } @@ -1533,17 +1572,17 @@ const io_priority_class& default_priority_class() { } future<io_event> -reactor::submit_io_read(io_queue* ioq, const io_priority_class& pc, size_t len, noncopyable_function<void (internal::linux_abi::iocb&)> prepare_io) { +reactor::submit_io_read(io_queue* ioq, const io_priority_class& pc, size_t len, io_request_builder req) { ++_io_stats.aio_reads; _io_stats.aio_read_bytes += len; - return ioq->queue_request(pc, len, io_queue::request_type::read, std::move(prepare_io)); + return ioq->queue_request(pc, len, std::move(req)); } future<io_event> -reactor::submit_io_write(io_queue* ioq, const io_priority_class& pc, size_t len, noncopyable_function<void (internal::linux_abi::iocb&)> prepare_io) { +reactor::submit_io_write(io_queue* ioq, const io_priority_class& pc, size_t len, io_request_builder req) { ++_io_stats.aio_writes; _io_stats.aio_write_bytes += len; - return ioq->queue_request(pc, len, io_queue::request_type::write, std::move(prepare_io)); + return ioq->queue_request(pc, len, std::move(req)); } bool reactor::process_io() @@ -1871,9 +1910,9 @@ reactor::fdatasync(int fd) { try { auto desc = std::make_unique<io_desc>(); auto fut = desc->get_future(); - submit_io(desc.release(), [fd] (linux_abi::iocb& iocb) { - iocb = make_fdsync_iocb(fd); - }); + + io_request_builder req(io_request_builder::operation::fdatasync, fd); + submit_io(desc.release(), std::move(req)); return fut.then([] (linux_abi::io_event event) {throw_kernel_error(event.res); }); -- 2.20.1 |
| [PATCH 3/6] core: unify I/O pollers | Glauber Costa | 22/01/20 07:36 AM | We currently have two different pollers for I/O: one for
submitting I/O, and another for fetching the results of a previous submission. There is no particular reason to keep these separate at this stage, and the reasons for a two-poller system seem mostly historical to me (at some point, we could submit I/O outside of polling context). One of the pollers (getevents) is available - as far as ifdefs are concerned - on OSv, but I believe that even if this is not an oversight, if we haven't dispatched anything we'll hardly have anything to read, so I am just ignoring that. include/seastar/core/reactor.hh | 2 -- src/core/reactor.cc | 22 +--------------------- 2 files changed, 1 insertion(+), 23 deletions(-) diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh index 84ff9a98..8f3f84de 100644 --- a/include/seastar/core/reactor.hh +++ b/include/seastar/core/reactor.hh @@ -160,7 +160,6 @@ class reactor { class io_pollfn; class signal_pollfn; - class aio_batch_submit_pollfn; class batch_flush_pollfn; class smp_pollfn; class drain_cross_cpu_freelist_pollfn; @@ -171,7 +170,6 @@ class reactor { class execution_stage_pollfn; friend io_pollfn; friend signal_pollfn; - friend aio_batch_submit_pollfn; friend batch_flush_pollfn; friend smp_pollfn; friend drain_cross_cpu_freelist_pollfn; diff --git a/src/core/reactor.cc b/src/core/reactor.cc index 4d668c7a..8cdee5b7 100644 --- a/src/core/reactor.cc +++ b/src/core/reactor.cc @@ -2254,7 +2254,7 @@ class reactor::io_pollfn final : public reactor::pollfn { public: io_pollfn(reactor& r) : _r(r) {} virtual bool poll() override final { - return _r.process_io(); + return _r.process_io() | _r.flush_pending_aio(); } virtual bool pure_poll() override final { return poll(); // actually performs work, but triggers no user continuations, so okay @@ -2322,25 +2322,6 @@ class reactor::batch_flush_pollfn final : public reactor::pollfn { } }; -class reactor::aio_batch_submit_pollfn final : public reactor::pollfn { - reactor& _r; -public: - aio_batch_submit_pollfn(reactor& r) : _r(r) {}
- virtual bool poll() final override { - return _r.flush_pending_aio(); - } - virtual bool pure_poll() override final { - return poll(); // actually performs work, but triggers no user continuations, so okay - } - virtual bool try_enter_interrupt_mode() override { - // This is a passive poller, so if a previous poll - // returned false (idle), there's no more work to do. - return true; - } - virtual void exit_interrupt_mode() override final { - } -}; - class reactor::drain_cross_cpu_freelist_pollfn final : public reactor::pollfn { public: virtual bool poll() final override { @@ -2651,7 +2632,6 @@ int reactor::run() { #ifndef HAVE_OSV io_poller = poller(std::make_unique<io_pollfn>(*this)); #endif - poller aio_poller(std::make_unique<aio_batch_submit_pollfn>(*this)); poller batch_flush_poller(std::make_unique<batch_flush_pollfn>(*this)); poller execution_stage_poller(std::make_unique<execution_stage_pollfn>()); -- 2.20.1 |
| [PATCH 4/6] reactor: remove semaphore from iocb acquisition | Glauber Costa | 22/01/20 07:36 AM | We currently protect the iocb acquisition path in the storage layer with
a semaphore. This is a relic from a past in which we didn't have an I/O scheduler. In reality, access to this structure has been protected by the I/O scheduler for a long time, and we are guaranteed to never run out of iocbs in the poll. Removing the semaphore allow us to un-futurize this, which is a benefit in itself. However as it will become clear in the next patch, the biggest advantage of this is that we will be able to unify this code with the reactor backend. In preparation for that we will also add a test for the fact that we really indeed have enough entries to draw from, despite the I/O scheduler guarantees. include/seastar/core/reactor.hh | 3 +-- src/core/reactor.cc | 37 +++++++++++++++------------------ 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh index 8f3f84de..5e6d1a5e 100644 --- a/include/seastar/core/reactor.hh +++ b/include/seastar/core/reactor.hh @@ -248,11 +248,10 @@ class reactor { class iocb_pool { alignas(cache_line_size) std::array<internal::linux_abi::iocb, max_aio> _iocb_pool; - semaphore _sem{0}; std::stack<internal::linux_abi::iocb*, boost::container::static_vector<internal::linux_abi::iocb*, max_aio>> _free_iocbs; public: iocb_pool(); - future<internal::linux_abi::iocb*> get_one(); + internal::linux_abi::iocb* get_one(); void put_one(internal::linux_abi::iocb* io); unsigned outstanding() const; }; diff --git a/src/core/reactor.cc b/src/core/reactor.cc index 8cdee5b7..5d810f9d 100644 --- a/src/core/reactor.cc +++ b/src/core/reactor.cc @@ -168,24 +168,22 @@ reactor::iocb_pool::iocb_pool() { for (unsigned i = 0; i != max_aio; ++i) { _free_iocbs.push(&_iocb_pool[i]); } - _sem.signal(max_aio); } inline -future<internal::linux_abi::iocb*> +internal::linux_abi::iocb* reactor::iocb_pool::get_one() { - return _sem.wait(1).then([this] { - auto io = _free_iocbs.top(); - _free_iocbs.pop(); - return io; - }); + auto r = _free_iocbs.size(); + assert(r > 0); + auto io 
= _free_iocbs.top(); + _free_iocbs.pop(); + return io; } inline void reactor::iocb_pool::put_one(internal::linux_abi::iocb* io) { _free_iocbs.push(io); - _sem.signal(1); } inline @@ -1478,18 +1476,17 @@ void reactor::submit_io(io_desc* desc, io_request_builder req) {- (void)_iocb_pool.get_one().then([this, desc, req = std::move(req)] (linux_abi::iocb* iocb) mutable { - auto& io = *iocb; - req.prepare_iocb(io); - if (_aio_eventfd) { - set_eventfd_notification(io, _aio_eventfd->get_fd()); - } - if (aio_nowait_supported) { - set_nowait(io, true); - } - set_user_data(io, desc); - _pending_aio.push_back(&io); - }); + auto* iocb = _iocb_pool.get_one(); + auto& io = *iocb; + req.prepare_iocb(io); + if (_aio_eventfd) { + set_eventfd_notification(io, _aio_eventfd->get_fd()); + } + if (aio_nowait_supported) { + set_nowait(io, true); + } + set_user_data(io, desc); + _pending_aio.push_back(&io); } // Returns: number of iocbs consumed (0 or 1) -- 2.20.1 |
| [PATCH 5/6] io_queue: return number of requests dispatched. | Glauber Costa | 22/01/20 07:36 AM | Right now we ignore the number of requests dispatched by the I/O queue.
Dispatching happens inside a seastar poller, but that poller does other things as well and that is enough to determine whether or not the poller did work. I am breaking this poller apart in two, with the I/O queue being polled separately from the OS-level work in preparation for io_uring support. When that happens, it will be necessary to know whether the poller did work. include/seastar/core/fair_queue.hh | 3 ++- include/seastar/core/io_queue.hh | 4 ++-- src/core/fair_queue.cc | 5 ++++- src/core/reactor.cc | 5 +++-- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh index f05f713d..cd5794dd 100644 --- a/include/seastar/core/fair_queue.hh +++ b/include/seastar/core/fair_queue.hh @@ -183,7 +183,8 @@ class fair_queue { void notify_requests_finished(fair_queue_request_descriptor& desc); /// Try to execute new requests if there is capacity left in the queue. - void dispatch_requests(); + /// \return the number of requests dispatched. 
+ size_t dispatch_requests(); /// Updates the current shares of this priority class /// diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh index e092ced8..813c0fda 100644 --- a/include/seastar/core/io_queue.hh +++ b/include/seastar/core/io_queue.hh @@ -132,8 +132,8 @@ class io_queue { } // Dispatch requests that are pending in the I/O queue - void poll_io_queue() { - _fq.dispatch_requests(); + size_t poll_io_queue() { + return _fq.dispatch_requests(); } sstring mountpoint() const { diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc index e760304d..e46cd0b5 100644 --- a/src/core/fair_queue.cc +++ b/src/core/fair_queue.cc @@ -102,7 +102,8 @@ void fair_queue::notify_requests_finished(fair_queue_request_descriptor& desc) { } -void fair_queue::dispatch_requests() { +size_t fair_queue::dispatch_requests() { + size_t dispatched = 0; while (can_dispatch()) { priority_class_ptr h; do { @@ -115,6 +116,7 @@ void fair_queue::dispatch_requests() { _req_count_executing += req.desc.weight; _bytes_count_executing += req.desc.size; _requests_queued--; + dispatched++; auto delta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base); auto req_cost = (float(req.desc.weight) / _config.max_req_count + float(req.desc.size) / _config.max_bytes_count) / h->_shares; @@ -134,6 +136,7 @@ void fair_queue::dispatch_requests() { } req.func(); } + return dispatched; } void fair_queue::update_shares(priority_class_ptr pc, uint32_t new_shares) { diff --git a/src/core/reactor.cc b/src/core/reactor.cc index 5d810f9d..dc07ab2d 100644 --- a/src/core/reactor.cc +++ b/src/core/reactor.cc @@ -1517,11 +1517,12 @@ reactor::handle_aio_error(linux_abi::iocb* iocb, int ec) { bool reactor::flush_pending_aio() { + bool did_work = false; + for (auto& ioq : my_io_queues) { - ioq->poll_io_queue(); + did_work |= ioq->poll_io_queue(); } - bool did_work = false; while (!_pending_aio.empty()) { auto nr = _pending_aio.size(); auto iocbs 
= _pending_aio.data(); -- 2.20.1 |
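The change in this patch — `dispatch_requests()` returning a count so that the I/O poller can report `did_work` — can be sketched in isolation roughly like this. `fair_queue_model` is a hypothetical stand-in for illustration, not seastar's actual `fair_queue` (which also tracks weights, shares, and accumulated cost):

```cpp
#include <cassert>
#include <deque>
#include <functional>

// Minimal model of the patch: dispatch_requests() used to return void; it now
// returns how many requests it moved, so flush_pending_aio() can compute did_work.
class fair_queue_model {
    std::deque<std::function<void()>> _queued;
    size_t _executing = 0;
public:
    void queue(std::function<void()> f) { _queued.push_back(std::move(f)); }

    size_t dispatch_requests() {
        size_t dispatched = 0;
        while (!_queued.empty()) {
            auto f = std::move(_queued.front());
            _queued.pop_front();
            ++_executing;   // real code also updates byte/weight accounting here
            ++dispatched;
            f();
        }
        return dispatched;
    }
};
```

A poller built on top of this would simply OR the returned count into its `did_work` flag, which is exactly what the `reactor::flush_pending_aio()` hunk above does.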
| [PATCH 6/6] core: move storage aio to reactor backend | Glauber Costa | 22/01/20 07:36 ص | Right now the storage subsystem is placed inside the reactor. This
works because we only ever do I/O using the linux io_submit/io_getevents interface. Linux now has a new, improved interface to do I/O (io_uring) that dispatches I/O by means of a different set of system calls with a different set of data structures. One possible approach would be to do something similar to the reactor_backend model, as an io_backend. However io_uring is the most delicious piece of awesomeness I've seen in a long while and it does much more than just I/O: the interface can do pretty much anything, which means we should be able to replace the other backend operations with it too. Therefore this patch lays the ground for that to happen by moving the storage functionality inside the reactor backend. include/seastar/core/internal/pollable_fd.hh | 1 + include/seastar/core/reactor.hh | 18 +- src/core/reactor_backend.hh | 57 +++++ src/core/reactor.cc | 156 +------------- src/core/reactor_backend.cc | 215 ++++++++++++++++++- 5 files changed, 279 insertions(+), 168 deletions(-) diff --git a/include/seastar/core/internal/pollable_fd.hh b/include/seastar/core/internal/pollable_fd.hh index 20cbad2f..d8476151 100644 --- a/include/seastar/core/internal/pollable_fd.hh +++ b/include/seastar/core/internal/pollable_fd.hh @@ -151,6 +151,7 @@ class pollable_fd { friend class reactor; friend class readable_eventfd; friend class writeable_eventfd; + friend class aio_storage_context; private: std::unique_ptr<pollable_fd_state> _s; }; diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh index 5e6d1a5e..0801a75b 100644 --- a/include/seastar/core/reactor.hh +++ b/include/seastar/core/reactor.hh @@ -182,6 +182,7 @@ class reactor { friend class internal::reactor_stall_sampler; friend class reactor_backend_epoll; friend class reactor_backend_aio; + friend class aio_storage_context; public: class poller { std::unique_ptr<pollfn> _pollfn; @@ -242,20 +243,8 @@ class reactor { static constexpr unsigned max_aio_per_queue = 128; static constexpr unsigned 
max_queues = 8; - static constexpr unsigned max_aio = max_aio_per_queue * max_queues; friend disk_config_params; - - class iocb_pool { - alignas(cache_line_size) std::array<internal::linux_abi::iocb, max_aio> _iocb_pool; - std::stack<internal::linux_abi::iocb*, boost::container::static_vector<internal::linux_abi::iocb*, max_aio>> _free_iocbs; - public: - iocb_pool(); - internal::linux_abi::iocb* get_one(); - void put_one(internal::linux_abi::iocb* io); - unsigned outstanding() const; - }; - // Not all reactors have IO queues. If the number of IO queues is less than the number of shards, // some reactors will talk to foreign io_queues. If this reactor holds a valid IO queue, it will // be stored here. @@ -286,10 +275,6 @@ class reactor { timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link>::timer_list_t _expired_lowres_timers; timer_set<timer<manual_clock>, &timer<manual_clock>::_link> _manual_timers; timer_set<timer<manual_clock>, &timer<manual_clock>::_link>::timer_list_t _expired_manual_timers; - internal::linux_abi::aio_context_t _io_context; - iocb_pool _iocb_pool; - boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _pending_aio; - boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _pending_aio_retry; io_stats _io_stats; uint64_t _fsyncs = 0; uint64_t _cxx_exceptions = 0; @@ -362,7 +347,6 @@ class reactor { static std::chrono::nanoseconds calculate_poll_time(); static void block_notifier(int); void wakeup(); - size_t handle_aio_error(internal::linux_abi::iocb* iocb, int ec); bool flush_pending_aio(); bool flush_tcp_batches(); bool do_expire_lowres_timers(); diff --git a/src/core/reactor_backend.hh b/src/core/reactor_backend.hh index 2cdc260d..533c329c 100644 --- a/src/core/reactor_backend.hh +++ b/src/core/reactor_backend.hh @@ -26,12 +26,15 @@ #include <seastar/core/internal/pollable_fd.hh> #include <seastar/core/internal/poll.hh> #include <seastar/core/linux-aio.hh> +#include <seastar/core/cacheline.hh> +#include 
<seastar/core/io_request_builder.hh> #include <sys/time.h> #include <signal.h> #include <thread> #include <stack> #include <boost/any.hpp> #include <boost/program_options.hpp> +#include <boost/container/static_vector.hpp> #ifdef HAVE_OSV #include <osv/newpoll.hh> @@ -40,6 +43,46 @@ namespace seastar { class reactor; +class io_desc; + +class aio_storage_context { +public: + static constexpr unsigned max_aio = 1024; + + class aio_storage_context_pollfn : public seastar::pollfn { + aio_storage_context* _ctx; + public: + virtual bool poll() override; + virtual bool pure_poll() override; + virtual bool try_enter_interrupt_mode() override; + virtual void exit_interrupt_mode() override; + explicit aio_storage_context_pollfn(aio_storage_context* ctx) : _ctx(ctx) {} + }; + + explicit aio_storage_context(reactor *r); + ~aio_storage_context(); + + void submit_io(io_desc* desc, io_request_builder builder);+ + std::unique_ptr<seastar::pollfn> create_poller(); +private: + reactor* _r; + internal::linux_abi::aio_context_t _io_context; + + size_t handle_aio_error(internal::linux_abi::iocb* iocb, int ec); + bool flush_pending_aio(); + bool process_io(); + + alignas(cache_line_size) std::array<internal::linux_abi::iocb, max_aio> _iocb_pool; + std::stack<internal::linux_abi::iocb*, boost::container::static_vector<internal::linux_abi::iocb*, max_aio>> _free_iocbs; + + internal::linux_abi::iocb* get_one();+ void put_one(internal::linux_abi::iocb* io); + unsigned outstanding() const; + + boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _pending_aio; + boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _pending_aio_retry; +}; // The "reactor_backend" interface provides a method of waiting for various // basic events on one thread. 
We have one implementation based on epoll and @@ -68,6 +111,9 @@ class reactor_backend { virtual void reset_preemption_monitor() = 0; virtual void request_preemption() = 0; virtual void start_handling_signal() = 0; + + virtual std::unique_ptr<pollfn> create_storage_poller() = 0; + virtual void submit_io(io_desc* desc, io_request_builder builder) = 0; }; // reactor backend using file-descriptor & epoll, suitable for running on @@ -84,6 +130,7 @@ class reactor_backend_epoll : public reactor_backend { promise<> pollable_fd_state::* pr, int event); void complete_epoll_event(pollable_fd_state& fd, promise<> pollable_fd_state::* pr, int events, int event); + aio_storage_context _aio_storage_context; public: explicit reactor_backend_epoll(reactor* r); virtual ~reactor_backend_epoll() override; @@ -99,6 +146,9 @@ class reactor_backend_epoll : public reactor_backend { virtual void reset_preemption_monitor() override; virtual void request_preemption() override; virtual void start_handling_signal() override; + + virtual std::unique_ptr<pollfn> create_storage_poller() override; + virtual void submit_io(io_desc* desc, io_request_builder builder) override; }; class reactor_backend_aio : public reactor_backend { @@ -118,6 +168,8 @@ class reactor_backend_aio : public reactor_backend { }; context _preempting_io{2}; // Used for the timer tick and the high resolution timer context _polling_io{max_polls}; // FIXME: unify with disk aio_context + aio_storage_context _aio_storage_context; + file_desc _steady_clock_timer = make_timerfd(); internal::linux_abi::iocb _task_quota_timer_iocb; internal::linux_abi::iocb _timerfd_iocb; @@ -161,6 +213,9 @@ class reactor_backend_aio : public reactor_backend { virtual void reset_preemption_monitor() override; virtual void request_preemption() override; virtual void start_handling_signal() override; + + virtual std::unique_ptr<pollfn> create_storage_poller() override; + virtual void submit_io(io_desc* desc, io_request_builder builder) override; }; 
#ifdef HAVE_OSV @@ -181,6 +236,8 @@ class reactor_backend_osv : public reactor_backend { virtual future<> writeable(pollable_fd_state& fd) override; virtual void forget(pollable_fd_state& fd) override; void enable_timer(steady_clock_type::time_point when); + virtual std::unique_ptr<pollfn> create_storage_poller() override; + virtual void submit_io(io_desc* desc, io_request_builder builder) override; }; #endif /* HAVE_OSV */ diff --git a/src/core/reactor.cc b/src/core/reactor.cc index dc07ab2d..cdb71031 100644 --- a/src/core/reactor.cc +++ b/src/core/reactor.cc @@ -164,34 +164,6 @@ namespace seastar { seastar::logger seastar_logger("seastar"); seastar::logger sched_logger("scheduler"); -reactor::iocb_pool::iocb_pool() { - for (unsigned i = 0; i != max_aio; ++i) { - _free_iocbs.push(&_iocb_pool[i]); - } -} - -inline -internal::linux_abi::iocb* -reactor::iocb_pool::get_one() { - auto r = _free_iocbs.size(); - assert(r > 0); - auto io = _free_iocbs.top();- -inline -void -reactor::iocb_pool::put_one(internal::linux_abi::iocb* io) { - _free_iocbs.push(io); -} - -inline -unsigned -reactor::iocb_pool::outstanding() const { - return max_aio - _free_iocbs.size(); -} - io_priority_class reactor::register_one_priority_class(sstring name, uint32_t shares) { return io_queue::register_one_priority_class(std::move(name), shares); @@ -519,7 +491,7 @@ constexpr unsigned reactor::max_queues; constexpr unsigned reactor::max_aio_per_queue; // Broken (returns spurious EIO). Cause/fix unknown. 
-static bool aio_nowait_supported = false; +bool aio_nowait_supported = false; static bool sched_debug() { return false; @@ -867,7 +839,6 @@ reactor::reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg) #endif , _cpu_started(0) , _cpu_stall_detector(std::make_unique<cpu_stall_detector>(this)) - , _io_context(0) , _reuseport(posix_reuseport_detect()) , _thread_pool(std::make_unique<thread_pool>(this, seastar::format("syscall-{}", id))) { /* @@ -883,7 +854,6 @@ reactor::reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg) seastar::thread_impl::init(); _backend->start_tick(); - setup_aio_context(max_aio, &_io_context); #ifdef HAVE_OSV _timer_thread.start(); #else @@ -917,7 +887,6 @@ reactor::~reactor() { eraser(_expired_timers); eraser(_expired_lowres_timers); eraser(_expired_manual_timers); - io_destroy(_io_context); for (auto&& tq : _task_queues) { if (tq) { // The following line will preserve the convention that constructor and destructor functions @@ -1474,45 +1443,7 @@ sstring io_request_builder::operation() const { - // We can ignore the future returned here, because the submitted aio will be polled - // for and completed in process_io(). - auto* iocb = _iocb_pool.get_one(); - auto& io = *iocb;- -// Returns: number of iocbs consumed (0 or 1) -size_t -reactor::handle_aio_error(linux_abi::iocb* iocb, int ec) { - switch (ec) { - case EAGAIN: - return 0; - case EBADF: { - auto desc = reinterpret_cast<io_desc*>(get_user_data(*iocb)); - _iocb_pool.put_one(iocb); - try { - throw std::system_error(EBADF, std::system_category()); - } catch (...) { - desc->set_exception(std::current_exception()); - } - delete desc; - // if EBADF, it means that the first request has a bad fd, so - // we will only remove it from _pending_aio and try again. 
- return 1; - } - default: - ++_io_stats.aio_errors; - throw_system_error_on(true, "io_submit"); - abort(); - } + _backend->submit_io(desc, std::move(req)); } bool @@ -1523,42 +1454,6 @@ reactor::flush_pending_aio() { did_work |= ioq->poll_io_queue(); } - while (!_pending_aio.empty()) { - auto nr = _pending_aio.size(); - auto iocbs = _pending_aio.data(); - auto r = io_submit(_io_context, nr, iocbs); - size_t nr_consumed; - if (r == -1) { - nr_consumed = handle_aio_error(iocbs[0], errno); - } else { - nr_consumed = size_t(r); - } - - did_work = true; - if (nr_consumed == nr) { - _pending_aio.clear(); - } else { - _pending_aio.erase(_pending_aio.begin(), _pending_aio.begin() + nr_consumed); - } - } - if (!_pending_aio_retry.empty()) { - auto retries = std::exchange(_pending_aio_retry, {}); - // FIXME: future is discarded - (void)_thread_pool->submit<syscall_result<int>>([this, retries] () mutable { - auto r = io_submit(_io_context, retries.size(), retries.data()); - return wrap_syscall<int>(r); - }).then([this, retries] (syscall_result<int> result) { - auto iocbs = retries.data(); - size_t nr_consumed = 0; - if (result.result == -1) { - nr_consumed = handle_aio_error(iocbs[0], result.error); - } else { - nr_consumed = result.result; - } - std::copy(retries.begin() + nr_consumed, retries.end(), std::back_inserter(_pending_aio_retry)); - }); - did_work = true; - } return did_work; } @@ -1583,37 +1478,6 @@ reactor::submit_io_write(io_queue* ioq, const io_priority_class& pc, size_t len, return ioq->queue_request(pc, len, std::move(req));-bool reactor::process_io() -{ - io_event ev[max_aio]; - struct timespec timeout = {0, 0}; - auto n = io_getevents(_io_context, 1, max_aio, ev, &timeout, _force_io_getevents_syscall); - if (n == -1 && errno == EINTR) { - n = 0; - } - assert(n >= 0); - unsigned nr_retry = 0; - for (size_t i = 0; i < size_t(n); ++i) { - auto iocb = get_iocb(ev[i]); - if (ev[i].res == -EAGAIN) { - ++nr_retry; - set_nowait(*iocb, false); - 
_pending_aio_retry.push_back(iocb); - continue; - } - _iocb_pool.put_one(iocb); - auto desc = reinterpret_cast<io_desc*>(ev[i].data); - try { - this->handle_io_result(ev[i]); - desc->set_value(size_t(ev[i].res)); - } catch (...) { - desc->set_exception(std::current_exception()); - } - delete desc; - } - return n; -} - namespace internal { size_t sanitize_iovecs(std::vector<iovec>& iov, size_t disk_alignment) noexcept { @@ -2252,18 +2116,13 @@ class reactor::io_pollfn final : public reactor::pollfn { public:- return _r.process_io() | _r.flush_pending_aio(); + return _r.flush_pending_aio(); }virtual bool try_enter_interrupt_mode() override { - // Because aio depends on polling, it cannot generate events to wake us up, Therefore, sleep - // is only possible if there are no in-flight aios. If there are, we need to keep polling. - // - // Alternatively, if we enabled _aio_eventfd, we can always enter - unsigned executing = _r._iocb_pool.outstanding(); - return executing == 0 || _r._aio_eventfd; + return true; } virtual void exit_interrupt_mode() override { // nothing to do @@ -2618,7 +2477,6 @@ int reactor::run() { register_metrics(); - compat::optional<poller> io_poller = {}; compat::optional<poller> smp_poller = {}; // I/O Performance greatly increases if the smp poller runs before the I/O poller. 
This is @@ -2627,9 +2485,9 @@ int reactor::run() { if (smp::count > 1) { smp_poller = poller(std::make_unique<smp_pollfn>(*this)); } -#ifndef HAVE_OSV - io_poller = poller(std::make_unique<io_pollfn>(*this)); -#endif + + poller io_poller(std::make_unique<io_pollfn>(*this)); + poller storage_poller(_backend->create_storage_poller()); diff --git a/src/core/reactor_backend.cc b/src/core/reactor_backend.cc index a798d34a..7eb7e24c 100644 --- a/src/core/reactor_backend.cc +++ b/src/core/reactor_backend.cc @@ -19,6 +19,9 @@ * Copyright 2019 ScyllaDB */ #include "core/reactor_backend.hh" +#include "core/io_desc.hh" +#include "core/thread_pool.hh" +#include "core/syscall_result.hh" #include <seastar/core/print.hh> #include <seastar/core/reactor.hh> #include <seastar/util/defer.hh> @@ -36,6 +39,183 @@ using namespace std::chrono_literals; using namespace internal; using namespace internal::linux_abi; +aio_storage_context::aio_storage_context(reactor *r) + : _r(r) + , _io_context(0) +{ + for (unsigned i = 0; i != max_aio; ++i) { + _free_iocbs.push(&_iocb_pool[i]); + } + + setup_aio_context(max_aio, &_io_context); +} + +aio_storage_context::~aio_storage_context() { + io_destroy(_io_context); +} + +internal::linux_abi::iocb* +aio_storage_context::get_one() { + auto r = _free_iocbs.size();+} + +void +aio_storage_context::put_one(internal::linux_abi::iocb* io) { + _free_iocbs.push(io); +} + +unsigned +aio_storage_context::outstanding() const { + return max_aio - _free_iocbs.size(); +} + +extern bool aio_nowait_supported; + +void +aio_storage_context::submit_io(io_desc* desc, io_request_builder req) { + // We can ignore the future returned here, because the submitted aio will be polled + // for and completed in process_io(). 
+ auto* iocb = get_one(); + auto& io = *iocb;+ if (_r->_aio_eventfd) { + set_eventfd_notification(io, _r->_aio_eventfd->get_fd()); + }+} + +bool aio_storage_context::process_io() +{ + io_event ev[max_aio]; + struct timespec timeout = {0, 0}; + auto n = io_getevents(_io_context, 1, max_aio, ev, &timeout, _r->_force_io_getevents_syscall); + if (n == -1 && errno == EINTR) { + n = 0; + } + + assert(n >= 0); + unsigned nr_retry = 0; + for (size_t i = 0; i < size_t(n); ++i) { + auto iocb = get_iocb(ev[i]); + if (ev[i].res == -EAGAIN) { + ++nr_retry; + set_nowait(*iocb, false); + _pending_aio_retry.push_back(iocb); + continue; + } + put_one(iocb); + auto desc = reinterpret_cast<io_desc*>(ev[i].data); + try { + _r->handle_io_result(ev[i]); + desc->set_value(size_t(ev[i].res));+ delete desc; + } + return n; +} + +// Returns: number of iocbs consumed (0 or 1) +size_t +aio_storage_context::handle_aio_error(linux_abi::iocb* iocb, int ec) { + switch (ec) { + case EAGAIN: + return 0; + case EBADF: { + auto desc = reinterpret_cast<io_desc*>(get_user_data(*iocb)); + put_one(iocb); + try { + throw std::system_error(EBADF, std::system_category()); + } catch (...) {+ delete desc; + // if EBADF, it means that the first request has a bad fd, so + // we will only remove it from _pending_aio and try again. 
+ return 1; + } + default: + ++_r->_io_stats.aio_errors; + throw_system_error_on(true, "io_submit"); + abort(); + } +} + +bool aio_storage_context::flush_pending_aio() { + bool did_work = false;+ while (!_pending_aio.empty()) { + auto nr = _pending_aio.size(); + auto iocbs = _pending_aio.data(); + auto r = io_submit(_io_context, nr, iocbs); + size_t nr_consumed; + if (r == -1) { + nr_consumed = handle_aio_error(iocbs[0], errno); + } else { + nr_consumed = size_t(r); + } + + did_work = true; + if (nr_consumed == nr) { + _pending_aio.clear(); + } else { + _pending_aio.erase(_pending_aio.begin(), _pending_aio.begin() + nr_consumed); + } + } + if (!_pending_aio_retry.empty()) { + auto retries = std::exchange(_pending_aio_retry, {}); + // FIXME: future is discarded + (void)engine()._thread_pool->submit<syscall_result<int>>([this, retries] () mutable { + auto r = io_submit(_io_context, retries.size(), retries.data()); + return wrap_syscall<int>(r); + }).then([this, retries] (syscall_result<int> result) { + auto iocbs = retries.data(); + size_t nr_consumed = 0; + if (result.result == -1) { + nr_consumed = handle_aio_error(iocbs[0], result.error); + } else { + nr_consumed = result.result; + } + std::copy(retries.begin() + nr_consumed, retries.end(), std::back_inserter(_pending_aio_retry)); + }); + did_work = true; + } + did_work |= process_io(); + return did_work; +} + +bool aio_storage_context::aio_storage_context_pollfn::poll() { + return _ctx->flush_pending_aio(); +} + +bool aio_storage_context::aio_storage_context_pollfn::pure_poll() { + return poll(); // actually performs work, but triggers no user continuations, so okay +} + +bool aio_storage_context::aio_storage_context_pollfn::try_enter_interrupt_mode() { + // Because aio depends on polling, it cannot generate events to wake us up, Therefore, sleep + // is only possible if there are no in-flight aios. If there are, we need to keep polling. 
+ // + // Alternatively, if we enabled _aio_eventfd, we can always enter + unsigned executing = _ctx->outstanding(); + return executing == 0 || _ctx->_r->_aio_eventfd; +} +void aio_storage_context::aio_storage_context_pollfn::exit_interrupt_mode() { + // nothing to do +} + +std::unique_ptr<pollfn> aio_storage_context::create_poller() { + return std::make_unique<aio_storage_context_pollfn>(this); +} + reactor_backend_aio::context::context(size_t nr) : iocbs(new iocb*[nr]) { setup_aio_context(nr, &io_context); } @@ -184,7 +364,10 @@ void reactor_backend_aio::signal_received(int signo, siginfo_t* siginfo, void* i engine()._signals.action(signo, siginfo, ignore); } -reactor_backend_aio::reactor_backend_aio(reactor* r) : _r(r) { +reactor_backend_aio::reactor_backend_aio(reactor* r) + : _r(r) + , _aio_storage_context(_r) +{ _task_quota_timer_iocb = make_poll_iocb(_r->_task_quota_timer.get(), POLLIN); _timerfd_iocb = make_poll_iocb(_steady_clock_timer.get(), POLLIN); _smp_wakeup_iocb = make_poll_iocb(_r->_notify_eventfd.get(), POLLIN); @@ -292,8 +475,19 @@ void reactor_backend_aio::start_handling_signal() { // implementation of request_preemption is not signal safe, so do nothing. 
} +std::unique_ptr<pollfn> reactor_backend_aio::create_storage_poller() { + return _aio_storage_context.create_poller(); +} + +void reactor_backend_aio::submit_io(io_desc* desc, io_request_builder irb) { + return _aio_storage_context.submit_io(desc, std::move(irb)); +} + reactor_backend_epoll::reactor_backend_epoll(reactor* r) - : _r(r), _epollfd(file_desc::epoll_create(EPOLL_CLOEXEC)) { + : _r(r) + , _epollfd(file_desc::epoll_create(EPOLL_CLOEXEC)) + , _aio_storage_context(_r) +{ ::epoll_event event; event.events = EPOLLIN; event.data.ptr = nullptr; @@ -455,6 +649,14 @@ void reactor_backend_epoll::reset_preemption_monitor() { _r->_preemption_monitor.head.store(0, std::memory_order_relaxed); } +std::unique_ptr<pollfn> reactor_backend_epoll::create_storage_poller() { + return _aio_storage_context.create_poller(); +} + +void reactor_backend_epoll::submit_io(io_desc* desc, io_request_builder irb) { + return _aio_storage_context.submit_io(desc, std::move(irb)); +} + #ifdef HAVE_OSV reactor_backend_osv::reactor_backend_osv() { } @@ -495,6 +697,15 @@ reactor_backend_osv::enable_timer(steady_clock_type::time_point when) { _poller.set_timer(when); } +std::unique_ptr<pollfn> reactor_backend_osv::create_storage_poller() { + std::cerr << "reactor_backend_osv does not support file descriptors - create_storage_poller() shouldn't have been called!\n"; + abort(); +} + +void reactor_backend_osv::submit_io(io_desc* desc, io_request_builder irb) { + std::cerr << "reactor_backend_osv does not support file descriptors - submit_io() shouldn't have been called!\n"; + abort(); +} #endif static bool detect_aio_poll() { -- 2.20.1 |
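The core idea of patch 6 — storage I/O submission routed through a virtual backend interface so an io_uring backend can later slot in next to the linux-aio one — can be sketched roughly as follows. The types here (`io_desc`, `io_request`, `aio_backend`) are deliberately simplified placeholders for the patch's real classes, not seastar's actual definitions:

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Simplified stand-ins for seastar's io_desc / io_request_builder.
struct io_desc { size_t completed = 0; };
struct io_request { int fd; size_t len; };

// The backend interface: storage submission becomes a virtual call.
class reactor_backend {
public:
    virtual ~reactor_backend() = default;
    virtual void submit_io(io_desc* desc, io_request req) = 0;
    virtual std::string name() const = 0;
};

// linux-aio flavored backend: batches requests for a later io_submit()
// (simulated here by immediate completion).
class aio_backend : public reactor_backend {
    std::vector<io_request> _pending;
public:
    void submit_io(io_desc* desc, io_request req) override {
        _pending.push_back(req);    // real code builds an iocb and queues it
        desc->completed = req.len;  // simulate the eventual completion
    }
    std::string name() const override { return "linux-aio"; }
    size_t pending() const { return _pending.size(); }
};

// The reactor only ever talks to the abstract interface.
size_t dispatch_through_backend(reactor_backend& b, int fd, size_t len) {
    io_desc d;
    b.submit_io(&d, io_request{fd, len});
    return d.completed;
}
```

An io_uring backend would be another subclass implementing `submit_io()` with its own submission-queue machinery, with no change to the reactor-side call site.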
| Re: [seastar-dev] [PATCH 1/6] core: remove iocb references from I/O dispatchers | Avi Kivity | 22/01/20 07:53 ص |
> index 00000000..d82e3b7e
> +namespace seastar {

Please put this in the seastar::internal namespace so users aren't tempted to touch it (the file can be in include/seastar/core/internal).

Our code has the case statements aligned with the switch statement.

A fully virtualized solution will push the building to reactor_backend. Of course that is difficult because there is no common type to return, so reactor_backend will have to build and queue, not just build. With that in mind, this is better named io_request (and can be a plain struct). Of course, we can clean that up later. |
| Re: [seastar-dev] [PATCH 4/6] reactor: remove semaphore from iocb acquisition | Avi Kivity | 22/01/20 08:11 ص |
> We currently protect the iocb acquisition path in the storage layer with

No, it's very new:

commit 8011bd33bf2c9a1f108eb722c8cc4f55e5592be9
Author: Avi Kivity <a...@scylladb.com>
Date: Tue Dec 10 15:22:08 2019 +0200

    reactor: fix iocb pool underflow due to unaccounted aio fsync

    The iocb pool is sized for the worst case of all I/O queues having 128
    outstanding requests each. But fsync also consumes iocbs from the pool,
    without any reservation having been made for them. Usually this doesn't
    matter because those fsyncs complete quickly, and because the pool is
    hugely oversized for the common case. But in the uncommon case (ENOSPC)
    things can clog and we can underflow the pool.

    Fix by introducing a semaphore for guarding the pool. It should be very
    rare that we have to wait on the semaphore, since usually the io_queue
    mechanism will limit requests to well below the pool capacity. It should
    only matter when the disk stalls and gets filled with fsync requests.

    Ref scylladb/scylla#5443.

I don't see where we guarantee we won't queue more than max_aio iocbs (I agree we should push the limit to the scheduler). There is a check looking at max_aio_per_queue, but that is related to mountpoints. The relevant lines are:

    if (configuration.count("max-io-requests")) {
        _capacity = configuration["max-io-requests"].as<unsigned>();
    }

which we pass from the input without a cap, and if max-io-requests is not provided, it's derived dynamically from the disk bandwidth and the task quota period. This never harmed us because the pool size is very large (1024 per shard, which no disk needs), except for the fsync trouble above.

Since fsync is handled outside the scheduler, I'm not sure we can push the limit to the scheduler. Note that this code doesn't add tasks, so it's cheap compared to continuations. Still, removing it is a win.

You can undo the half-indent (that was added for the semaphore), reducing noise. |
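The pool arrangement under discussion can be sketched, much simplified, like this. The `iocb` type and `try_get_one()` are placeholders for illustration only: seastar's real pool hands out `internal::linux_abi::iocb` and (in the patch being dropped here) waits on a seastar semaphore instead of failing fast when the pool is empty:

```cpp
#include <array>
#include <cassert>
#include <stack>

// Placeholder for internal::linux_abi::iocb.
struct iocb { int opcode = 0; };

// Rough sketch of the guarded pool described in commit 8011bd33: a fixed
// array of iocb slots; exhaustion (e.g. by unaccounted fsyncs) surfaces as
// a failed acquisition rather than an underflow.
template <unsigned MaxAio>
class iocb_pool {
    std::array<iocb, MaxAio> _storage;
    std::stack<iocb*> _free;
public:
    iocb_pool() {
        for (auto& io : _storage) {
            _free.push(&io);
        }
    }
    // seastar blocks on a future-returning semaphore here; we just fail fast.
    iocb* try_get_one() {
        if (_free.empty()) {
            return nullptr;
        }
        auto* io = _free.top();
        _free.pop();
        return io;
    }
    void put_one(iocb* io) { _free.push(io); }
    unsigned outstanding() const { return MaxAio - unsigned(_free.size()); }
};
```

The bug the semaphore fixed is visible in this model: any consumer not counted against MaxAio (fsync, in the real code) can drain the free stack below what the I/O queues were sized for.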
| Re: [seastar-dev] [PATCH 0/6] io_uring groundwork: make seastar agnostic of linux async I/O | Avi Kivity | 22/01/20 08:19 ص |
> This series provides the groundwork for moving our I/O subsystem towards

Looks good, but I'm afraid patch 4 will require a lot more work. Better to drop it now. |
| Re: [seastar-dev] [PATCH 0/6] io_uring groundwork: make seastar agnostic of linux async I/O | Glauber Costa | 22/01/20 08:50 ص | okay. The main motivation for that patch in particular was to facilitate a potential unification between iocb management in the storage and poll layers. However I wasn't planning on doing it anyway, since once we move to io_uring I expect the benefits of that to be much lower. Dropping that. |
| Re: [seastar-dev] [PATCH 4/6] reactor: remove semaphore from iocb acquisition | Glauber Costa | 22/01/20 08:51 ص |
Ok, I think I know why I thought it was ancient: I now recall that I had removed this semaphore before (before the fdatasync thing existed). I am dropping this patch for now, thanks. |
| Re: [seastar-dev] [PATCH 1/6] core: remove iocb references from I/O dispatchers | Glauber Costa | 22/01/20 08:55 ص |
Thanks. I will do that.

Okay, will adjust.

There is no reason not to do it now, since that is the main goal of my sending this series.

It is possible to move it fully to the backend, but not easy to do in this patch. The reason is that this becomes a bit circular: moving to the backend is made easier by the fact that the iocb reference is gone. But once the code is moved to the reactor backend, this function is only really used by the aio_context's version of submit_io. I will make sure this is moved in that patch (right now it isn't). |
| Re: [seastar-dev] [PATCH 0/6] io_uring groundwork: make seastar agnostic of linux async I/O | Avi Kivity | 22/01/20 08:55 ص | I think it's a worthwhile goal even with io_uring. Note that
the io_uring/io_context fd will also need limit checking; right
now we just hope we allocated enough.
I think io_uring decouples the communication ring capacity from
the concurrency limit while aio does not, so it should be easier
in io_uring. But of course we can do it later.
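The decoupling Avi describes can be modeled with an in-flight counter that is independent of any ring size. This is an illustrative model only — no real io_uring calls, and `submission_limiter` is not a seastar type:

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// With linux-aio the iocb pool size *is* the concurrency limit; with io_uring
// the submission ring can be small while in-flight concurrency is tracked
// separately. This class models only the separate tracking.
class submission_limiter {
    size_t _max_in_flight;
    size_t _in_flight = 0;
    std::deque<int> _overflow;  // requests waiting for an in-flight slot
public:
    explicit submission_limiter(size_t max_in_flight)
        : _max_in_flight(max_in_flight) {}

    // Returns true if the request was admitted, false if it had to wait.
    bool submit(int req) {
        if (_in_flight < _max_in_flight) {
            ++_in_flight;
            return true;
        }
        _overflow.push_back(req);
        return false;
    }

    // A completion frees a slot; admit one waiter if any.
    void complete() {
        --_in_flight;
        if (!_overflow.empty()) {
            _overflow.pop_front();
            ++_in_flight;
        }
    }

    size_t in_flight() const { return _in_flight; }
    size_t waiting() const { return _overflow.size(); }
};
```

In a real backend the limit would come from the scheduler (per the discussion above) rather than being hard-coded, and completions would arrive from the ring's completion queue.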
| Re: [seastar-dev] [PATCH 0/6] io_uring groundwork: make seastar agnostic of linux async I/O | Glauber Costa | 22/01/20 09:03 ص | My plan was to leave that as an exercise for the reader. If book authors can get away with this clear sham, so can I. |
| Re: [seastar-dev] [PATCH 0/6] io_uring groundwork: make seastar agnostic of linux async I/O | Avi Kivity | 22/01/20 09:27 ص | Book authors have readers.
|
| Re: [seastar-dev] [PATCH 1/6] core: remove iocb references from I/O dispatchers | Rafael Avila de Espindola | 22/01/20 01:15 م | Glauber Costa <gla...@scylladb.com> writes:

io_request_builder has a _size, is that the same as len in here? Could len be removed?

BTW, not sure if "_builder" adds a lot of value. Would just seastar::io_request be OK?

Nit: could the io_request_builder constructors be private and only expose static methods make_read(), make_readv(), make_fdatasync(), etc.? The advantage being that each of the make_* takes only the relevant fields.

Two other cases of having len and builder._size.

How about on_internal_error instead?

> +void io_request_builder::prepare_iocb(iocb& iocb) {

How about returning an iocb instead?

This is a fully covered switch. The trick to avoid the extra default and be safe is to write it as

    switch (_op) {
    case ...:
    case ...:
    }
    __builtin_unreachable();

If any case is missing, gcc produces a warning which fails the build. The __builtin_unreachable() after the switch is not needed in this case, but in general it is there to avoid gcc warning about not returning in all paths. Yes, gcc is good at doublethink.

Another fully covered switch.

Cheers,
Rafael |
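Rafael's factory-method suggestion might look roughly like this. This is a hypothetical sketch — the names mirror the discussion (make_read, make_fdatasync) but this is not seastar's actual code:

```cpp
#include <cassert>
#include <cstddef>

// Private constructor, one static factory per operation, each taking only
// the fields that operation needs.
class io_request {
public:
    enum class operation { read, fdatasync };
private:
    operation _op;
    int _fd;
    void* _addr;
    size_t _size;
    io_request(operation op, int fd, void* addr, size_t size)
        : _op(op), _fd(fd), _addr(addr), _size(size) {}
public:
    static io_request make_read(int fd, void* addr, size_t size) {
        return io_request(operation::read, fd, addr, size);
    }
    static io_request make_fdatasync(int fd) {
        // fdatasync has no buffer or size, so callers can't get them wrong
        return io_request(operation::fdatasync, fd, nullptr, 0);
    }
    operation op() const { return _op; }
    size_t size() const { return _size; }
};
```

The payoff is at the call sites: a caller cannot construct an fdatasync request with a stray buffer pointer, because the factory signature simply doesn't accept one.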
| Re: [seastar-dev] [PATCH 0/6] io_uring groundwork: make seastar agnostic of linux async I/O | Rafael Avila de Espindola | 22/01/20 01:16 م | Glauber Costa <gla...@scylladb.com> writes:Thanks! I am super excited to see how that performs! Cheers, Rafael |
| Re: [seastar-dev] [PATCH 1/6] core: remove iocb references from I/O dispatchers | Botond Dénes | 22/01/20 10:39 م | If `_op` can ever have a corrupt value, `std::abort()` is better here.
|
| Re: [seastar-dev] [PATCH 1/6] core: remove iocb references from I/O dispatchers | Rafael Avila de Espindola | 22/01/20 10:48 م | Is that really worth protecting against? I was never that lucky with corrupted memory. Cheers, Rafael |
| Re: [seastar-dev] [PATCH 1/6] core: remove iocb references from I/O dispatchers | Botond Dénes | 22/01/20 10:56 م | I don't know, maybe not worth it, depends on how paranoid you are. We
have examples of both in our code base. |
| Re: [seastar-dev] [PATCH 1/6] core: remove iocb references from I/O dispatchers | Glauber Costa | 23/01/20 05:33 ص |
No. In the case of an iovec, the builder's _size is the number of entries in the vector. The len argument in queue_request is the total size of the operation (in the case of an iovec, the sum of all iov_len), for the purposes of accounting in the I/O scheduler.
Already changed it as requested by avi (to seastar::internal::io_request)
that sounds like a good direction
they are the same for non-vectored I/O, but different for vectored I/O.
sorry, not sure what exactly you mean
Why? I can't avoid passing the iocb, because the iocb comes from a pool that is external to this structure.
I don't want to avoid the extra default. In my view, it is there to protect us against future code, where someone adds an opcode and forgets to add the relevant case to the switch. If there were a way to statically warn about it and make sure the switch was fully covered at all times, then I'd be happy to remove the default.
Interesting, I did not know that. In that case I most definitely *don't* want the builtin unreachable and can remove the default.
|
| Re: [seastar-dev] [PATCH 1/6] core: remove iocb references from I/O dispatchers | Rafael Avila de Espindola | 23/01/20 08:04 ص | Glauber Costa <gla...@scylladb.com> writes:

Instead of

    } else {
        throw ...
    }

use

    } else {
        on_internal_error(...);
    }

This makes it abort if abort_on_internal_error is set, which is true for tests.

Simpler code:

    switch (_op) {
    case io_request_builder::operation::fdatasync:
        return make_fdsync_iocb(_fd);
    case ...:
        ...

The change would be

    - prepare_io(io);
    + io = req.prepare_iocb();

No?

They can't forget. GCC will fail the build because of -Werror. That is exactly what it does. A reduced example:

    enum class foo { bar, zed };

    int func(foo x) {
        switch (x) {
        case foo::bar:
            return 1;
        case foo::zed:
            return 2;
        }
        __builtin_unreachable();
    }

This compiles fine with -Wall. If you add 'baz' to the enum but not the switch you get:

    test.cc:4:12: warning: enumeration value 'baz' not handled in switch [-Wswitch]

In this case you don't need it, but in general you do, because it is GCC. If you remove it from the reduced example above you get:

    test.cc:10:1: warning: control reaches end of non-void function [-Wreturn-type]

which is wrong, since the switch covers all paths.

Cheers,
Rafael |
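Rafael's reduced example, fleshed out into a self-contained compilable unit (gcc/clang only — `__builtin_unreachable()` is a compiler builtin, so this is not portable to MSVC as written):

```cpp
#include <cassert>

enum class foo { bar, zed };

// Fully covered switch with no default: building with -Wall -Werror makes
// -Wswitch fail the build if a new enumerator is added but not handled here.
int func(foo x) {
    switch (x) {
    case foo::bar:
        return 1;
    case foo::zed:
        return 2;
    }
    // Silences -Wreturn-type: every enumerator returns above, but gcc cannot
    // prove the switch is exhaustive on its own.
    __builtin_unreachable();
}
```

Omitting the default is what makes the protection work: a `default:` case would swallow the new enumerator and suppress the -Wswitch diagnostic.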
| Re: [seastar-dev] [PATCH 1/6] core: remove iocb references from I/O dispatchers | Glauber Costa | 23/01/20 08:08 ص | No. It would be some variation of

    io = req.prepare_iocb(io);

iocbs come from a pool that is private to another data structure, so they have to be passed on to this function.

Thanks.
|
| Re: [seastar-dev] [PATCH 1/6] core: remove iocb references from I/O dispatchers | Avi Kivity | 23/01/20 11:00 ص | In any case, the patch moves an interface and should defer a
refactoring until later. I do think a refactoring is merited (by
moving all iocb-related code to a helper function outside
io_request), but we can do that after we see the requirements from
io_uring.
|
| Re: [seastar-dev] [PATCH 1/6] core: remove iocb references from I/O dispatchers | Rafael Avila de Espindola | 23/01/20 12:24 م | Avi Kivity <a...@scylladb.com> writes:
>> It would be some variation of io = req.prepare_iocb(io);
>>
>> iocbs come from a pool that is private to another data structure so
>> they have to be passed on to this function
>
> In any case, the patch moves an interface and should defer a refactoring
> until later. I do think a refactoring is merited (by moving all iocb
> related code to a helper function outside io_request), but we can do
> that after we see the requirements from io_uring.

I agree and should have been more explicit. I am OK with the patches landing and was pointing out only possible future improvements.

Cheers,
Rafael |