[PATCH 0/6] io_uring groundwork: make seastar agnostic of linux async I/O


Glauber Costa

<glauber@scylladb.com>
Jan 22, 2020, 10:36:09 AM
to seastar-dev@googlegroups.com, Glauber Costa
This series provides the groundwork for moving our I/O subsystem towards
io_uring. I am not including any io_uring patches yet; I would rather
collect early feedback on the general approach first.

The main seastar-specific problem we face is that we make assumptions
about iocb, io_event and the presence of io_submit/io_getevents pretty
much everywhere.

The approach I have chosen to take is that all of that machinery is
moved inside the already existing reactor_backend infrastructure.
io_uring, when implemented, will be a new reactor backend capable of
replacing all the polling and storage I/O at once.
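
As a rough illustration of that direction (not code from this series), the
sketch below shows upper layers talking to an abstract backend that never
mentions iocb or io_event. Every name in it (io_request, storage_backend,
fake_aio_backend) is hypothetical and far simpler than the real
reactor_backend classes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, simplified request description. Not Seastar's API.
struct io_request {
    enum class op { read, write };
    op kind;
    int fd;
    std::uint64_t pos;
    std::size_t len;
};

// What the upper layers would see: no iocb and no io_event anywhere.
class storage_backend {
public:
    virtual ~storage_backend() = default;
    // Hands one request to the backend.
    virtual void submit(const io_request& req) = 0;
    // Appends the byte counts of finished requests to `results` and
    // returns how many completions were reaped.
    virtual std::size_t reap_completions(std::vector<std::size_t>& results) = 0;
};

// Toy backend that "completes" every request immediately with its full
// length. A linux-aio or io_uring backend would sit behind the same interface.
class fake_aio_backend final : public storage_backend {
    std::vector<std::size_t> _done;
public:
    void submit(const io_request& req) override {
        assert(req.fd >= 0); // toy sanity check on the descriptor
        _done.push_back(req.len);
    }
    std::size_t reap_completions(std::vector<std::size_t>& results) override {
        std::size_t n = _done.size();
        results.insert(results.end(), _done.begin(), _done.end());
        _done.clear();
        return n;
    }
};
```

A caller would hold a storage_backend reference, call submit() for each
request and later call reap_completions(), without knowing which kernel
interface is underneath.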

Comments welcome

Glauber Costa (6):
core: remove iocb references from I/O dispatchers
core: remove io_event mentions from upper layer interfaces
core: unify I/O pollers
reactor: remove semaphore from iocb acquisition
io_queue: return number of requests dispatched.
core: move storage aio to reactor backend

include/seastar/core/fair_queue.hh | 3 +-
include/seastar/core/internal/pollable_fd.hh | 1 +
include/seastar/core/io_queue.hh | 11 +-
.../seastar/core/io_request_builder.hh | 40 ++-
include/seastar/core/reactor.hh | 33 +--
src/core/io_desc.hh | 8 +-
src/core/reactor_backend.hh | 57 +++++
src/core/fair_queue.cc | 5 +-
src/core/file.cc | 32 +--
src/core/io_queue.cc | 23 +-
src/core/reactor.cc | 240 +++++-------------
src/core/reactor_backend.cc | 215 +++++++++++++++-
12 files changed, 400 insertions(+), 268 deletions(-)
copy src/core/io_desc.hh => include/seastar/core/io_request_builder.hh (50%)

--
2.20.1

Glauber Costa

<glauber@scylladb.com>
Jan 22, 2020, 10:36:11 AM
to seastar-dev@googlegroups.com, Glauber Costa
We currently expose io_event, the completion record of the Linux async
I/O API, at layers as high as read_dma/write_dma. There are now other
mechanisms to initiate async I/O in Linux, like io_uring, that work
with different structures.

This patch changes the interfaces so we work with size_t types for
as long as possible, and leave io_event as an implementation detail
for the I/O subsystem.
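
A minimal sketch of the conversion this implies, assuming a completion
record whose result is either a byte count or a negated errno value (as
with Linux AIO). The names raw_completion, completion_slot, to_byte_count
and complete are hypothetical stand-ins, not Seastar types.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <system_error>

// Hypothetical stand-in for a kernel completion record: a non-negative
// byte count on success, a negated errno value on failure.
struct raw_completion {
    std::int64_t res;
};

// Hypothetical stand-in for the promise side of an I/O descriptor.
struct completion_slot {
    bool done = false;
    std::size_t value = 0;
    std::exception_ptr error;
};

// Translate the raw result once, at the lowest layer.
std::size_t to_byte_count(const raw_completion& c) {
    if (c.res < 0) {
        throw std::system_error(static_cast<int>(-c.res), std::system_category());
    }
    return static_cast<std::size_t>(c.res);
}

// Upper layers only ever observe a size_t or an exception.
void complete(completion_slot& slot, const raw_completion& c) {
    assert(!slot.done); // each slot is completed exactly once
    try {
        slot.value = to_byte_count(c);
    } catch (...) {
        slot.error = std::current_exception();
    }
    slot.done = true;
}
```

The try/catch has the same shape as the one added to process_io() in this
patch: the error check happens before the value is published, so callers
no longer need to inspect the raw event themselves.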

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
include/seastar/core/io_queue.hh | 2 +-
include/seastar/core/reactor.hh | 4 ++--
src/core/io_desc.hh | 8 ++++----
src/core/file.cc | 20 ++++----------------
src/core/io_queue.cc | 6 +++---
src/core/reactor.cc | 15 +++++++++------
6 files changed, 23 insertions(+), 32 deletions(-)

diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
index d3c002ae..e092ced8 100644
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -110,7 +110,7 @@ class io_queue {
io_queue(config cfg);
~io_queue();

- future<internal::linux_abi::io_event>
+ future<size_t>
queue_request(const io_priority_class& pc, size_t len, io_request_builder req);

size_t capacity() const {
diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index 70d45568..84ff9a98 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -530,11 +530,11 @@ class reactor {
// in which it was generated. Therefore, care must be taken to avoid the use of objects that could
// be destroyed within or at exit of prepare_io.
void submit_io(io_desc* desc, io_request_builder builder);
- future<internal::linux_abi::io_event> submit_io_read(io_queue* ioq,
+ future<size_t> submit_io_read(io_queue* ioq,
const io_priority_class& priority_class,
size_t len,
io_request_builder builder);
- future<internal::linux_abi::io_event> submit_io_write(io_queue* ioq,
+ future<size_t> submit_io_write(io_queue* ioq,
const io_priority_class& priority_class,
size_t len,
io_request_builder builder);
diff --git a/src/core/io_desc.hh b/src/core/io_desc.hh
index 1a980e92..af3225b6 100644
--- a/src/core/io_desc.hh
+++ b/src/core/io_desc.hh
@@ -27,18 +27,18 @@
namespace seastar {

class io_desc {
- promise<seastar::internal::linux_abi::io_event> _pr;
+ promise<size_t> _pr;
public:
virtual ~io_desc() = default;
virtual void set_exception(std::exception_ptr eptr) {
_pr.set_exception(std::move(eptr));
}

- virtual void set_value(seastar::internal::linux_abi::io_event& ev) {
- _pr.set_value(ev);
+ virtual void set_value(size_t res) {
+ _pr.set_value(res);
}

- future<seastar::internal::linux_abi::io_event> get_future() {
+ future<size_t> get_future() {
return _pr.get_future();
}
};
diff --git a/src/core/file.cc b/src/core/file.cc
index c2cdb35a..4de8ab16 100644
--- a/src/core/file.cc
+++ b/src/core/file.cc
@@ -322,10 +322,7 @@ posix_file_impl::list_directory(std::function<future<> (directory_entry de)> nex
future<size_t>
posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len, const io_priority_class& io_priority_class) {
io_request_builder req(io_request_builder::operation::write, _fd, pos, const_cast<void*>(buffer), len);
- return engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req)).then([] (io_event ev) {
- engine().handle_io_result(ev);
- return make_ready_future<size_t>(size_t(ev.res));
- });
+ return engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req));
}

future<size_t>
@@ -334,19 +331,13 @@ posix_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov, const io_priori
auto size = iov.size();
auto data = iov.data();
io_request_builder req(io_request_builder::operation::writev, _fd, pos, data, size);
- return engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req)).then([iov = std::move(iov)] (io_event ev) {
- engine().handle_io_result(ev);
- return make_ready_future<size_t>(size_t(ev.res));
- });
+ return engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req)).finally([iov = std::move(iov)] () {});
}

future<size_t>
posix_file_impl::read_dma(uint64_t pos, void* buffer, size_t len, const io_priority_class& io_priority_class) {
io_request_builder req(io_request_builder::operation::read, _fd, pos, buffer, len);
- return engine().submit_io_read(_io_queue, io_priority_class, len, std::move(req)).then([] (io_event ev) {
- engine().handle_io_result(ev);
- return make_ready_future<size_t>(size_t(ev.res));
- });
+ return engine().submit_io_read(_io_queue, io_priority_class, len, std::move(req));
}

future<size_t>
@@ -355,10 +346,7 @@ posix_file_impl::read_dma(uint64_t pos, std::vector<iovec> iov, const io_priorit
auto size = iov.size();
auto data = iov.data();
io_request_builder req(io_request_builder::operation::readv, _fd, pos, data, size);
- return engine().submit_io_read(_io_queue, io_priority_class, len, std::move(req)).then([iov = std::move(iov)] (io_event ev) {
- engine().handle_io_result(ev);
- return make_ready_future<size_t>(size_t(ev.res));
- });
+ return engine().submit_io_read(_io_queue, io_priority_class, len, std::move(req)).finally([iov = std::move(iov)] () {});
}

future<temporary_buffer<uint8_t>>
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 3369d9f4..b8124a8c 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -61,9 +61,9 @@ class io_desc_read_write final : public io_desc {
io_desc::set_exception(std::move(eptr));
}

- void set_value(io_event& ev) {
+ void set_value(size_t ret) {
notify_requests_finished();
- io_desc::set_value(ev);
+ io_desc::set_value(ret);
}
};

@@ -223,7 +223,7 @@ io_queue::priority_class_data& io_queue::find_or_create_class(const io_priority_
return *_priority_classes[owner][id];
}

-future<io_event>
+future<size_t>
io_queue::queue_request(const io_priority_class& pc, size_t len, io_request_builder req) {
auto start = std::chrono::steady_clock::now();
return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = engine().cpu_id(), this] () mutable {
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 226017f0..4d668c7a 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1571,14 +1571,14 @@ const io_priority_class& default_priority_class() {
return shard_default_class;
}

-future<io_event>
+future<size_t>
reactor::submit_io_read(io_queue* ioq, const io_priority_class& pc, size_t len, io_request_builder req) {
++_io_stats.aio_reads;
_io_stats.aio_read_bytes += len;
return ioq->queue_request(pc, len, std::move(req));
}

-future<io_event>
+future<size_t>
reactor::submit_io_write(io_queue* ioq, const io_priority_class& pc, size_t len, io_request_builder req) {
++_io_stats.aio_writes;
_io_stats.aio_write_bytes += len;
@@ -1605,7 +1605,12 @@ bool reactor::process_io()
}
_iocb_pool.put_one(iocb);
auto desc = reinterpret_cast<io_desc*>(ev[i].data);
- desc->set_value(ev[i]);
+ try {
+ this->handle_io_result(ev[i]);
+ desc->set_value(size_t(ev[i].res));
+ } catch (...) {
+ desc->set_exception(std::current_exception());
+ }
delete desc;
}
return n;
@@ -1913,9 +1918,7 @@ reactor::fdatasync(int fd) {

io_request_builder req(io_request_builder::operation::fdatasync, fd);
submit_io(desc.release(), std::move(req));
- return fut.then([] (linux_abi::io_event event) {
- throw_kernel_error(event.res);
- });
+ return fut.discard_result();
} catch (...) {
return make_exception_future<>(std::current_exception());
}
--
2.20.1

Glauber Costa

<glauber@scylladb.com>
Jan 22, 2020, 10:36:11 AM
to seastar-dev@googlegroups.com, Glauber Costa
Our async I/O mechanism relies on the Linux async I/O interface for now,
which in turn relies on sending iocbs down to the kernel. Because iocbs
are so ingrained in the interface, they go all the way up, and the
write_dma and read_dma functions are already aware of them.

However, there are now other interfaces, like io_uring, that rely on
different data structures. To account for that, we will encapsulate the
I/O information like file descriptor, addresses and sizes into a seastar
structure and only transform this into iocbs down at the lowest level,
where we already know which interface to use.
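
The shape of that idea can be sketched as follows. This is a simplified
illustration and not the io_request_builder added below: toy_iocb and
toy_opcode are hypothetical stand-ins for the kernel structures, and only
three operations are modelled.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical stand-ins for a kernel control block and its opcodes.
enum toy_opcode { TOY_PREAD, TOY_PWRITE, TOY_FDSYNC };
struct toy_iocb {
    toy_opcode opcode;
    int fd;
    std::uint64_t offset;
    void* buf;
    std::size_t nbytes;
};

// Interface-neutral description of one I/O request.
class request {
public:
    enum class operation { read, write, fdatasync };

    request(operation op, int fd, std::uint64_t pos, void* addr, std::size_t size)
        : _op(op), _fd(fd), _pos(pos), _addr(addr), _size(size) {
        assert(fd >= 0); // toy sanity check on the descriptor
    }
    request(operation op, int fd) : request(op, fd, 0, nullptr, 0) {}

    bool is_read() const { return _op == operation::read; }
    bool is_write() const { return _op == operation::write; }

    // Only the lowest layer, which knows the kernel interface in use,
    // turns the neutral description into an interface-specific block.
    toy_iocb to_iocb() const {
        switch (_op) {
        case operation::read:
            return toy_iocb{TOY_PREAD, _fd, _pos, _addr, _size};
        case operation::write:
            return toy_iocb{TOY_PWRITE, _fd, _pos, _addr, _size};
        case operation::fdatasync:
            return toy_iocb{TOY_FDSYNC, _fd, 0, nullptr, 0};
        }
        assert(false && "unhandled operation");
        return toy_iocb{};
    }

private:
    operation _op;
    int _fd;
    std::uint64_t _pos;
    void* _addr;
    std::size_t _size;
};
```

A different backend would add its own translation function next to
to_iocb() while the callers that build request objects stay unchanged.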

In a follow-up patch, we will have to do the same thing with the
io_event structure.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
include/seastar/core/io_queue.hh | 5 +-
include/seastar/core/io_request_builder.hh | 58 +++++++++++++++++++++
include/seastar/core/reactor.hh | 8 +--
src/core/file.cc | 20 +++-----
src/core/io_queue.cc | 17 ++++---
src/core/reactor.cc | 59 ++++++++++++++++++----
6 files changed, 132 insertions(+), 35 deletions(-)
create mode 100644 include/seastar/core/io_request_builder.hh

diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
index e1998073..d3c002ae 100644
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -26,6 +26,7 @@
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/future.hh>
+#include <seastar/core/io_request_builder.hh>
#include <mutex>
#include <array>

@@ -85,8 +86,6 @@ class io_queue {
priority_class_data& find_or_create_class(const io_priority_class& pc, shard_id owner);
friend class smp;
public:
- enum class request_type { read, write };
-
// We want to represent the fact that write requests are (maybe) more expensive
// than read requests. To avoid dealing with floating point math we will scale one
// read request to be counted by this amount.
@@ -112,7 +111,7 @@ class io_queue {
~io_queue();

future<internal::linux_abi::io_event>
- queue_request(const io_priority_class& pc, size_t len, request_type req_type, noncopyable_function<void (internal::linux_abi::iocb&)> do_io);
+ queue_request(const io_priority_class& pc, size_t len, io_request_builder req);

size_t capacity() const {
return _config.capacity;
diff --git a/include/seastar/core/io_request_builder.hh b/include/seastar/core/io_request_builder.hh
new file mode 100644
index 00000000..d82e3b7e
--- /dev/null
+++ b/include/seastar/core/io_request_builder.hh
@@ -0,0 +1,58 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2020 ScyllaDB
+ */
+
+#pragma once
+
+#include <seastar/core/sstring.hh>
+#include <seastar/core/linux-aio.hh>
+
+namespace seastar {
+class io_request_builder {
+public:
+ enum class operation { read, readv, write, writev, fdatasync };
+ operation _op;
+ int _fd;
+ uint64_t _pos;
+ void *_address;
+ size_t _size;
+public:
+ io_request_builder(operation op, int fd, uint64_t pos, void* address, size_t size)
+ : _op(op)
+ , _fd(fd)
+ , _pos(pos)
+ , _address(address)
+ , _size(size)
+ {}
+ io_request_builder(operation op, int fd) : io_request_builder(op, fd, 0, nullptr, 0) {}
+
+ bool is_read() const {
+ return ((_op == operation::read) || (_op == operation::readv));
+ }
+
+ bool is_write() const {
+ return ((_op == operation::write) || (_op == operation::writev));
+ }
+
+ sstring operation() const;
+
+ void prepare_iocb(internal::linux_abi::iocb& iocb);
+};
+}
diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index 13070d7d..70d45568 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -80,6 +80,7 @@
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/smp.hh>
+#include <seastar/core/io_request_builder.hh>
#include "internal/pollable_fd.hh"
#include "internal/poll.hh"

@@ -528,16 +529,15 @@ class reactor {
// In the following three methods, prepare_io is not guaranteed to execute in the same processor
// in which it was generated. Therefore, care must be taken to avoid the use of objects that could
// be destroyed within or at exit of prepare_io.
- void submit_io(io_desc* desc,
- noncopyable_function<void (internal::linux_abi::iocb&)> prepare_io);
+ void submit_io(io_desc* desc, io_request_builder builder);
future<internal::linux_abi::io_event> submit_io_read(io_queue* ioq,
const io_priority_class& priority_class,
size_t len,
- noncopyable_function<void (internal::linux_abi::iocb&)> prepare_io);
+ io_request_builder builder);
future<internal::linux_abi::io_event> submit_io_write(io_queue* ioq,
const io_priority_class& priority_class,
size_t len,
- noncopyable_function<void (internal::linux_abi::iocb&)> prepare_io);
+ io_request_builder builder);

inline void handle_io_result(const internal::linux_abi::io_event& ev) {
auto res = long(ev.res);
diff --git a/src/core/file.cc b/src/core/file.cc
index 5fa556b8..c2cdb35a 100644
--- a/src/core/file.cc
+++ b/src/core/file.cc
@@ -321,9 +321,8 @@ posix_file_impl::list_directory(std::function<future<> (directory_entry de)> nex

future<size_t>
posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len, const io_priority_class& io_priority_class) {
- return engine().submit_io_write(_io_queue, io_priority_class, len, [fd = _fd, pos, buffer, len] (iocb& io) {
- io = make_write_iocb(fd, pos, const_cast<void*>(buffer), len);
- }).then([] (io_event ev) {
+ io_request_builder req(io_request_builder::operation::write, _fd, pos, const_cast<void*>(buffer), len);
+ return engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req)).then([] (io_event ev) {
engine().handle_io_result(ev);
return make_ready_future<size_t>(size_t(ev.res));
});
@@ -334,9 +333,8 @@ posix_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov, const io_priori
auto len = internal::sanitize_iovecs(iov, _disk_write_dma_alignment);
auto size = iov.size();
auto data = iov.data();
- return engine().submit_io_write(_io_queue, io_priority_class, len, [fd = _fd, pos, data, size] (iocb& io) {
- io = make_writev_iocb(fd, pos, data, size);
- }).then([iov = std::move(iov)] (io_event ev) {
+ io_request_builder req(io_request_builder::operation::writev, _fd, pos, data, size);
+ return engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req)).then([iov = std::move(iov)] (io_event ev) {
engine().handle_io_result(ev);
return make_ready_future<size_t>(size_t(ev.res));
});
@@ -344,9 +342,8 @@ posix_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov, const io_priori

future<size_t>
posix_file_impl::read_dma(uint64_t pos, void* buffer, size_t len, const io_priority_class& io_priority_class) {
- return engine().submit_io_read(_io_queue, io_priority_class, len, [fd = _fd, pos, buffer, len] (iocb& io) {
- io = make_read_iocb(fd, pos, buffer, len);
- }).then([] (io_event ev) {
+ io_request_builder req(io_request_builder::operation::read, _fd, pos, buffer, len);
+ return engine().submit_io_read(_io_queue, io_priority_class, len, std::move(req)).then([] (io_event ev) {
engine().handle_io_result(ev);
return make_ready_future<size_t>(size_t(ev.res));
});
@@ -357,9 +354,8 @@ posix_file_impl::read_dma(uint64_t pos, std::vector<iovec> iov, const io_priorit
auto len = internal::sanitize_iovecs(iov, _disk_read_dma_alignment);
auto size = iov.size();
auto data = iov.data();
- return engine().submit_io_read(_io_queue, io_priority_class, len, [fd = _fd, pos, data, size] (iocb& io) {
- io = make_readv_iocb(fd, pos, data, size);
- }).then([iov = std::move(iov)] (io_event ev) {
+ io_request_builder req(io_request_builder::operation::readv, _fd, pos, data, size);
+ return engine().submit_io_read(_io_queue, io_priority_class, len, std::move(req)).then([iov = std::move(iov)] (io_event ev) {
engine().handle_io_result(ev);
return make_ready_future<size_t>(size_t(ev.res));
});
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index db9c5b0d..3369d9f4 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -31,6 +31,8 @@
#include <mutex>
#include <array>
#include "core/io_desc.hh"
+#include <fmt/format.h>
+#include <fmt/ostream.h>

namespace seastar {

@@ -222,32 +224,35 @@ io_queue::priority_class_data& io_queue::find_or_create_class(const io_priority_
}

future<io_event>
-io_queue::queue_request(const io_priority_class& pc, size_t len, io_queue::request_type req_type, noncopyable_function<void (internal::linux_abi::iocb&)> prepare_io) {
+io_queue::queue_request(const io_priority_class& pc, size_t len, io_request_builder req) {
auto start = std::chrono::steady_clock::now();
- return smp::submit_to(coordinator(), [start, &pc, len, req_type, prepare_io = std::move(prepare_io), owner = engine().cpu_id(), this] () mutable {
+ return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = engine().cpu_id(), this] () mutable {
// First time will hit here, and then we create the class. It is important
// that we create the shared pointer in the same shard it will be used at later.
auto& pclass = find_or_create_class(pc, owner);
pclass.nr_queued++;
unsigned weight;
size_t size;
- if (req_type == io_queue::request_type::write) {
+ if (req.is_write()) {
weight = _config.disk_req_write_to_read_multiplier;
size = _config.disk_bytes_write_to_read_multiplier * len;
- } else {
+ } else if (req.is_read()) {
weight = io_queue::read_request_base_count;
size = io_queue::read_request_base_count * len;
+ } else {
+ throw std::runtime_error(fmt::format("Unrecognized request passing through I/O queue {} ", req.operation()));
}
+
auto desc = std::make_unique<io_desc_read_write>(this, weight, size);
auto fq_desc = desc->fq_descriptor();
auto fut = desc->get_future();
- _fq.queue(pclass.ptr, std::move(fq_desc), [&pclass, start, prepare_io = std::move(prepare_io), desc = std::move(desc), len, this] () mutable noexcept {
+ _fq.queue(pclass.ptr, std::move(fq_desc), [&pclass, start, req = std::move(req), desc = std::move(desc), len, this] () mutable noexcept {
try {
pclass.nr_queued--;
pclass.ops++;
pclass.bytes += len;
pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
- engine().submit_io(desc.get(), std::move(prepare_io));
+ engine().submit_io(desc.get(), std::move(req));
desc.release();
} catch (...) {
desc->set_exception(std::current_exception());
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index ee16f109..226017f0 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1435,13 +1435,52 @@ reactor::connect(socket_address sa, socket_address local, transport proto) {
return _network_stack->connect(sa, local, proto);
}

+void io_request_builder::prepare_iocb(iocb& iocb) {
+ switch (_op) {
+ case io_request_builder::operation::fdatasync:
+ iocb = make_fdsync_iocb(_fd);
+ break;
+ case io_request_builder::operation::write:
+ iocb = make_write_iocb(_fd, _pos, _address, _size);
+ break;
+ case io_request_builder::operation::writev:
+ iocb = make_writev_iocb(_fd, _pos, reinterpret_cast<const iovec*>(_address), _size);
+ break;
+ case io_request_builder::operation::read:
+ iocb = make_read_iocb(_fd, _pos, _address, _size);
+ break;
+ case io_request_builder::operation::readv:
+ iocb = make_readv_iocb(_fd, _pos, reinterpret_cast<const iovec*>(_address), _size);
+ break;
+ default:
+ throw std::runtime_error("Can't prepare IOCB for unrecognized operation");
+ }
+}
+
+sstring io_request_builder::operation() const {
+ switch (_op) {
+ case io_request_builder::operation::fdatasync:
+ return "fdatasync";
+ case io_request_builder::operation::write:
+ return "write";
+ case io_request_builder::operation::writev:
+ return "vectored write";
+ case io_request_builder::operation::read:
+ return "read";
+ case io_request_builder::operation::readv:
+ return "vectored read";
+ default:
+ return fmt::format("unknown operation {}", unsigned(_op));
+ }
+}
+
void
-reactor::submit_io(io_desc* desc, noncopyable_function<void (linux_abi::iocb&)> prepare_io) {
+reactor::submit_io(io_desc* desc, io_request_builder req) {
// We can ignore the future returned here, because the submitted aio will be polled
// for and completed in process_io().
- (void)_iocb_pool.get_one().then([this, desc, prepare_io = std::move(prepare_io)] (linux_abi::iocb* iocb) mutable {
+ (void)_iocb_pool.get_one().then([this, desc, req = std::move(req)] (linux_abi::iocb* iocb) mutable {
auto& io = *iocb;
- prepare_io(io);
+ req.prepare_iocb(io);
if (_aio_eventfd) {
set_eventfd_notification(io, _aio_eventfd->get_fd());
}
@@ -1533,17 +1572,17 @@ const io_priority_class& default_priority_class() {
}

future<io_event>
-reactor::submit_io_read(io_queue* ioq, const io_priority_class& pc, size_t len, noncopyable_function<void (internal::linux_abi::iocb&)> prepare_io) {
+reactor::submit_io_read(io_queue* ioq, const io_priority_class& pc, size_t len, io_request_builder req) {
++_io_stats.aio_reads;
_io_stats.aio_read_bytes += len;
- return ioq->queue_request(pc, len, io_queue::request_type::read, std::move(prepare_io));
+ return ioq->queue_request(pc, len, std::move(req));
}

future<io_event>
-reactor::submit_io_write(io_queue* ioq, const io_priority_class& pc, size_t len, noncopyable_function<void (internal::linux_abi::iocb&)> prepare_io) {
+reactor::submit_io_write(io_queue* ioq, const io_priority_class& pc, size_t len, io_request_builder req) {
++_io_stats.aio_writes;
_io_stats.aio_write_bytes += len;
- return ioq->queue_request(pc, len, io_queue::request_type::write, std::move(prepare_io));
+ return ioq->queue_request(pc, len, std::move(req));
}

bool reactor::process_io()
@@ -1871,9 +1910,9 @@ reactor::fdatasync(int fd) {
try {
auto desc = std::make_unique<io_desc>();
auto fut = desc->get_future();
- submit_io(desc.release(), [fd] (linux_abi::iocb& iocb) {
- iocb = make_fdsync_iocb(fd);
- });
+
+ io_request_builder req(io_request_builder::operation::fdatasync, fd);
+ submit_io(desc.release(), std::move(req));
return fut.then([] (linux_abi::io_event event) {
throw_kernel_error(event.res);
});
--
2.20.1

Glauber Costa

<glauber@scylladb.com>
Jan 22, 2020, 10:36:13 AM
to seastar-dev@googlegroups.com, Glauber Costa
We currently have two different pollers for I/O: one for
submitting I/O, and another for fetching the results of
a previous submission.

There is no particular reason to keep these separate at this
stage, and the reasons for a two-poller system seem mostly
historical to me (at some point, we could submit I/O outside
of polling context).
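
A small sketch of what a merged poller has to guarantee: both halves run on
every poll, even when the first one already found work. The patch gets this
by combining the two calls with a bitwise `|`, which does not short-circuit;
the sketch spells the same thing out with two named results. toy_reactor and
its counters are hypothetical, not Seastar code.

```cpp
#include <cassert>

// Hypothetical miniature reactor with the two halves of storage I/O polling.
struct toy_reactor {
    int completions_ready = 0;   // results waiting to be reaped
    int submissions_pending = 0; // requests waiting to be submitted
    int reap_calls = 0;
    int flush_calls = 0;

    // Reaps finished I/O; returns true if anything was reaped.
    bool process_io() {
        assert(completions_ready >= 0);
        ++reap_calls;
        bool worked = completions_ready > 0;
        completions_ready = 0;
        return worked;
    }

    // Submits queued I/O; returns true if anything was submitted.
    bool flush_pending_aio() {
        assert(submissions_pending >= 0);
        ++flush_calls;
        bool worked = submissions_pending > 0;
        submissions_pending = 0;
        return worked;
    }

    // Single poller: always run both steps, then report whether either
    // one did work. Writing process_io() || flush_pending_aio() directly
    // would skip the flush whenever completions were found.
    bool poll() {
        bool reaped = process_io();
        bool flushed = flush_pending_aio();
        return reaped || flushed;
    }
};
```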

One of the pollers (getevents) is available - as far as ifdefs
are concerned - on OSv, but I believe that even if this is not
an oversight, if we haven't dispatched anything we'll hardly have
anything to read, so I am just ignoring that.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
include/seastar/core/reactor.hh | 2 --
src/core/reactor.cc | 22 +---------------------
2 files changed, 1 insertion(+), 23 deletions(-)

diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index 84ff9a98..8f3f84de 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -160,7 +160,6 @@ class reactor {

class io_pollfn;
class signal_pollfn;
- class aio_batch_submit_pollfn;
class batch_flush_pollfn;
class smp_pollfn;
class drain_cross_cpu_freelist_pollfn;
@@ -171,7 +170,6 @@ class reactor {
class execution_stage_pollfn;
friend io_pollfn;
friend signal_pollfn;
- friend aio_batch_submit_pollfn;
friend batch_flush_pollfn;
friend smp_pollfn;
friend drain_cross_cpu_freelist_pollfn;
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 4d668c7a..8cdee5b7 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -2254,7 +2254,7 @@ class reactor::io_pollfn final : public reactor::pollfn {
public:
io_pollfn(reactor& r) : _r(r) {}
virtual bool poll() override final {
- return _r.process_io();
+ return _r.process_io() | _r.flush_pending_aio();
}
virtual bool pure_poll() override final {
return poll(); // actually performs work, but triggers no user continuations, so okay
@@ -2322,25 +2322,6 @@ class reactor::batch_flush_pollfn final : public reactor::pollfn {
}
};

-class reactor::aio_batch_submit_pollfn final : public reactor::pollfn {
- reactor& _r;
-public:
- aio_batch_submit_pollfn(reactor& r) : _r(r) {}
- virtual bool poll() final override {
- return _r.flush_pending_aio();
- }
- virtual bool pure_poll() override final {
- return poll(); // actually performs work, but triggers no user continuations, so okay
- }
- virtual bool try_enter_interrupt_mode() override {
- // This is a passive poller, so if a previous poll
- // returned false (idle), there's no more work to do.
- return true;
- }
- virtual void exit_interrupt_mode() override final {
- }
-};
-
class reactor::drain_cross_cpu_freelist_pollfn final : public reactor::pollfn {
public:
virtual bool poll() final override {
@@ -2651,7 +2632,6 @@ int reactor::run() {
#ifndef HAVE_OSV
io_poller = poller(std::make_unique<io_pollfn>(*this));
#endif
- poller aio_poller(std::make_unique<aio_batch_submit_pollfn>(*this));

poller batch_flush_poller(std::make_unique<batch_flush_pollfn>(*this));
poller execution_stage_poller(std::make_unique<execution_stage_pollfn>());
--
2.20.1

Glauber Costa

<glauber@scylladb.com>
Jan 22, 2020, 10:36:14 AM
to seastar-dev@googlegroups.com, Glauber Costa
We currently protect the iocb acquisition path in the storage layer with
a semaphore. This is a relic from a past in which we didn't have an I/O
scheduler.

In reality, access to this structure has been protected by the I/O
scheduler for a long time, and we are guaranteed to never run out of
iocbs in the pool.

Removing the semaphore allows us to un-futurize this, which is a benefit
in itself. However, as will become clear in the next patch, the
biggest advantage of this is that we will be able to unify this code
with the reactor backend.

In preparation for that we will also add a test for the fact that we
really do have enough entries to draw from, despite the I/O scheduler
guarantees.
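
For readers who want the pattern in isolation, here is a minimal sketch of a
fixed-size pool whose acquisition is synchronous and guarded only by an
assertion. fixed_pool is a hypothetical generic stand-in for iocb_pool; it
assumes, as described above, that some outer mechanism already bounds the
number of outstanding entries.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical generic version of a fixed-size object pool.
template <typename T, std::size_t N>
class fixed_pool {
    std::array<T, N> _slots{};
    std::vector<T*> _free;
public:
    fixed_pool() {
        _free.reserve(N);
        for (auto& s : _slots) {
            _free.push_back(&s);
        }
    }
    // The free list points into _slots, so copying would dangle.
    fixed_pool(const fixed_pool&) = delete;
    fixed_pool& operator=(const fixed_pool&) = delete;

    // No waiting: the caller is trusted never to over-subscribe, and the
    // assertion only double-checks that guarantee in debug builds.
    T* get_one() {
        assert(!_free.empty());
        T* p = _free.back();
        _free.pop_back();
        return p;
    }

    void put_one(T* p) {
        _free.push_back(p);
    }

    std::size_t outstanding() const {
        return N - _free.size();
    }
};
```

Because get_one() returns a plain pointer rather than a future, the caller
can prepare and queue the entry in straight-line code.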

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
include/seastar/core/reactor.hh | 3 +--
src/core/reactor.cc | 37 +++++++++++++++------------------
2 files changed, 18 insertions(+), 22 deletions(-)

diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index 8f3f84de..5e6d1a5e 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -248,11 +248,10 @@ class reactor {

class iocb_pool {
alignas(cache_line_size) std::array<internal::linux_abi::iocb, max_aio> _iocb_pool;
- semaphore _sem{0};
std::stack<internal::linux_abi::iocb*, boost::container::static_vector<internal::linux_abi::iocb*, max_aio>> _free_iocbs;
public:
iocb_pool();
- future<internal::linux_abi::iocb*> get_one();
+ internal::linux_abi::iocb* get_one();
void put_one(internal::linux_abi::iocb* io);
unsigned outstanding() const;
};
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 8cdee5b7..5d810f9d 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -168,24 +168,22 @@ reactor::iocb_pool::iocb_pool() {
for (unsigned i = 0; i != max_aio; ++i) {
_free_iocbs.push(&_iocb_pool[i]);
}
- _sem.signal(max_aio);
}

inline
-future<internal::linux_abi::iocb*>
+internal::linux_abi::iocb*
reactor::iocb_pool::get_one() {
- return _sem.wait(1).then([this] {
- auto io = _free_iocbs.top();
- _free_iocbs.pop();
- return io;
- });
+ auto r = _free_iocbs.size();
+ assert(r > 0);
+ auto io = _free_iocbs.top();
+ _free_iocbs.pop();
+ return io;
}

inline
void
reactor::iocb_pool::put_one(internal::linux_abi::iocb* io) {
_free_iocbs.push(io);
- _sem.signal(1);
}

inline
@@ -1478,18 +1476,17 @@ void
reactor::submit_io(io_desc* desc, io_request_builder req) {
// We can ignore the future returned here, because the submitted aio will be polled
// for and completed in process_io().
- (void)_iocb_pool.get_one().then([this, desc, req = std::move(req)] (linux_abi::iocb* iocb) mutable {
- auto& io = *iocb;
- req.prepare_iocb(io);
- if (_aio_eventfd) {
- set_eventfd_notification(io, _aio_eventfd->get_fd());
- }
- if (aio_nowait_supported) {
- set_nowait(io, true);
- }
- set_user_data(io, desc);
- _pending_aio.push_back(&io);
- });
+ auto* iocb = _iocb_pool.get_one();
+ auto& io = *iocb;
+ req.prepare_iocb(io);
+ if (_aio_eventfd) {
+ set_eventfd_notification(io, _aio_eventfd->get_fd());
+ }
+ if (aio_nowait_supported) {
+ set_nowait(io, true);
+ }
+ set_user_data(io, desc);
+ _pending_aio.push_back(&io);
}

// Returns: number of iocbs consumed (0 or 1)
--
2.20.1

Glauber Costa

<glauber@scylladb.com>
Jan 22, 2020, 10:36:16 AM
to seastar-dev@googlegroups.com, Glauber Costa
Right now we ignore the number of requests dispatched by the I/O queue.

Dispatching happens inside a seastar poller, but that poller does other
things as well, and so far those have been enough to determine whether
or not the poller did work.

I am breaking this poller apart in two, with the I/O queue being polled
separately from the OS-level work in preparation for io_uring support.
When that happens, it will be necessary to know whether the poller did
work.
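
A toy version of the change, to show why the count matters: once the queue
is polled on its own, the returned number is the only signal the poller has
that it made progress. toy_queue and poll_once are hypothetical and much
simpler than fair_queue (no priority classes, no shares, no byte limits).

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical queue that runs at most `capacity` requests concurrently.
class toy_queue {
    std::deque<std::function<void()>> _queued;
    std::size_t _executing = 0;
    std::size_t _capacity;
public:
    explicit toy_queue(std::size_t capacity) : _capacity(capacity) {}

    void queue(std::function<void()> fn) {
        _queued.push_back(std::move(fn));
    }

    // Called when a previously dispatched request completes.
    void notify_finished() {
        assert(_executing > 0);
        --_executing;
    }

    // Dispatches as many queued requests as capacity allows and returns
    // how many were dispatched.
    std::size_t dispatch_requests() {
        std::size_t dispatched = 0;
        while (!_queued.empty() && _executing < _capacity) {
            auto fn = std::move(_queued.front());
            _queued.pop_front();
            ++_executing;
            ++dispatched;
            fn();
        }
        return dispatched;
    }
};

// A poller built only on the queue can now report whether it did work.
inline bool poll_once(toy_queue& q) {
    return q.dispatch_requests() > 0;
}
```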

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
include/seastar/core/fair_queue.hh | 3 ++-
include/seastar/core/io_queue.hh | 4 ++--
src/core/fair_queue.cc | 5 ++++-
src/core/reactor.cc | 5 +++--
4 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh
index f05f713d..cd5794dd 100644
--- a/include/seastar/core/fair_queue.hh
+++ b/include/seastar/core/fair_queue.hh
@@ -183,7 +183,8 @@ class fair_queue {
void notify_requests_finished(fair_queue_request_descriptor& desc);

/// Try to execute new requests if there is capacity left in the queue.
- void dispatch_requests();
+ /// \return the number of requests dispatched.
+ size_t dispatch_requests();

/// Updates the current shares of this priority class
///
diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
index e092ced8..813c0fda 100644
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -132,8 +132,8 @@ class io_queue {
}

// Dispatch requests that are pending in the I/O queue
- void poll_io_queue() {
- _fq.dispatch_requests();
+ size_t poll_io_queue() {
+ return _fq.dispatch_requests();
}

sstring mountpoint() const {
diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc
index e760304d..e46cd0b5 100644
--- a/src/core/fair_queue.cc
+++ b/src/core/fair_queue.cc
@@ -102,7 +102,8 @@ void fair_queue::notify_requests_finished(fair_queue_request_descriptor& desc) {
}


-void fair_queue::dispatch_requests() {
+size_t fair_queue::dispatch_requests() {
+ size_t dispatched = 0;
while (can_dispatch()) {
priority_class_ptr h;
do {
@@ -115,6 +116,7 @@ void fair_queue::dispatch_requests() {
_req_count_executing += req.desc.weight;
_bytes_count_executing += req.desc.size;
_requests_queued--;
+ dispatched++;

auto delta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base);
auto req_cost = (float(req.desc.weight) / _config.max_req_count + float(req.desc.size) / _config.max_bytes_count) / h->_shares;
@@ -134,6 +136,7 @@ void fair_queue::dispatch_requests() {
}
req.func();
}
+ return dispatched;
}

void fair_queue::update_shares(priority_class_ptr pc, uint32_t new_shares) {
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 5d810f9d..dc07ab2d 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1517,11 +1517,12 @@ reactor::handle_aio_error(linux_abi::iocb* iocb, int ec) {

bool
reactor::flush_pending_aio() {
+ bool did_work = false;
+
for (auto& ioq : my_io_queues) {
- ioq->poll_io_queue();
+ did_work |= ioq->poll_io_queue();
}

- bool did_work = false;
while (!_pending_aio.empty()) {
auto nr = _pending_aio.size();
auto iocbs = _pending_aio.data();
--
2.20.1

Glauber Costa

<glauber@scylladb.com>
Jan 22, 2020, 10:36:18 AM1/22/20
to seastar-dev@googlegroups.com, Glauber Costa
Right now the storage subsystem is placed inside the reactor. This
works because we only ever do I/O using the linux io_submit/io_getevents
interface.

Linux now has a new, improved interface to do I/O (io_uring) that
dispatches I/O by means of a different set of system calls with a
different set of data structures.

One possible approach would be to do something similar to the
reactor_backend model, as an io_backend.

However io_uring is the most delicious piece of awesomeness I've
seen in a long while and it does much more than just I/O: the interface
can do pretty much anything, which means we should be able to replace
the other backend operations with it too.

Therefore this patch lays the ground for that to happen by moving the
storage functionality inside the reactor backend.
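
To make the shape of the change easier to picture, here is a small
standalone sketch of the idea, under the assumption that a backend owns
the storage machinery and hands the reactor a poller for it. The names
backend, toy_backend and the integer request ids are hypothetical and
much simpler than the real classes in the diff below:

```cpp
#include <cstddef>
#include <memory>
#include <vector>

// Minimal poller interface, reduced to the one method this sketch needs.
struct pollfn {
    virtual ~pollfn() = default;
    virtual bool poll() = 0;   // returns true if any work was done
};

// Hypothetical backend: owns the storage machinery and hands out its poller.
class backend {
public:
    virtual ~backend() = default;
    virtual void submit_io(int request_id) = 0;
    virtual std::unique_ptr<pollfn> create_storage_poller() = 0;
};

class toy_backend final : public backend {
    std::vector<int> _pending;
    std::vector<int> _completed;

    struct storage_pollfn final : pollfn {
        toy_backend* _b;
        explicit storage_pollfn(toy_backend* b) : _b(b) {}
        bool poll() override { return _b->flush(); }
    };

    // "Submits" everything pending; a real backend would call into the kernel here.
    bool flush() {
        bool did_work = !_pending.empty();
        _completed.insert(_completed.end(), _pending.begin(), _pending.end());
        _pending.clear();
        return did_work;
    }
public:
    void submit_io(int request_id) override { _pending.push_back(request_id); }
    std::unique_ptr<pollfn> create_storage_poller() override {
        return std::make_unique<storage_pollfn>(this);
    }
    std::size_t completed() const { return _completed.size(); }
};
```

The caller never sees how requests are submitted or reaped, which is
what would let a different backend swap in another mechanism.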

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
include/seastar/core/internal/pollable_fd.hh | 1 +
include/seastar/core/reactor.hh | 18 +-
src/core/reactor_backend.hh | 57 +++++
src/core/reactor.cc | 156 +-------------
src/core/reactor_backend.cc | 215 ++++++++++++++++++-
5 files changed, 279 insertions(+), 168 deletions(-)

diff --git a/include/seastar/core/internal/pollable_fd.hh b/include/seastar/core/internal/pollable_fd.hh
index 20cbad2f..d8476151 100644
--- a/include/seastar/core/internal/pollable_fd.hh
+++ b/include/seastar/core/internal/pollable_fd.hh
@@ -151,6 +151,7 @@ class pollable_fd {
friend class reactor;
friend class readable_eventfd;
friend class writeable_eventfd;
+ friend class aio_storage_context;
private:
std::unique_ptr<pollable_fd_state> _s;
};
diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index 5e6d1a5e..0801a75b 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -182,6 +182,7 @@ class reactor {
friend class internal::reactor_stall_sampler;
friend class reactor_backend_epoll;
friend class reactor_backend_aio;
+ friend class aio_storage_context;
public:
class poller {
std::unique_ptr<pollfn> _pollfn;
@@ -242,20 +243,8 @@ class reactor {

static constexpr unsigned max_aio_per_queue = 128;
static constexpr unsigned max_queues = 8;
- static constexpr unsigned max_aio = max_aio_per_queue * max_queues;
friend disk_config_params;

-
- class iocb_pool {
- alignas(cache_line_size) std::array<internal::linux_abi::iocb, max_aio> _iocb_pool;
- std::stack<internal::linux_abi::iocb*, boost::container::static_vector<internal::linux_abi::iocb*, max_aio>> _free_iocbs;
- public:
- iocb_pool();
- internal::linux_abi::iocb* get_one();
- void put_one(internal::linux_abi::iocb* io);
- unsigned outstanding() const;
- };
-
// Not all reactors have IO queues. If the number of IO queues is less than the number of shards,
// some reactors will talk to foreign io_queues. If this reactor holds a valid IO queue, it will
// be stored here.
@@ -286,10 +275,6 @@ class reactor {
timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link>::timer_list_t _expired_lowres_timers;
timer_set<timer<manual_clock>, &timer<manual_clock>::_link> _manual_timers;
timer_set<timer<manual_clock>, &timer<manual_clock>::_link>::timer_list_t _expired_manual_timers;
- internal::linux_abi::aio_context_t _io_context;
- iocb_pool _iocb_pool;
- boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _pending_aio;
- boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _pending_aio_retry;
io_stats _io_stats;
uint64_t _fsyncs = 0;
uint64_t _cxx_exceptions = 0;
@@ -362,7 +347,6 @@ class reactor {
static std::chrono::nanoseconds calculate_poll_time();
static void block_notifier(int);
void wakeup();
- size_t handle_aio_error(internal::linux_abi::iocb* iocb, int ec);
bool flush_pending_aio();
bool flush_tcp_batches();
bool do_expire_lowres_timers();
diff --git a/src/core/reactor_backend.hh b/src/core/reactor_backend.hh
index 2cdc260d..533c329c 100644
--- a/src/core/reactor_backend.hh
+++ b/src/core/reactor_backend.hh
@@ -26,12 +26,15 @@
#include <seastar/core/internal/pollable_fd.hh>
#include <seastar/core/internal/poll.hh>
#include <seastar/core/linux-aio.hh>
+#include <seastar/core/cacheline.hh>
+#include <seastar/core/io_request_builder.hh>
#include <sys/time.h>
#include <signal.h>
#include <thread>
#include <stack>
#include <boost/any.hpp>
#include <boost/program_options.hpp>
+#include <boost/container/static_vector.hpp>

#ifdef HAVE_OSV
#include <osv/newpoll.hh>
@@ -40,6 +43,46 @@
namespace seastar {

class reactor;
+class io_desc;
+
+class aio_storage_context {
+public:
+ static constexpr unsigned max_aio = 1024;
+
+ class aio_storage_context_pollfn : public seastar::pollfn {
+ aio_storage_context* _ctx;
+ public:
+ virtual bool poll() override;
+ virtual bool pure_poll() override;
+ virtual bool try_enter_interrupt_mode() override;
+ virtual void exit_interrupt_mode() override;
+ explicit aio_storage_context_pollfn(aio_storage_context* ctx) : _ctx(ctx) {}
+ };
+
+ explicit aio_storage_context(reactor *r);
+ ~aio_storage_context();
+
+ void submit_io(io_desc* desc, io_request_builder builder);
+
+ std::unique_ptr<seastar::pollfn> create_poller();
+private:
+ reactor* _r;
+ internal::linux_abi::aio_context_t _io_context;
+
+ size_t handle_aio_error(internal::linux_abi::iocb* iocb, int ec);
+ bool flush_pending_aio();
+ bool process_io();
+
+ alignas(cache_line_size) std::array<internal::linux_abi::iocb, max_aio> _iocb_pool;
+ std::stack<internal::linux_abi::iocb*, boost::container::static_vector<internal::linux_abi::iocb*, max_aio>> _free_iocbs;
+
+ internal::linux_abi::iocb* get_one();
+ void put_one(internal::linux_abi::iocb* io);
+ unsigned outstanding() const;
+
+ boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _pending_aio;
+ boost::container::static_vector<internal::linux_abi::iocb*, max_aio> _pending_aio_retry;
+};

// The "reactor_backend" interface provides a method of waiting for various
// basic events on one thread. We have one implementation based on epoll and
@@ -68,6 +111,9 @@ class reactor_backend {
virtual void reset_preemption_monitor() = 0;
virtual void request_preemption() = 0;
virtual void start_handling_signal() = 0;
+
+ virtual std::unique_ptr<pollfn> create_storage_poller() = 0;
+ virtual void submit_io(io_desc* desc, io_request_builder builder) = 0;
};

// reactor backend using file-descriptor & epoll, suitable for running on
@@ -84,6 +130,7 @@ class reactor_backend_epoll : public reactor_backend {
promise<> pollable_fd_state::* pr, int event);
void complete_epoll_event(pollable_fd_state& fd,
promise<> pollable_fd_state::* pr, int events, int event);
+ aio_storage_context _aio_storage_context;
public:
explicit reactor_backend_epoll(reactor* r);
virtual ~reactor_backend_epoll() override;
@@ -99,6 +146,9 @@ class reactor_backend_epoll : public reactor_backend {
virtual void reset_preemption_monitor() override;
virtual void request_preemption() override;
virtual void start_handling_signal() override;
+
+ virtual std::unique_ptr<pollfn> create_storage_poller() override;
+ virtual void submit_io(io_desc* desc, io_request_builder builder) override;
};

class reactor_backend_aio : public reactor_backend {
@@ -118,6 +168,8 @@ class reactor_backend_aio : public reactor_backend {
};
context _preempting_io{2}; // Used for the timer tick and the high resolution timer
context _polling_io{max_polls}; // FIXME: unify with disk aio_context
+ aio_storage_context _aio_storage_context;
+
file_desc _steady_clock_timer = make_timerfd();
internal::linux_abi::iocb _task_quota_timer_iocb;
internal::linux_abi::iocb _timerfd_iocb;
@@ -161,6 +213,9 @@ class reactor_backend_aio : public reactor_backend {
virtual void reset_preemption_monitor() override;
virtual void request_preemption() override;
virtual void start_handling_signal() override;
+
+ virtual std::unique_ptr<pollfn> create_storage_poller() override;
+ virtual void submit_io(io_desc* desc, io_request_builder builder) override;
};

#ifdef HAVE_OSV
@@ -181,6 +236,8 @@ class reactor_backend_osv : public reactor_backend {
virtual future<> writeable(pollable_fd_state& fd) override;
virtual void forget(pollable_fd_state& fd) override;
void enable_timer(steady_clock_type::time_point when);
+ virtual std::unique_ptr<pollfn> create_storage_poller() override;
+ virtual void submit_io(io_desc* desc, io_request_builder builder) override;
};
#endif /* HAVE_OSV */

diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index dc07ab2d..cdb71031 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -164,34 +164,6 @@ namespace seastar {
seastar::logger seastar_logger("seastar");
seastar::logger sched_logger("scheduler");

-reactor::iocb_pool::iocb_pool() {
- for (unsigned i = 0; i != max_aio; ++i) {
- _free_iocbs.push(&_iocb_pool[i]);
- }
-}
-
-inline
-internal::linux_abi::iocb*
-reactor::iocb_pool::get_one() {
- auto r = _free_iocbs.size();
- assert(r > 0);
- auto io = _free_iocbs.top();
- _free_iocbs.pop();
- return io;
-}
-
-inline
-void
-reactor::iocb_pool::put_one(internal::linux_abi::iocb* io) {
- _free_iocbs.push(io);
-}
-
-inline
-unsigned
-reactor::iocb_pool::outstanding() const {
- return max_aio - _free_iocbs.size();
-}
-
io_priority_class
reactor::register_one_priority_class(sstring name, uint32_t shares) {
return io_queue::register_one_priority_class(std::move(name), shares);
@@ -519,7 +491,7 @@ constexpr unsigned reactor::max_queues;
constexpr unsigned reactor::max_aio_per_queue;

// Broken (returns spurious EIO). Cause/fix unknown.
-static bool aio_nowait_supported = false;
+bool aio_nowait_supported = false;

static bool sched_debug() {
return false;
@@ -867,7 +839,6 @@ reactor::reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg)
#endif
, _cpu_started(0)
, _cpu_stall_detector(std::make_unique<cpu_stall_detector>(this))
- , _io_context(0)
, _reuseport(posix_reuseport_detect())
, _thread_pool(std::make_unique<thread_pool>(this, seastar::format("syscall-{}", id))) {
/*
@@ -883,7 +854,6 @@ reactor::reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg)
seastar::thread_impl::init();
_backend->start_tick();

- setup_aio_context(max_aio, &_io_context);
#ifdef HAVE_OSV
_timer_thread.start();
#else
@@ -917,7 +887,6 @@ reactor::~reactor() {
eraser(_expired_timers);
eraser(_expired_lowres_timers);
eraser(_expired_manual_timers);
- io_destroy(_io_context);
for (auto&& tq : _task_queues) {
if (tq) {
// The following line will preserve the convention that constructor and destructor functions
@@ -1474,45 +1443,7 @@ sstring io_request_builder::operation() const {

void
reactor::submit_io(io_desc* desc, io_request_builder req) {
- // We can ignore the future returned here, because the submitted aio will be polled
- // for and completed in process_io().
- auto* iocb = _iocb_pool.get_one();
- auto& io = *iocb;
- req.prepare_iocb(io);
- if (_aio_eventfd) {
- set_eventfd_notification(io, _aio_eventfd->get_fd());
- }
- if (aio_nowait_supported) {
- set_nowait(io, true);
- }
- set_user_data(io, desc);
- _pending_aio.push_back(&io);
-}
-
-// Returns: number of iocbs consumed (0 or 1)
-size_t
-reactor::handle_aio_error(linux_abi::iocb* iocb, int ec) {
- switch (ec) {
- case EAGAIN:
- return 0;
- case EBADF: {
- auto desc = reinterpret_cast<io_desc*>(get_user_data(*iocb));
- _iocb_pool.put_one(iocb);
- try {
- throw std::system_error(EBADF, std::system_category());
- } catch (...) {
- desc->set_exception(std::current_exception());
- }
- delete desc;
- // if EBADF, it means that the first request has a bad fd, so
- // we will only remove it from _pending_aio and try again.
- return 1;
- }
- default:
- ++_io_stats.aio_errors;
- throw_system_error_on(true, "io_submit");
- abort();
- }
+ _backend->submit_io(desc, std::move(req));
}

bool
@@ -1523,42 +1454,6 @@ reactor::flush_pending_aio() {
did_work |= ioq->poll_io_queue();
}

- while (!_pending_aio.empty()) {
- auto nr = _pending_aio.size();
- auto iocbs = _pending_aio.data();
- auto r = io_submit(_io_context, nr, iocbs);
- size_t nr_consumed;
- if (r == -1) {
- nr_consumed = handle_aio_error(iocbs[0], errno);
- } else {
- nr_consumed = size_t(r);
- }
-
- did_work = true;
- if (nr_consumed == nr) {
- _pending_aio.clear();
- } else {
- _pending_aio.erase(_pending_aio.begin(), _pending_aio.begin() + nr_consumed);
- }
- }
- if (!_pending_aio_retry.empty()) {
- auto retries = std::exchange(_pending_aio_retry, {});
- // FIXME: future is discarded
- (void)_thread_pool->submit<syscall_result<int>>([this, retries] () mutable {
- auto r = io_submit(_io_context, retries.size(), retries.data());
- return wrap_syscall<int>(r);
- }).then([this, retries] (syscall_result<int> result) {
- auto iocbs = retries.data();
- size_t nr_consumed = 0;
- if (result.result == -1) {
- nr_consumed = handle_aio_error(iocbs[0], result.error);
- } else {
- nr_consumed = result.result;
- }
- std::copy(retries.begin() + nr_consumed, retries.end(), std::back_inserter(_pending_aio_retry));
- });
- did_work = true;
- }
return did_work;
}

@@ -1583,37 +1478,6 @@ reactor::submit_io_write(io_queue* ioq, const io_priority_class& pc, size_t len,
return ioq->queue_request(pc, len, std::move(req));
}

-bool reactor::process_io()
-{
- io_event ev[max_aio];
- struct timespec timeout = {0, 0};
- auto n = io_getevents(_io_context, 1, max_aio, ev, &timeout, _force_io_getevents_syscall);
- if (n == -1 && errno == EINTR) {
- n = 0;
- }
- assert(n >= 0);
- unsigned nr_retry = 0;
- for (size_t i = 0; i < size_t(n); ++i) {
- auto iocb = get_iocb(ev[i]);
- if (ev[i].res == -EAGAIN) {
- ++nr_retry;
- set_nowait(*iocb, false);
- _pending_aio_retry.push_back(iocb);
- continue;
- }
- _iocb_pool.put_one(iocb);
- auto desc = reinterpret_cast<io_desc*>(ev[i].data);
- try {
- this->handle_io_result(ev[i]);
- desc->set_value(size_t(ev[i].res));
- } catch (...) {
- desc->set_exception(std::current_exception());
- }
- delete desc;
- }
- return n;
-}
-
namespace internal {

size_t sanitize_iovecs(std::vector<iovec>& iov, size_t disk_alignment) noexcept {
@@ -2252,18 +2116,13 @@ class reactor::io_pollfn final : public reactor::pollfn {
public:
io_pollfn(reactor& r) : _r(r) {}
virtual bool poll() override final {
- return _r.process_io() | _r.flush_pending_aio();
+ return _r.flush_pending_aio();
}
virtual bool pure_poll() override final {
return poll(); // actually performs work, but triggers no user continuations, so okay
}
virtual bool try_enter_interrupt_mode() override {
- // Because aio depends on polling, it cannot generate events to wake us up, Therefore, sleep
- // is only possible if there are no in-flight aios. If there are, we need to keep polling.
- //
- // Alternatively, if we enabled _aio_eventfd, we can always enter
- unsigned executing = _r._iocb_pool.outstanding();
- return executing == 0 || _r._aio_eventfd;
+ return true;
}
virtual void exit_interrupt_mode() override {
// nothing to do
@@ -2618,7 +2477,6 @@ int reactor::run() {

register_metrics();

- compat::optional<poller> io_poller = {};
compat::optional<poller> smp_poller = {};

// I/O Performance greatly increases if the smp poller runs before the I/O poller. This is
@@ -2627,9 +2485,9 @@ int reactor::run() {
if (smp::count > 1) {
smp_poller = poller(std::make_unique<smp_pollfn>(*this));
}
-#ifndef HAVE_OSV
- io_poller = poller(std::make_unique<io_pollfn>(*this));
-#endif
+
+ poller io_poller(std::make_unique<io_pollfn>(*this));
+ poller storage_poller(_backend->create_storage_poller());

poller batch_flush_poller(std::make_unique<batch_flush_pollfn>(*this));
poller execution_stage_poller(std::make_unique<execution_stage_pollfn>());
diff --git a/src/core/reactor_backend.cc b/src/core/reactor_backend.cc
index a798d34a..7eb7e24c 100644
--- a/src/core/reactor_backend.cc
+++ b/src/core/reactor_backend.cc
@@ -19,6 +19,9 @@
* Copyright 2019 ScyllaDB
*/
#include "core/reactor_backend.hh"
+#include "core/io_desc.hh"
+#include "core/thread_pool.hh"
+#include "core/syscall_result.hh"
#include <seastar/core/print.hh>
#include <seastar/core/reactor.hh>
#include <seastar/util/defer.hh>
@@ -36,6 +39,183 @@ using namespace std::chrono_literals;
using namespace internal;
using namespace internal::linux_abi;

+aio_storage_context::aio_storage_context(reactor *r)
+ : _r(r)
+ , _io_context(0)
+{
+ for (unsigned i = 0; i != max_aio; ++i) {
+ _free_iocbs.push(&_iocb_pool[i]);
+ }
+
+ setup_aio_context(max_aio, &_io_context);
+}
+
+aio_storage_context::~aio_storage_context() {
+ io_destroy(_io_context);
+}
+
+internal::linux_abi::iocb*
+aio_storage_context::get_one() {
+ auto r = _free_iocbs.size();
+ assert(r > 0);
+ auto io = _free_iocbs.top();
+ _free_iocbs.pop();
+ return io;
+}
+
+void
+aio_storage_context::put_one(internal::linux_abi::iocb* io) {
+ _free_iocbs.push(io);
+}
+
+unsigned
+aio_storage_context::outstanding() const {
+ return max_aio - _free_iocbs.size();
+}
+
+extern bool aio_nowait_supported;
+
+void
+aio_storage_context::submit_io(io_desc* desc, io_request_builder req) {
+ // We can ignore the future returned here, because the submitted aio will be polled
+ // for and completed in process_io().
+ auto* iocb = get_one();
+ auto& io = *iocb;
+ req.prepare_iocb(io);
+ if (_r->_aio_eventfd) {
+ set_eventfd_notification(io, _r->_aio_eventfd->get_fd());
+ }
+ if (aio_nowait_supported) {
+ set_nowait(io, true);
+ }
+ set_user_data(io, desc);
+ _pending_aio.push_back(&io);
+}
+
+bool aio_storage_context::process_io()
+{
+ io_event ev[max_aio];
+ struct timespec timeout = {0, 0};
+ auto n = io_getevents(_io_context, 1, max_aio, ev, &timeout, _r->_force_io_getevents_syscall);
+ if (n == -1 && errno == EINTR) {
+ n = 0;
+ }
+
+ assert(n >= 0);
+ unsigned nr_retry = 0;
+ for (size_t i = 0; i < size_t(n); ++i) {
+ auto iocb = get_iocb(ev[i]);
+ if (ev[i].res == -EAGAIN) {
+ ++nr_retry;
+ set_nowait(*iocb, false);
+ _pending_aio_retry.push_back(iocb);
+ continue;
+ }
+ put_one(iocb);
+ auto desc = reinterpret_cast<io_desc*>(ev[i].data);
+ try {
+ _r->handle_io_result(ev[i]);
+ desc->set_value(size_t(ev[i].res));
+ } catch (...) {
+ desc->set_exception(std::current_exception());
+ }
+ delete desc;
+ }
+ return n;
+}
+
+// Returns: number of iocbs consumed (0 or 1)
+size_t
+aio_storage_context::handle_aio_error(linux_abi::iocb* iocb, int ec) {
+ switch (ec) {
+ case EAGAIN:
+ return 0;
+ case EBADF: {
+ auto desc = reinterpret_cast<io_desc*>(get_user_data(*iocb));
+ put_one(iocb);
+ try {
+ throw std::system_error(EBADF, std::system_category());
+ } catch (...) {
+ desc->set_exception(std::current_exception());
+ }
+ delete desc;
+ // if EBADF, it means that the first request has a bad fd, so
+ // we will only remove it from _pending_aio and try again.
+ return 1;
+ }
+ default:
+ ++_r->_io_stats.aio_errors;
+ throw_system_error_on(true, "io_submit");
+ abort();
+ }
+}
+
+bool aio_storage_context::flush_pending_aio() {
+ bool did_work = false;
+ while (!_pending_aio.empty()) {
+ auto nr = _pending_aio.size();
+ auto iocbs = _pending_aio.data();
+ auto r = io_submit(_io_context, nr, iocbs);
+ size_t nr_consumed;
+ if (r == -1) {
+ nr_consumed = handle_aio_error(iocbs[0], errno);
+ } else {
+ nr_consumed = size_t(r);
+ }
+
+ did_work = true;
+ if (nr_consumed == nr) {
+ _pending_aio.clear();
+ } else {
+ _pending_aio.erase(_pending_aio.begin(), _pending_aio.begin() + nr_consumed);
+ }
+ }
+ if (!_pending_aio_retry.empty()) {
+ auto retries = std::exchange(_pending_aio_retry, {});
+ // FIXME: future is discarded
+ (void)engine()._thread_pool->submit<syscall_result<int>>([this, retries] () mutable {
+ auto r = io_submit(_io_context, retries.size(), retries.data());
+ return wrap_syscall<int>(r);
+ }).then([this, retries] (syscall_result<int> result) {
+ auto iocbs = retries.data();
+ size_t nr_consumed = 0;
+ if (result.result == -1) {
+ nr_consumed = handle_aio_error(iocbs[0], result.error);
+ } else {
+ nr_consumed = result.result;
+ }
+ std::copy(retries.begin() + nr_consumed, retries.end(), std::back_inserter(_pending_aio_retry));
+ });
+ did_work = true;
+ }
+ did_work |= process_io();
+ return did_work;
+}
+
+bool aio_storage_context::aio_storage_context_pollfn::poll() {
+ return _ctx->flush_pending_aio();
+}
+
+bool aio_storage_context::aio_storage_context_pollfn::pure_poll() {
+ return poll(); // actually performs work, but triggers no user continuations, so okay
+}
+
+bool aio_storage_context::aio_storage_context_pollfn::try_enter_interrupt_mode() {
+ // Because aio depends on polling, it cannot generate events to wake us up, Therefore, sleep
+ // is only possible if there are no in-flight aios. If there are, we need to keep polling.
+ //
+ // Alternatively, if we enabled _aio_eventfd, we can always enter
+ unsigned executing = _ctx->outstanding();
+ return executing == 0 || _ctx->_r->_aio_eventfd;
+}
+void aio_storage_context::aio_storage_context_pollfn::exit_interrupt_mode() {
+ // nothing to do
+}
+
+std::unique_ptr<pollfn> aio_storage_context::create_poller() {
+ return std::make_unique<aio_storage_context_pollfn>(this);
+}
+
reactor_backend_aio::context::context(size_t nr) : iocbs(new iocb*[nr]) {
setup_aio_context(nr, &io_context);
}
@@ -184,7 +364,10 @@ void reactor_backend_aio::signal_received(int signo, siginfo_t* siginfo, void* i
engine()._signals.action(signo, siginfo, ignore);
}

-reactor_backend_aio::reactor_backend_aio(reactor* r) : _r(r) {
+reactor_backend_aio::reactor_backend_aio(reactor* r)
+ : _r(r)
+ , _aio_storage_context(_r)
+{
_task_quota_timer_iocb = make_poll_iocb(_r->_task_quota_timer.get(), POLLIN);
_timerfd_iocb = make_poll_iocb(_steady_clock_timer.get(), POLLIN);
_smp_wakeup_iocb = make_poll_iocb(_r->_notify_eventfd.get(), POLLIN);
@@ -292,8 +475,19 @@ void reactor_backend_aio::start_handling_signal() {
// implementation of request_preemption is not signal safe, so do nothing.
}

+std::unique_ptr<pollfn> reactor_backend_aio::create_storage_poller() {
+ return _aio_storage_context.create_poller();
+}
+
+void reactor_backend_aio::submit_io(io_desc* desc, io_request_builder irb) {
+ return _aio_storage_context.submit_io(desc, std::move(irb));
+}
+
reactor_backend_epoll::reactor_backend_epoll(reactor* r)
- : _r(r), _epollfd(file_desc::epoll_create(EPOLL_CLOEXEC)) {
+ : _r(r)
+ , _epollfd(file_desc::epoll_create(EPOLL_CLOEXEC))
+ , _aio_storage_context(_r)
+{
::epoll_event event;
event.events = EPOLLIN;
event.data.ptr = nullptr;
@@ -455,6 +649,14 @@ void reactor_backend_epoll::reset_preemption_monitor() {
_r->_preemption_monitor.head.store(0, std::memory_order_relaxed);
}

+std::unique_ptr<pollfn> reactor_backend_epoll::create_storage_poller() {
+ return _aio_storage_context.create_poller();
+}
+
+void reactor_backend_epoll::submit_io(io_desc* desc, io_request_builder irb) {
+ return _aio_storage_context.submit_io(desc, std::move(irb));
+}
+
#ifdef HAVE_OSV
reactor_backend_osv::reactor_backend_osv() {
}
@@ -495,6 +697,15 @@ reactor_backend_osv::enable_timer(steady_clock_type::time_point when) {
_poller.set_timer(when);
}

+std::unique_ptr<pollfn> reactor_backend_osv::create_storage_poller() {
+ std::cerr << "reactor_backend_osv does not support file descriptors - create_storage_poller() shouldn't have been called!\n";
+ abort();
+}
+
+void reactor_backend_osv::submit_io(io_desc* desc, io_request_builder irb) {
+ std::cerr << "reactor_backend_osv does not support file descriptors - submit_io() shouldn't have been called!\n";
+ abort();
+}
#endif

static bool detect_aio_poll() {
--
2.20.1

Avi Kivity

<avi@scylladb.com>
Jan 22, 2020, 10:53:41 AM1/22/20
to Glauber Costa, seastar-dev@googlegroups.com

On 22/01/2020 17.35, Glauber Costa wrote:
> Our async I/O mechanism relies on Linux async I/O interface for now,
> which in turn relies on sending iocbs down to the Kernel. Because iocbs
> are so ingrained in the interface, they go all the way up and the
> write_dma and read_dma functions are already aware of it.
>
> However, there are now other interfaces, like io_uring, that rely on
> different data structures. To account for that, we will encapsulate the
> I/O information like file descriptor, addresses and sizes into a seastar
> structure and only transform this into iocbs down at the lowest level,
> where we already know which interface to use.
>
> In a follow up patch, we will have to do the same thing with the
> io_event structure.



> index 00000000..d82e3b7e
> --- /dev/null
> +++ b/include/seastar/core/io_request_builder.hh
> @@ -0,0 +1,58 @@
> +
> +namespace seastar {


Please put this in the seastar::internal namespace so users aren't
tempted to touch it (the file can be in include/seastar/core/internal).
Our code has the case statements aligned with the switch statement.


> + iocb = make_fdsync_iocb(_fd);
> + break;
> + case io_request_builder::operation::write:
> + iocb = make_write_iocb(_fd, _pos, _address, _size);
> + break;
> + case io_request_builder::operation::writev:
> + iocb = make_writev_iocb(_fd, _pos, reinterpret_cast<const iovec*>(_address), _size);
> + break;
> + case io_request_builder::operation::read:
> + iocb = make_read_iocb(_fd, _pos, _address, _size);
> + break;
> + case io_request_builder::operation::readv:
> + iocb = make_readv_iocb(_fd, _pos, reinterpret_cast<const iovec*>(_address), _size);
> + break;
> + default:
> + throw std::runtime_error("Can't prepare IOCB for unrecognized operation");
> + }
> +}


A fully virtualized solution will push the building to reactor_backend.
Of course that is difficult because there is no common type to return,
so reactor_backend will have to build and queue, not just build.


With that in mind, this is better named io_request (and can be a plain
struct). Of course, we can clean that up later.
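
A minimal sketch of what "build and queue" could look like, assuming a
plain io_request struct and a backend that translates it into its own
native descriptor. Everything here is hypothetical: storage_backend,
toy_aio_backend, native_descriptor (a stand-in for an iocb) and the
opcode numbers are made up for illustration and are not the Seastar
types:

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical backend-neutral request: a plain struct with no iocb knowledge.
struct io_request {
    enum class operation { read, write, fdatasync };
    operation op;
    int fd;
    std::uint64_t pos;
    void* address;
    std::size_t size;
};

// Hypothetical backend interface: it both builds the native descriptor and
// queues it, so no native type ever has to be returned to the caller.
class storage_backend {
public:
    virtual ~storage_backend() = default;
    virtual void submit_io(void* completion_cookie, io_request req) = 0;
    virtual std::size_t pending() const = 0;
};

// Toy "aio-like" backend; native_descriptor stands in for an iocb.
class toy_aio_backend final : public storage_backend {
    struct native_descriptor {
        int opcode;
        int fd;
        std::uint64_t offset;
        void* buf;
        std::size_t nbytes;
        void* user_data;
    };
    std::vector<native_descriptor> _pending;

    // Opcode values are arbitrary placeholders for this sketch.
    static int to_opcode(io_request::operation op) {
        switch (op) {
        case io_request::operation::read: return 0;
        case io_request::operation::write: return 1;
        case io_request::operation::fdatasync: return 2;
        }
        return -1;
    }
public:
    void submit_io(void* completion_cookie, io_request req) override {
        _pending.push_back({to_opcode(req.op), req.fd, req.pos, req.address, req.size, completion_cookie});
    }
    std::size_t pending() const override { return _pending.size(); }
};
```

Since the native type stays private to the backend, the missing common
return type stops being a problem.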


Avi Kivity

<avi@scylladb.com>
Jan 22, 2020, 11:11:24 AM1/22/20
to Glauber Costa, seastar-dev@googlegroups.com

On 22/01/2020 17.35, Glauber Costa wrote:
> We currently protect the iocb acquisition path in the storage layer with
> a semaphore. This is a relic from a past in which we didn't have an I/O
> scheduler.


No, it's very new:


commit 8011bd33bf2c9a1f108eb722c8cc4f55e5592be9
Author: Avi Kivity <a...@scylladb.com>
Date:   Tue Dec 10 15:22:08 2019 +0200

    reactor: fix iocb pool underflow due to unaccounted aio fsync

    The iocb pool is sized for the worst case of all I/O
    queues having 128 outstanding requests each. But fsync also consumes
    iocbs from the pool, without any reservation having been made for
    them. Usually this doesn't matter because those fsyncs complete
    quickly, and because the pool is hugely oversized for the common
    case. But in the uncommon case (ENOSPC) things can clog and we can
    underflow the pool.

    Fix by introducing a semaphore for guarding the pool. It should
    be very rare that we have to wait on the semaphore, since usually
    the io_queue mechanism will limit requests to well below the
    pool capacity. It should only matter when the disk stalls and
    gets filled with fsync requests.

    Ref scylladb/scylla#5443.



> In reality, access to this structure has been protected by the I/O
> scheduler for a long time, and we are guaranteed to never run out of
> iocbs in the pool.
>
> Removing the semaphore allows us to un-futurize this, which is a benefit
> in itself. However, as will become clear in the next patch, the
> biggest advantage of this is that we will be able to unify this code
> with the reactor backend.
>
> In preparation for that we will also add a test for the fact that we
> really indeed have enough entries to draw from, despite the I/O scheduler
> guarantees.


I don't see where we guarantee we won't queue more than max_aio iocbs (I
agree we should push the limit to the scheduler). There is a check
looking at max_aio_per_queue, but that is related to mountpoints.


The relevant lines are:


        if (configuration.count("max-io-requests")) {
            _capacity = configuration["max-io-requests"].as<unsigned>();
        }

Which we pass from the input without a cap, and if max-io-requests is
not provided, it's derived dynamically from the disk bandwidth and the
task quota period.


This never harmed us because the pool size is very large (1024 per
shard, which no disk needs), except for the fsync trouble above. Since
fsync is handled outside the scheduler, I'm not sure we can push the
limit to the scheduler.
Note that this code doesn't add tasks, so it's cheap compared to
continuations. Still, removing it is a win.
You can undo the half-indent (that was added for the semaphore),
reducing noise.
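
To make the pool-sizing concern above concrete, here is a small
standalone sketch of two ingredients that could help. It is not a
proposed fix, and it does not address the open question of fsync being
handled outside the scheduler. fixed_pool, try_get and clamp_capacity
are hypothetical names, and the idea of reserving slots for unscheduled
requests is an assumption of this example:

```cpp
#include <array>
#include <cstddef>
#include <vector>

// Hypothetical fixed-size pool, loosely modelled on the iocb pool discussed above.
template <typename T, std::size_t N>
class fixed_pool {
    std::array<T, N> _slots{};
    std::vector<T*> _free;
public:
    fixed_pool() {
        _free.reserve(N);
        for (auto& s : _slots) {
            _free.push_back(&s);
        }
    }
    // The free list points into _slots, so copying would be unsafe.
    fixed_pool(const fixed_pool&) = delete;
    fixed_pool& operator=(const fixed_pool&) = delete;

    // Returns nullptr when the pool is exhausted instead of underflowing.
    T* try_get() {
        if (_free.empty()) {
            return nullptr;
        }
        T* p = _free.back();
        _free.pop_back();
        return p;
    }
    void put(T* p) { _free.push_back(p); }
    std::size_t outstanding() const { return N - _free.size(); }
};

// Clamp a user-supplied request limit so that scheduler-driven requests cannot
// consume slots set aside for unscheduled ones (e.g. fsync). The reserve size
// is an assumption made for this sketch.
constexpr std::size_t clamp_capacity(std::size_t requested, std::size_t pool_size, std::size_t reserved) {
    std::size_t usable = pool_size > reserved ? pool_size - reserved : 0;
    return requested < usable ? requested : usable;
}
```

A caller that gets nullptr back still has to decide how to defer the
request, which is the part the semaphore handles today.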

Avi Kivity

<avi@scylladb.com>
Jan 22, 2020, 11:19:08 AM1/22/20
to Glauber Costa, seastar-dev@googlegroups.com

On 22/01/2020 17.35, Glauber Costa wrote:
> This series provides the groundwork for moving our I/O subsystem towards
> io_uring. I am not including any io_uring patches yet, and instead
> decided to collect early feedback on the general approach.
>
> The main seastar-specific problem we face is that we make assumptions
> about iocb, io_event and the presence of io_submit/io_getevents pretty
> much everywhere.
>
> The approach I have chosen to take is that all of that machinery is
> moved inside the already existing reactor_backend infrastructure.
> io_uring, when implemented, will be a new reactor backend capable of
> replacing all the polling and storage I/O at once.
>
> Comments welcome
>

Looks good, but I'm afraid patch 4 will require a lot more work. Better
to drop it now.


Glauber Costa

<glauber@scylladb.com>
Jan 22, 2020, 11:50:14 AM1/22/20
to Avi Kivity, seastar-dev
Okay. The main motivation for that patch in particular was to facilitate a potential unification
between iocb management in the storage and poll layers.

However, I wasn't planning on doing it anyway, since once we move to io_uring I expect the benefits of that to be
much lower. Dropping that.
 

Glauber Costa

<glauber@scylladb.com>
Jan 22, 2020, 11:51:07 AM1/22/20
to Avi Kivity, seastar-dev
On Wed, Jan 22, 2020 at 11:11 AM Avi Kivity <a...@scylladb.com> wrote:

On 22/01/2020 17.35, Glauber Costa wrote:
> We currently protect the iocb acquisition path in the storage layer with
> a semaphore. This is a relic from a past in which we didn't have an I/O
> scheduler.


No, it's very new:

Ok, I think I know why I thought it was ancient:
I now recall that I had removed this semaphore before (before the fdatasync thing existed).

I am dropping this patch for now, thanks.

Glauber Costa

<glauber@scylladb.com>
Jan 22, 2020, 11:55:28 AM1/22/20
to Avi Kivity, seastar-dev
On Wed, Jan 22, 2020 at 10:53 AM Avi Kivity <a...@scylladb.com> wrote:

On 22/01/2020 17.35, Glauber Costa wrote:
> Our async I/O mechanism relies on Linux async I/O interface for now,
> which in turn relies on sending iocbs down to the Kernel. Because iocbs
> are so ingrained in the interface, they go all the way up and the
> write_dma and read_dma functions are already aware of it.
>
> However, there are now other interfaces, like io_uring, that rely on
> different data structures. To account for that, we will encapsulate the
> I/O information like file descriptor, addresses and sizes into a seastar
> structure and only transform this into iocbs down at the lowest level,
> where we already know which interface to use.
>
> In a follow up patch, we will have to do the same thing with the
> io_event structure.



> index 00000000..d82e3b7e
> --- /dev/null
> +++ b/include/seastar/core/io_request_builder.hh
> @@ -0,0 +1,58 @@
> +
> +namespace seastar {


Please put this in the seastar::internal namespace so users aren't
tempted to touch it (the file can be in include/seastar/core/internal).


thanks. I will do that.
Okay, will adjust.
 


> +            iocb = make_fdsync_iocb(_fd);
> +            break;
> +        case io_request_builder::operation::write:
> +            iocb = make_write_iocb(_fd, _pos, _address, _size);
> +            break;
> +        case io_request_builder::operation::writev:
> +            iocb = make_writev_iocb(_fd, _pos, reinterpret_cast<const iovec*>(_address), _size);
> +            break;
> +        case io_request_builder::operation::read:
> +            iocb = make_read_iocb(_fd, _pos, _address, _size);
> +            break;
> +        case io_request_builder::operation::readv:
> +            iocb = make_readv_iocb(_fd, _pos, reinterpret_cast<const iovec*>(_address), _size);
> +            break;
> +        default:
> +            throw std::runtime_error("Can't prepare IOCB for unrecognized operation");
> +    }
> +}


A fully virtualized solution will push the building to reactor_backend.
Of course that is difficult because there is no common type to return,
so reactor_backend will have to build and queue, not just build.


With that in mind, this is better named io_request (and can be a plain
struct). Of course, we can clean that up later.

There is no reason not to do it now, since the main goal of me sending this series was to collect early feedback.

It is possible to move it fully to the backend, but not easy to do in this patch.
The reason is that this becomes a bit circular: moving to the backend is made easier by the fact that the iocb reference is gone.
But once the code is moved to the reactor backend this function is only really used by the aio_context's version of submit_io.

I will make sure this is moved in that patch (right now it isn't)

Avi Kivity

<avi@scylladb.com>
Jan 22, 2020, 11:55:47 AM1/22/20
to Glauber Costa, seastar-dev

I think it's a worthwhile goal even with io_uring. Note that the fd io_uring/io_context will also need limit checking; right now we just hope we allocated enough.


I think io_uring decouples the communication ring capacity from the concurrency limit while aio does not, so it should be easier in io_uring. But of course we can do it later.

Glauber Costa

<glauber@scylladb.com>
Jan 22, 2020, 12:03:05 PM1/22/20
to Avi Kivity, seastar-dev
My plan was to leave that as an exercise for the reader.
If book authors can get away with this clear sham, so can I.

 

Avi Kivity

<avi@scylladb.com>
Jan 22, 2020, 12:27:37 PM1/22/20
to Glauber Costa, seastar-dev

Book authors have readers.


Rafael Avila de Espindola

<espindola@scylladb.com>
Jan 22, 2020, 4:15:31 PM1/22/20
to Glauber Costa, seastar-dev@googlegroups.com, Glauber Costa
Glauber Costa <gla...@scylladb.com> writes:

> future<internal::linux_abi::io_event>
> - queue_request(const io_priority_class& pc, size_t len, request_type req_type, noncopyable_function<void (internal::linux_abi::iocb&)> do_io);
> + queue_request(const io_priority_class& pc, size_t len, io_request_builder req);

io_request_builder has a _size, is that the same as len in here? Could
len be removed?

BTW, not sure if "_builder" adds a lot of value. Would just
seastar::io_request be OK?


> +#include <seastar/core/sstring.hh>
> +#include <seastar/core/linux-aio.hh>
> +
> +namespace seastar {
> +class io_request_builder {
> +public:
> + enum class operation { read, readv, write, writev, fdatasync };
> + operation _op;
> + int _fd;
> + uint64_t _pos;
> + void *_address;
> + size_t _size;
> +public:
> + io_request_builder(operation op, int fd, uint64_t pos, void* address, size_t size)
> + : _op(op)
> + , _fd(fd)
> + , _pos(pos)
> + , _address(address)
> + , _size(size)
> + {}
> + io_request_builder(operation op, int fd) : io_request_builder(op, fd, 0, nullptr, 0) {}

Nit: Could the io_request_builder constructors be private and only
expose static methods make_read(), make_readv, make_fdatasync, etc? The
advantage being that each of the make_* takes only the relevant fields.
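
A minimal sketch of that shape, for illustration only. The class name io_request, the accessor names and the exact parameter lists are assumptions and not taken from the patch; only the field names and the operation enum follow the quoted diff.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <sys/uio.h>   // struct iovec

// Hypothetical sketch: constructors are private, and each static
// make_* helper takes only the fields relevant to its operation.
class io_request {
public:
    enum class operation { read, readv, write, writev, fdatasync };
private:
    operation _op;
    int _fd;
    uint64_t _pos;
    void* _address;
    size_t _size;   // bytes for read/write, number of iovec entries for readv/writev

    io_request(operation op, int fd, uint64_t pos, void* address, size_t size)
        : _op(op), _fd(fd), _pos(pos), _address(address), _size(size) {}
public:
    static io_request make_read(int fd, uint64_t pos, void* address, size_t size) {
        return io_request(operation::read, fd, pos, address, size);
    }
    static io_request make_readv(int fd, uint64_t pos, iovec* iov, size_t iov_count) {
        return io_request(operation::readv, fd, pos, iov, iov_count);
    }
    static io_request make_write(int fd, uint64_t pos, const void* address, size_t size) {
        return io_request(operation::write, fd, pos, const_cast<void*>(address), size);
    }
    static io_request make_writev(int fd, uint64_t pos, const iovec* iov, size_t iov_count) {
        return io_request(operation::writev, fd, pos, const_cast<iovec*>(iov), iov_count);
    }
    // fdatasync needs nothing but the file descriptor.
    static io_request make_fdatasync(int fd) {
        return io_request(operation::fdatasync, fd, 0, nullptr, 0);
    }

    bool is_read() const { return _op == operation::read || _op == operation::readv; }
    bool is_write() const { return _op == operation::write || _op == operation::writev; }
    operation opcode() const { return _op; }
    int fd() const { return _fd; }
    uint64_t pos() const { return _pos; }
    void* address() const { return _address; }
    size_t size() const { return _size; }
};
```

With this layout a caller writes io_request::make_fdatasync(fd) and never has to pass dummy position, address or size values.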

> future<internal::linux_abi::io_event> submit_io_read(io_queue* ioq,
> const io_priority_class& priority_class,
> size_t len,
> - noncopyable_function<void (internal::linux_abi::iocb&)> prepare_io);
> + io_request_builder builder);
> future<internal::linux_abi::io_event> submit_io_write(io_queue* ioq,
> const io_priority_class& priority_class,
> size_t len,
> - noncopyable_function<void (internal::linux_abi::iocb&)> prepare_io);
> + io_request_builder builder);

Two other cases of having len and builder._size.

> future<io_event>
> -io_queue::queue_request(const io_priority_class& pc, size_t len, io_queue::request_type req_type, noncopyable_function<void (internal::linux_abi::iocb&)> prepare_io) {
> +io_queue::queue_request(const io_priority_class& pc, size_t len, io_request_builder req) {
> auto start = std::chrono::steady_clock::now();
> - return smp::submit_to(coordinator(), [start, &pc, len, req_type, prepare_io = std::move(prepare_io), owner = engine().cpu_id(), this] () mutable {
> + return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = engine().cpu_id(), this] () mutable {
> // First time will hit here, and then we create the class. It is important
> // that we create the shared pointer in the same shard it will be used at later.
> auto& pclass = find_or_create_class(pc, owner);
> pclass.nr_queued++;
> unsigned weight;
> size_t size;
> - if (req_type == io_queue::request_type::write) {
> + if (req.is_write()) {
> weight = _config.disk_req_write_to_read_multiplier;
> size = _config.disk_bytes_write_to_read_multiplier * len;
> - } else {
> + } else if (req.is_read()) {
> weight = io_queue::read_request_base_count;
> size = io_queue::read_request_base_count * len;
> + } else {
> + throw std::runtime_error(fmt::format("Unrecognized request passing through I/O queue {} ", req.operation()));

How about on_internal_error instead?

> +void io_request_builder::prepare_iocb(iocb& iocb) {

How about returning an iocb instead?

> + switch (_op) {
> + case io_request_builder::operation::fdatasync:
> + iocb = make_fdsync_iocb(_fd);
> + break;
> + case io_request_builder::operation::write:
> + iocb = make_write_iocb(_fd, _pos, _address, _size);
> + break;
> + case io_request_builder::operation::writev:
> + iocb = make_writev_iocb(_fd, _pos, reinterpret_cast<const iovec*>(_address), _size);
> + break;
> + case io_request_builder::operation::read:
> + iocb = make_read_iocb(_fd, _pos, _address, _size);
> + break;
> + case io_request_builder::operation::readv:
> + iocb = make_readv_iocb(_fd, _pos, reinterpret_cast<const iovec*>(_address), _size);
> + break;
> + default:
> + throw std::runtime_error("Can't prepare IOCB for unrecognized operation");
> + }
> +}

This is a fully covered switch. The trick to avoid the extra default and
be safe is to write it as

switch (_op) {
case...:
case...:
}
builtin_unreachable();

If any case is missing, gcc produces a warning which fails the
build. The builtin_unreachable() after the switch is not needed in this
case, but in general it is there to avoid gcc warning about not
returning in all paths. Yes, gcc is good at doublethink.


> +
> +sstring io_request_builder::operation() const {
> + switch (_op) {
> + case io_request_builder::operation::fdatasync:
> + return "fdatasync";
> + case io_request_builder::operation::write:
> + return "write";
> + case io_request_builder::operation::writev:
> + return "vectored write";
> + case io_request_builder::operation::read:
> + return "read";
> + case io_request_builder::operation::readv:
> + return "vectored read";
> + default:
> + return fmt::format("unknown operation {}", unsigned(_op));
> + }
> +}

Another fully covered switch.
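
As a sketch of how the name lookup could look without the default label (a free function operation_name and std::string are used here as stand-ins for the member function and sstring in the patch):

```cpp
#include <cassert>
#include <string>

// Same enumerators as in the quoted patch.
enum class operation { read, readv, write, writev, fdatasync };

// Hypothetical free-function version of io_request_builder::operation().
// There is no default label, so GCC's -Wswitch reports any enumerator
// that is added to the enum later but not handled here.
inline std::string operation_name(operation op) {
    switch (op) {
    case operation::fdatasync:
        return "fdatasync";
    case operation::write:
        return "write";
    case operation::writev:
        return "vectored write";
    case operation::read:
        return "read";
    case operation::readv:
        return "vectored read";
    }
    // Only here to silence -Wreturn-type; every enumerator returns above.
    __builtin_unreachable();
}
```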

Cheers,
Rafael

Rafael Avila de Espindola

<espindola@scylladb.com>
Jan 22, 2020, 4:16:57 PM1/22/20
to Glauber Costa, seastar-dev@googlegroups.com, Glauber Costa
Glauber Costa <gla...@scylladb.com> writes:

> This series provides the groundwork for moving our I/O subsystem towards
> io_uring.

Thanks! I am super excited to see how that performs!

Cheers,
Rafael

Botond Dénes

<bdenes@scylladb.com>
Jan 23, 2020, 1:39:48 AM1/23/20
to Rafael Avila de Espindola, Glauber Costa, seastar-dev@googlegroups.com
If `_op` can ever have a corrupt value, `std::abort()` is better here.

Rafael Avila de Espindola

<espindola@scylladb.com>
Jan 23, 2020, 1:48:10 AM1/23/20
to Botond Dénes, Glauber Costa, seastar-dev@googlegroups.com

>> This is a fully covered switch. The trick to avoid the extra default
>> and
>> be safe is to write it as
>>
>> switch (_op) {
>> case...:
>> case...:
>> }
>> builtin_unreachable();
>
>
>
> If `_op` can ever have a corrupt value, `std::abort()` is better here.

Is that really worth protecting against? I was never that lucky with
corrupted memory.

Cheers,
Rafael

Botond Dénes

<bdenes@scylladb.com>
Jan 23, 2020, 1:56:25 AM1/23/20
to Rafael Avila de Espindola, Glauber Costa, seastar-dev@googlegroups.com
I don't know, maybe not worth it, depends on how paranoid you are. We
have examples of both in our code base.

>
> Cheers,
> Rafael


Glauber Costa

<glauber@scylladb.com>
Jan 23, 2020, 8:33:55 AM1/23/20
to Rafael Avila de Espindola, seastar-dev
On Wed, Jan 22, 2020 at 4:15 PM Rafael Avila de Espindola <espi...@scylladb.com> wrote:
Glauber Costa <gla...@scylladb.com> writes:

>      future<internal::linux_abi::io_event>
> -    queue_request(const io_priority_class& pc, size_t len, request_type req_type, noncopyable_function<void (internal::linux_abi::iocb&)> do_io);
> +    queue_request(const io_priority_class& pc, size_t len, io_request_builder req);

io_request_builder has a _size, is that the same as len in here? Could
len be removed?

No.
In the case of an iovec, _size is the number of entries in the vector.

The len argument in queue_request is the total size of the operation (in the case of an iovec, the sum of all iov_len) for the purposes of accounting for the I/O Scheduler.
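
To make the distinction concrete, here is a small sketch of how the accounting value for a vectored request can be derived. The helper name total_iov_bytes is hypothetical and does not appear in the patch.

```cpp
#include <cassert>
#include <cstddef>
#include <sys/uio.h>   // struct iovec

// Hypothetical helper: the total number of bytes described by an iovec
// array, i.e. the sum of all iov_len. This is the kind of value the I/O
// scheduler accounts for, as opposed to the number of entries.
inline size_t total_iov_bytes(const iovec* iov, size_t iov_count) {
    size_t total = 0;
    for (size_t i = 0; i < iov_count; ++i) {
        total += iov[i].iov_len;
    }
    return total;
}
```

For a two-entry iovec with buffers of 512 and 4096 bytes, the entry count is 2 while the total length is 4608, so the two values cannot be folded into one field for vectored I/O.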
 

BTW, not sure if "_builder" adds a lot of value. Would just
seastar::io_request be OK?

Already changed it as requested by avi (to seastar::internal::io_request)
 


> +#include <seastar/core/sstring.hh>
> +#include <seastar/core/linux-aio.hh>
> +
> +namespace seastar {
> +class io_request_builder {
> +public:
> +    enum class operation { read, readv, write, writev, fdatasync };
> +    operation _op;
> +    int _fd;
> +    uint64_t _pos;
> +    void *_address;
> +    size_t _size;
> +public:
> +    io_request_builder(operation op, int fd, uint64_t pos, void* address, size_t size)
> +        : _op(op)
> +        , _fd(fd)
> +        , _pos(pos)
> +        , _address(address)
> +        , _size(size)
> +    {}
> +    io_request_builder(operation op, int fd) : io_request_builder(op, fd, 0, nullptr, 0) {}

Nit: Could the io_request_builder constructors be private and only
expose static methods make_read(), make_readv, make_fdatasync, etc? The
advantage being that each of the make_* takes only the relevant fields.

that sounds like a good direction
 
>      future<internal::linux_abi::io_event> submit_io_read(io_queue* ioq,
>              const io_priority_class& priority_class,
>              size_t len,
> -            noncopyable_function<void (internal::linux_abi::iocb&)> prepare_io);
> +            io_request_builder builder);
>      future<internal::linux_abi::io_event> submit_io_write(io_queue* ioq,
>              const io_priority_class& priority_class,
>              size_t len,
> -            noncopyable_function<void (internal::linux_abi::iocb&)> prepare_io);
> +            io_request_builder builder);

Two other cases of having len and builder._size.

they are the same for non-vectored I/O, but different for vectored I/O.
 

>  future<io_event>
> -io_queue::queue_request(const io_priority_class& pc, size_t len, io_queue::request_type req_type, noncopyable_function<void (internal::linux_abi::iocb&)> prepare_io) {
> +io_queue::queue_request(const io_priority_class& pc, size_t len, io_request_builder req) {
>      auto start = std::chrono::steady_clock::now();
> -    return smp::submit_to(coordinator(), [start, &pc, len, req_type, prepare_io = std::move(prepare_io), owner = engine().cpu_id(), this] () mutable {
> +    return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = engine().cpu_id(), this] () mutable {
>          // First time will hit here, and then we create the class. It is important
>          // that we create the shared pointer in the same shard it will be used at later.
>          auto& pclass = find_or_create_class(pc, owner);
>          pclass.nr_queued++;
>          unsigned weight;
>          size_t size;
> -        if (req_type == io_queue::request_type::write) {
> +        if (req.is_write()) {
>              weight = _config.disk_req_write_to_read_multiplier;
>              size = _config.disk_bytes_write_to_read_multiplier * len;
> -        } else {
> +        } else if (req.is_read()) {
>              weight = io_queue::read_request_base_count;
>              size = io_queue::read_request_base_count * len;
> +        } else {
> +            throw std::runtime_error(fmt::format("Unrecognized request passing through I/O queue {} ", req.operation()));

How about on_internal_error instead?


sorry, not sure what exactly do you mean

 
> +void io_request_builder::prepare_iocb(iocb& iocb) {

How about returning an iocb instead?

Why ?
I can't avoid passing the iocb, because the iocb is received from a pool that is external to this structure.
 

> +    switch (_op) {
> +        case io_request_builder::operation::fdatasync:
> +            iocb = make_fdsync_iocb(_fd);
> +            break;
> +        case io_request_builder::operation::write:
> +            iocb = make_write_iocb(_fd, _pos, _address, _size);
> +            break;
> +        case io_request_builder::operation::writev:
> +            iocb = make_writev_iocb(_fd, _pos, reinterpret_cast<const iovec*>(_address), _size);
> +            break;
> +        case io_request_builder::operation::read:
> +            iocb = make_read_iocb(_fd, _pos, _address, _size);
> +            break;
> +        case io_request_builder::operation::readv:
> +            iocb = make_readv_iocb(_fd, _pos, reinterpret_cast<const iovec*>(_address), _size);
> +            break;
> +        default:
> +            throw std::runtime_error("Can't prepare IOCB for unrecognized operation");
> +    }
> +}

This is a fully covered switch. The trick to avoid the extra default and
be safe is to write it as

I don't want to avoid the extra default. In my view, it is there to protect us against future code, where someone added an opcode and forgot to
add the relevant switch.

If there was a way to statically warn about it and make sure it was a fully covered switch at all times, then I'd be happy to remove the default.
 

switch (_op) {
  case...:
  case...:
}
builtin_unreachable();

If any case is missing, gcc produces a warning which fails the
build.

Interesting, I did not know that. In that case I most definitely *don't* want the builtin unreachable and can remove the default.

Rafael Avila de Espindola

<espindola@scylladb.com>
Jan 23, 2020, 11:04:55 AM1/23/20
to Glauber Costa, seastar-dev
Glauber Costa <gla...@scylladb.com> writes:

>> > + } else if (req.is_read()) {
>> > weight = io_queue::read_request_base_count;
>> > size = io_queue::read_request_base_count * len;
>> > + } else {
>> > + throw std::runtime_error(fmt::format("Unrecognized request
>> passing through I/O queue {} ", req.operation()));
>>
>> How about on_internal_error instead?
>>
>>
> sorry, not sure what exactly do you mean

Instead of

} else {
throw ...
}

Use

} else {
on_internal_error(...);
}

This makes it abort if abort_on_internal_error is set, which is true for
tests.
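
A standalone sketch of that behaviour, so it can be tried without seastar. Everything in the sketch namespace is a stand-in written for illustration: as far as I recall the real seastar helper also takes a logger argument, which is omitted here, and the weights are made-up numbers.

```cpp
#include <cassert>
#include <cstdlib>
#include <stdexcept>
#include <string>

namespace sketch {

// Stand-in for the abort_on_internal_error setting mentioned above.
static bool abort_on_internal_error = false;

// Stand-in for on_internal_error: abort when the flag is set (as in
// tests), otherwise report the problem as an exception.
[[noreturn]] inline void on_internal_error(const std::string& reason) {
    if (abort_on_internal_error) {
        std::abort();
    }
    throw std::runtime_error(reason);
}

enum class request_kind { read, write, other };

// Mirrors the if / else if / else shape of queue_request in the patch.
inline unsigned request_weight(request_kind k) {
    if (k == request_kind::write) {
        return 2;   // illustrative value only
    } else if (k == request_kind::read) {
        return 1;   // illustrative value only
    } else {
        on_internal_error("Unrecognized request passing through I/O queue");
    }
}

} // namespace sketch
```

With the flag left at false the unrecognized case still surfaces as a std::runtime_error, so production behaviour matches the throw in the patch, while a test run with the flag set stops at the exact point of the bug.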

>> > +void io_request_builder::prepare_iocb(iocb& iocb) {
>>
>> How about returning an iocb instead?
>>
>
> Why ?

Simpler code:

switch (_op) {
case io_request_builder::operation::fdatasync:
return make_fdsync_iocb(_fd);
case ...:
...


> I can't avoid passing the iocb, because the iocb is received from a pool
> that is external to this structure.

The change would be

- prepare_io(io);
+ io = req.prepare_iocb();

No?

>> > + default:
>> > + throw std::runtime_error("Can't prepare IOCB for
>> unrecognized operation");
>> > + }
>> > +}
>>
>> This is a fully covered switch. The trick to avoid the extra default and
>> be safe is to write it as
>>
>
> I don't want to avoid the extra default. In my view, it is there to protect
> us against future code, where someone added an opcode and forgot to
> add the relevant switch.

They can't forget. GCC will fail the build because of -Werror.

> If there was a way to statically warn about it and make sure it was a fully
> covered switch at all times, then I'd be happy to remove the default.

That is exactly what it does. A reduced example:

enum class foo { bar, zed };
int func(foo x) {
    switch (x) {
    case foo::bar:
        return 1;
    case foo::zed:
        return 2;
    }
    __builtin_unreachable();
}

This compiles fine with -Wall. If you add 'baz' to the enum but not the
switch you get:

test.cc:4:12: warning: enumeration value ‘baz’ not handled in switch [-Wswitch]

> Interesting, I did not know that. In that case I most definitely *don't*
> want the builtin unreachable and can remove the default.

In this case you don't need it, but in general you do because it is
GCC. If you remove it from the reduced example above you get

test.cc:10:1: warning: control reaches end of non-void function [-Wreturn-type]

Which is wrong, since the switch covers all paths.

Cheers,
Rafael

Glauber Costa

<glauber@scylladb.com>
Jan 23, 2020, 11:08:25 AM1/23/20
to Rafael Avila de Espindola, seastar-dev
No.

It would be some variation of io = req.prepare_iocb(io);

iocbs come from a pool that is private to another data structure so they have to be passed on to this function
Thanks.
 

Cheers,
Rafael

Avi Kivity

<avi@scylladb.com>
Jan 23, 2020, 2:00:23 PM1/23/20
to Glauber Costa, Rafael Avila de Espindola, seastar-dev

In any case, the patch moves an interface and should defer a refactoring until later. I do think a refactoring is merited (by moving all iocb related code to a helper function outside io_request), but we can do that after we see the requirements from io_uring.


Rafael Avila de Espindola

<espindola@scylladb.com>
Jan 23, 2020, 3:24:36 PM1/23/20
to Avi Kivity, Glauber Costa, seastar-dev
Avi Kivity <a...@scylladb.com> writes:

>> It would be some variation of io = req.prepare_iocb(io);
>>
>> iocbs come from a pool that is private to another data structure so
>> they have to be passed on to this function
>>
>>
>
> In any case, the patch moves an interface and should defer a refactoring
> until later. I do think a refactoring is merited (by moving all iocb
> related code to a helper function outside io_request), but we can do
> that after we see the requirements from io_uring.

I agree and should have been more explicit. I am OK with the patches
landing and was pointing out only possible future improvements.

Cheers,
Rafael