To perform cancellable IO one should
1. Create an io_intent object (it is non-copyable)
2. Queue a request, passing this object as an argument
While the intent is alive, all IO goes as it used to. Once the
intent is destroyed, all the requests that were created with
it will eventually be dropped from processing (unless they have
already been submitted to the kernel) without doing real IO.
To make this all work, requests "pin" the intent when being
queued, and at critical IO-submission points they check that the
intent is still valid. If it is not, further processing
stops.
Since the intent itself cannot be referenced by requests, a per-shard
store of request descriptors is maintained, and both the
requests and the intents themselves hold handles to the
descriptors.
An intent activates an "owned" descriptor in the store when it is
pinned for the first time. All requests that pin the intent do
so by incrementing the reference counter in the intent's descriptor.
When the intent dies, it deactivates the descriptor by clearing
the "owned" mark. Once the descriptor loses all users (i.e.
both the reference counter and the owned mark become 0) it is released and
becomes available for activation by another intent.
To avoid bloating memory, both descriptors and handles are
16-bit values. The highest bit of the descriptor is the "owned"
mark; the remainder is the reference counter. A 16-bit handle could
address 64k descriptors, but pin() asserts that handles stay below
io_intent::max_handle (0x7FFF), so about 32k descriptors are usable,
each pinned by up to 32k requests. The "unused" descriptor
is represented by the value 0, which is also quite handy.
The existing file API pushes the perpetual intent into the IO queue;
it is never destroyed, so the "legacy" IO works as before.
include/seastar/core/internal/io_request.hh | 17 ++++
include/seastar/core/internal/io_sink.hh | 14 +++-
include/seastar/core/io_intent.hh | 68 ++++++++++++++++
include/seastar/core/io_queue.hh | 5 +-
include/seastar/core/reactor.hh | 5 ++
src/core/io_queue.cc | 87 ++++++++++++++++++++-
src/core/reactor.cc | 14 +++-
tests/unit/io_queue_test.cc | 4 +-
8 files changed, 204 insertions(+), 10 deletions(-)
create mode 100644 include/seastar/core/io_intent.hh
diff --git a/include/seastar/core/internal/io_request.hh b/include/seastar/core/internal/io_request.hh
index 36132030..e29f4fa2 100644
--- a/include/seastar/core/internal/io_request.hh
+++ b/include/seastar/core/internal/io_request.hh
@@ -24,6 +24,7 @@
#include <seastar/core/sstring.hh>
#include <seastar/core/linux-aio.hh>
#include <seastar/core/internal/io_desc.hh>
+#include <seastar/core/io_intent.hh>
#include <sys/types.h>
#include <sys/socket.h>
@@ -35,6 +36,7 @@ class io_request {
enum class operation : uint16_t { read, readv, write, writev, fdatasync, recv, recvmsg, send, sendmsg, accept, connect, poll_add, poll_remove, cancel };
private:
operation _op;
+ io_intent::handle _intent; // descriptor handle in the current shard's io_intent_store, or io_intent::no_handle
int _fd;
union {
uint64_t pos;
@@ -63,6 +65,7 @@ class io_request {
explicit io_request(operation op, int fd, int flags, ::msghdr* msg)
: _op(op)
+ , _intent(io_intent::no_handle)
, _fd(fd)
{
_attr.flags = flags;
@@ -71,6 +74,7 @@ class io_request {
explicit io_request(operation op, int fd, sockaddr* sa, socklen_t sl)
: _op(op)
+ , _intent(io_intent::no_handle)
, _fd(fd)
{
_ptr.sockaddr = sa;
@@ -79,6 +83,7 @@ class io_request {
explicit io_request(operation op, int fd, int flags, sockaddr* sa, socklen_t* sl)
: _op(op)
+ , _intent(io_intent::no_handle)
, _fd(fd)
{
_attr.flags = flags;
@@ -87,6 +92,7 @@ class io_request {
}
explicit io_request(operation op, int fd, uint64_t pos, char* ptr, size_t size)
: _op(op)
+ , _intent(io_intent::no_handle)
, _fd(fd)
{
_attr.pos = pos;
@@ -96,6 +102,7 @@ class io_request {
explicit io_request(operation op, int fd, uint64_t pos, iovec* ptr, size_t size)
: _op(op)
+ , _intent(io_intent::no_handle)
, _fd(fd)
{
_attr.pos = pos;
@@ -105,10 +112,12 @@ class io_request {
explicit io_request(operation op, int fd)
: _op(op)
+ , _intent(io_intent::no_handle)
, _fd(fd)
{}
explicit io_request(operation op, int fd, int events)
: _op(op)
+ , _intent(io_intent::no_handle)
, _fd(fd)
{
_attr.events = events;
@@ -116,6 +125,7 @@ class io_request {
explicit io_request(operation op, int fd, char *ptr)
: _op(op)
+ , _intent(io_intent::no_handle)
, _fd(fd)
{
_ptr.addr = ptr;
@@ -124,6 +134,7 @@ class io_request {
public:
io_request(io_request&& other) noexcept
: _op(other._op)
+ , _intent(std::exchange(other._intent, io_intent::no_handle))
, _fd(other._fd)
, _attr(std::move(other._attr))
, _ptr(std::move(other._ptr))
@@ -134,6 +145,7 @@ class io_request {
io_request& operator=(io_request&& other) noexcept {
if (&other != this) {
_op = other._op;
+ std::swap(_intent, other._intent); // hand our old pin (if any) to other, whose destructor unpins it instead of leaking it
_fd = other._fd;
_attr = std::move(other._attr);
_ptr = std::move(other._ptr);
@@ -146,6 +158,11 @@ class io_request {
io_request(const io_request&) = delete;
io_request& operator=(const io_request&) = delete;
+ ~io_request(); // unpins _intent from engine().io_intents() unless it is io_intent::no_handle
+
+ io_intent::handle intent() const noexcept { return _intent; }
+ void set_intent(io_intent::handle h) noexcept { _intent = h; } // overwrites without unpinning: only call while no handle is held
+
bool is_read() const {
switch (_op) {
case operation::read:
diff --git a/include/seastar/core/internal/io_sink.hh b/include/seastar/core/internal/io_sink.hh
index 92184de0..41ca37e4 100644
--- a/include/seastar/core/internal/io_sink.hh
+++ b/include/seastar/core/internal/io_sink.hh
@@ -26,28 +26,34 @@
namespace seastar {
class io_completion;
+namespace internal { class io_request; }
class io_sink {
circular_buffer<internal::io_request> _pending_io;
+ bool check_request_intent(internal::io_request& req) noexcept; // false if the request's intent is gone; its completion then gets a cancellation exception
public:
void submit(io_completion* desc, internal::io_request req) noexcept;
template <typename Fn>
size_t drain(Fn&& fn) {
size_t pending = _pending_io.size();
- size_t drained = 0;
+ size_t drained = 0, skipped = 0; // skipped: canceled requests erased without calling fn
while (pending > drained) {
internal::io_request& req = _pending_io[drained];
- if (!fn(req)) {
- break;
+ if (!check_request_intent(req)) {
+ skipped++;
+ } else {
+ if (!fn(req)) {
+ break;
+ }
}
drained++;
}
_pending_io.erase(_pending_io.begin(), _pending_io.begin() + drained);
- return drained;
+ return drained - skipped; // only requests accepted by fn are counted; canceled ones are not
}
};
diff --git a/include/seastar/core/io_intent.hh b/include/seastar/core/io_intent.hh
new file mode 100644
index 00000000..154d7fa8
--- /dev/null
+++ b/include/seastar/core/io_intent.hh
@@ -0,0 +1,68 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2020 ScyllaDB
+ */
+
+#pragma once
+#include <seastar/util/vector_with_freelist.hh>
+
+namespace seastar {
+
+class io_intent_store;
+
+class io_intent {
+ friend class io_intent_store;
+
+public:
+ using handle = uint16_t; // index of a descriptor in an io_intent_store
+ static constexpr handle no_handle = 0xFFFF; // "no descriptor" marker: never pinned, or moved-from
+ static constexpr handle max_handle = 0x7FFF; // io_intent_store::pin() asserts allocated handles stay below this
+
+ io_intent() noexcept = default;
+ io_intent(io_intent&& other) noexcept
+ : _h(std::exchange(other._h, no_handle))
+ , _coordinator(std::exchange(other._coordinator, -1))
+ {}
+ io_intent(const io_intent&) = delete;
+ ~io_intent(); // if pinned, deactivates the descriptor on the _coordinator shard via smp::submit_to
+
+ static bool check_handle(handle h) noexcept; // true while the intent owning h is alive; looks h up in the current shard's store
+
+private:
+ handle _h = no_handle; // descriptor in the _coordinator shard's store
+ int _coordinator = -1; // shard whose store holds _h; -1 until first pinned
+};
+
+class io_intent_store {
+ using descriptor = uint16_t;
+ static constexpr descriptor owned_desc = 0x8000; // high bit: the owning io_intent is still alive
+ static constexpr descriptor unused_desc = 0x0000; // no owner and no pins: the slot gets released
+
+ vector_with_freelist<descriptor, io_intent::handle> _descs; // each descriptor = owned bit | pin count
+ void maybe_release(io_intent::handle h) noexcept;
+
+public:
+ io_intent_store() noexcept = default;
+ io_intent::handle pin(io_intent& intent); // allocates an owned descriptor on the first pin, then bumps its pin count
+ void unpin(io_intent::handle h) noexcept; // drops one pin; releases the descriptor once it is unused
+ void deactivate(io_intent::handle h) noexcept; // clears the owned bit (the intent died); releases the descriptor if unpinned
+ bool check(io_intent::handle h) const noexcept; // true while the owned bit is set
+};
+
+} // namespace seastar
diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
index 1c7f5799..f12d455e 100644
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -32,6 +32,7 @@
namespace seastar {
class io_priority_class;
+class io_intent;
/// Renames an io priority class
///
@@ -125,7 +126,7 @@ class io_queue {
~io_queue();
future<size_t>
- queue_request(const io_priority_class& pc, size_t len, internal::io_request req) noexcept;
+ queue_request(const io_priority_class& pc, size_t len, internal::io_request req, io_intent& intent) noexcept; // req is pinned to intent; if intent dies before dispatch, the future fails with a cancellation exception
[[deprecated("modern I/O queues should use a property file")]] size_t capacity() const {
return _config.capacity;
@@ -167,6 +168,8 @@ class io_queue {
private:
config _config;
static fair_queue::config make_fair_queue_config(config cfg);
+
+ void note_request_intent(internal::io_request& req, io_intent& intent); // pins intent in the calling shard's store and stores the handle in req
};
}
diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index adf3c2c7..871be674 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -83,6 +83,7 @@
#include <seastar/core/internal/io_request.hh>
#include <seastar/core/internal/io_sink.hh>
#include <seastar/core/make_task.hh>
+#include <seastar/core/io_intent.hh>
#include "internal/pollable_fd.hh"
#include "internal/poll.hh"
@@ -263,6 +264,7 @@ class reactor {
std::vector<std::unique_ptr<io_queue>> my_io_queues;
std::unordered_map<dev_t, io_queue*> _io_queues;
io_sink _io_sink;
+ io_intent_store _io_intents; // this shard's descriptors for io_intent pins (reached through engine().io_intents())
std::vector<noncopyable_function<future<> ()>> _exit_funcs;
unsigned _id = 0;
@@ -469,6 +471,9 @@ class reactor {
}
}
+ io_intent_store& io_intents() noexcept { return _io_intents; }
+ const io_intent_store& io_intents() const noexcept { return _io_intents; }
+
io_priority_class register_one_priority_class(sstring name, uint32_t shares);
/// \brief Updates the current amount of shares for a given priority class
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 5c4fc632..f49a103e 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -26,6 +26,7 @@
#include <seastar/core/reactor.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/linux-aio.hh>
+#include <seastar/core/io_intent.hh>
#include <seastar/core/internal/io_desc.hh>
#include <seastar/core/internal/io_sink.hh>
#include <seastar/util/log.hh>
@@ -284,21 +285,32 @@ fair_queue_ticket io_queue::request_fq_ticket(const internal::io_request& req, s
return fair_queue_ticket(weight, size);
}
+
+void io_queue::note_request_intent(internal::io_request& req, io_intent& intent) {
+ req.set_intent(engine().io_intents().pin(intent));
+}
+
future<size_t>
-io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req) noexcept {
+io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req, io_intent& intent) noexcept {
auto start = std::chrono::steady_clock::now();
- return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = this_shard_id(), this] () mutable {
+ return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = this_shard_id(), this, &intent] () mutable { // NOTE(review): intent is referenced (and written by pin()) on the coordinator shard; confirm callers keep it alive until the request is queued
// First time will hit here, and then we create the class. It is important
// that we create the shared pointer in the same shard it will be used at later.
auto& pclass = find_or_create_class(pc, owner);
fair_queue_ticket fq_ticket = request_fq_ticket(req, len);
auto desc = std::make_unique<io_desc_read_write>(this, fq_ticket);
auto fut = desc->get_future();
+ note_request_intent(req, intent);
io_log.trace("dev {} : req {} queue len {} ticket {}", _config.devid, fmt::ptr(&*desc), len, fq_ticket);
_fq.queue(pclass.ptr, std::move(fq_ticket), [&pclass, start, req = std::move(req), d = std::move(desc), len, this] () mutable noexcept {
_queued_requests--;
_requests_executing++;
pclass.nr_queued--;
+ if (!io_intent::check_handle(req.intent())) { // intent died while queued: fail the request without doing IO
+ d.release()->set_exception(std::make_exception_ptr(default_io_exception_factory::canceled())); // the descriptor deletes itself on completion (io_sink relies on this too), so drop unique_ptr ownership to avoid a double free
+ return;
+ }
+
pclass.ops++;
pclass.bytes += len;
pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
@@ -329,6 +341,17 @@ io_queue::rename_priority_class(io_priority_class pc, sstring new_name) {
}
}
+bool io_sink::check_request_intent(internal::io_request& req) noexcept {
+ io_intent::handle h = req.intent();
+ if (h == io_intent::no_handle || io_intent::check_handle(h)) {
+ return true;
+ }
+
+ io_completion* d = dynamic_cast<io_completion*>(req.get_kernel_completion());
+ d->set_exception(std::make_exception_ptr(default_io_exception_factory::canceled()));
+ return false;
+}
+
void io_sink::submit(io_completion* desc, internal::io_request req) noexcept {
req.attach_kernel_completion(desc);
try {
@@ -338,4 +361,64 @@ void io_sink::submit(io_completion* desc, internal::io_request req) noexcept {
}
}
+io_intent::handle io_intent_store::pin(io_intent& intent) {
+ io_intent::handle h = intent._h;
+
+ if (h == io_intent::no_handle) {
+ assert(intent._coordinator == -1);
+ h = _descs.alloc();
+ assert(h < io_intent::max_handle);
+ _descs[h] = owned_desc;
+ intent._h = h;
+ intent._coordinator = this_shard_id();
+ }
+ assert(intent._coordinator == static_cast<int>(this_shard_id())); // h is only meaningful in the store of the shard that allocated it
+ _descs[h]++;
+ return h;
+}
+
+void io_intent_store::maybe_release(io_intent::handle h) noexcept {
+ if (_descs[h] == unused_desc) {
+ _descs.release(h);
+ }
+}
+
+void io_intent_store::unpin(io_intent::handle h) noexcept {
+ _descs[h]--;
+ maybe_release(h);
+}
+
+void io_intent_store::deactivate(io_intent::handle h) noexcept {
+ assert(_descs[h] & owned_desc);
+ _descs[h] &= ~owned_desc;
+ maybe_release(h);
+}
+
+bool io_intent_store::check(io_intent::handle h) const noexcept {
+ assert(h != io_intent::no_handle);
+ return _descs[h] & owned_desc;
+}
+
+
+bool io_intent::check_handle(handle h) noexcept {
+ return engine().io_intents().check(h);
+}
+
+io_intent::~io_intent() {
+ if (_h != io_intent::no_handle) {
+ assert(_coordinator != -1);
+ (void)smp::submit_to(_coordinator, [h = _h] () noexcept { // the descriptor lives in the coordinator shard's store
+ engine().io_intents().deactivate(h);
+ });
+ }
+}
+
+namespace internal {
+io_request::~io_request() {
+ if (_intent != io_intent::no_handle) {
+ engine().io_intents().unpin(_intent);
+ }
+}
+}
+
}
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 8140305b..927f432a 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -50,6 +50,7 @@
#include <seastar/core/thread_cputime_clock.hh>
#include <seastar/core/abort_on_ebadf.hh>
#include <seastar/core/io_queue.hh>
+#include <seastar/core/io_intent.hh>
#include <seastar/core/internal/io_desc.hh>
#include <seastar/core/internal/buffer_allocator.hh>
#include <seastar/core/scheduling_specific.hh>
@@ -920,6 +921,8 @@ reactor::reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg)
});
}
+static thread_local io_intent perpetual_intent = io_intent(); // used by submit_io_read/write; stays alive while the reactor runs, so legacy file IO is never canceled
+
reactor::~reactor() {
sigset_t mask;
sigemptyset(&mask);
@@ -956,6 +959,13 @@ reactor::~reactor() {
}
}
}
+
+ // Move the perpetual intent into a local so that its destructor
+ // runs here and deactivates the intent's descriptor while the
+ // intent store still exists. perpetual_intent's own destructor
+ // cannot do this: it runs only after the reactor is destroyed,
+ // when the intent store has already been freed.
+ io_intent x(std::move(perpetual_intent));
}
future<> reactor::readable(pollable_fd_state& fd) {
@@ -1541,14 +1551,14 @@ future<size_t>
reactor::submit_io_read(io_queue* ioq, const io_priority_class& pc, size_t len, io_request req) noexcept {
++_io_stats.aio_reads;
_io_stats.aio_read_bytes += len;
- return ioq->queue_request(pc, len, std::move(req));
+ return ioq->queue_request(pc, len, std::move(req), perpetual_intent);
}
future<size_t>
reactor::submit_io_write(io_queue* ioq, const io_priority_class& pc, size_t len, io_request req) noexcept {
++_io_stats.aio_writes;
_io_stats.aio_write_bytes += len;
- return ioq->queue_request(pc, len, std::move(req));
+ return ioq->queue_request(pc, len, std::move(req), perpetual_intent);
}
namespace internal {
diff --git a/tests/unit/io_queue_test.cc b/tests/unit/io_queue_test.cc
index 3f3b48c4..cc6a977b 100644
--- a/tests/unit/io_queue_test.cc
+++ b/tests/unit/io_queue_test.cc
@@ -27,6 +27,7 @@
#include <seastar/core/smp.hh>
#include <seastar/core/file.hh>
#include <seastar/core/io_queue.hh>
+#include <seastar/core/io_intent.hh>
#include <seastar/core/internal/io_request.hh>
#include <seastar/core/internal/io_sink.hh>
@@ -51,8 +52,9 @@ SEASTAR_THREAD_TEST_CASE(test_basic_flow) {
io_sink sink;
io_queue q(sink, io_queue::config{0, this_shard_id()});
fake_file<1> file;
+ io_intent intent; // must outlive the queued request; destroying it first would cancel the write
- auto f = q.queue_request(default_priority_class(), 0, file.make_write_req(0, 42))
+ auto f = q.queue_request(default_priority_class(), 0, file.make_write_req(0, 42), intent)
.then([&file] (size_t len) {
BOOST_REQUIRE(file.data[0] == 42);
});
--
2.20.1