[PATCH 00/11] Cancellable IO

16 views
Skip to first unread message

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 3, 2020, 1:58:09 PM12/3/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
To make the cancellable request the API is simple:

- provide an 'io_intent' object when submitting a request

To cancel the IO later on the intent object should just
be destructed.

Due to kernel limitations only the requests sitting in
the fair queue and in the reactor's pending queue are
canceled. When the uring backend will appear, it should
become possible to withdraw requests from the kernel.

Due to (or thanks to) circular buffer limitations requests
are not canceled right at once, but only when the respective
poller gets to them and finds out they are not longer of
any interest. At this point the request's completion future
is resolved into the "canceled" exception and the request
is not processed further.

The set effectively consists of 3 parts:

1. vector_with_freelist collection (patch #1)
2. io_sink concept (patches 2 through 6)
3. the cancellation itself (the rest)

The io_sink concept generalizes the reactor::_pending_io
queue to make it usable outside of AIO backend. This allows
having a unit test for IO cancellation.

Since currently requests submitted on one shard may pop
up on another, the destruction of an intent issues the
smp::submit_to call towards the io coordinator shard to
cancel the requests there. It's as safe as the foreign_ptr's
behaviour, but unlike it is temporary -- when the io groups
are merged the respective cross-shards cancellation will
go away.

Next:
- push the intents through file/source/sink/stream stack

branch: https://github.com/xemul/seastar/tree/br-io-cancellation
tests: unit(dev), io_tester, iotune
refs: #811

Pavel Emelyanov (11):
utils: Vector with freelist
reactor-backend: Remove requests from pending queue earlier
reactor-backend: Move pending queue draining logic into helper
io: Introduce the io_sink
io_queue: Keep reference on io_sink
tests: Add a (rather stupid) io_queue unit test
io: Make io_request non-copyable
io: Make io_request::operation be 16-bits
io: Cancellation exception
io: Requests cancellation
tests: IO cancellation test

include/seastar/core/internal/io_request.hh | 44 ++++++-
include/seastar/core/internal/io_sink.hh | 60 +++++++++
include/seastar/core/io_intent.hh | 68 ++++++++++
include/seastar/core/io_queue.hh | 9 +-
include/seastar/core/reactor.hh | 9 +-
include/seastar/util/vector_with_freelist.hh | 89 +++++++++++++
src/core/io_queue.cc | 115 ++++++++++++++++-
src/core/reactor.cc | 28 ++--
src/core/reactor_backend.cc | 13 +-
src/core/reactor_backend.hh | 1 +
tests/unit/CMakeLists.txt | 6 +
tests/unit/io_queue_test.cc | 127 +++++++++++++++++++
tests/unit/vector_with_freelist_test.cc | 109 ++++++++++++++++
13 files changed, 649 insertions(+), 29 deletions(-)
create mode 100644 include/seastar/core/internal/io_sink.hh
create mode 100644 include/seastar/core/io_intent.hh
create mode 100644 include/seastar/util/vector_with_freelist.hh
create mode 100644 tests/unit/io_queue_test.cc
create mode 100644 tests/unit/vector_with_freelist_test.cc

--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 3, 2020, 1:58:10 PM12/3/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
This is a wrapper on top of simple vector which allows for
"releasing" slots in the middle and reuse them later. To
keep track of the released slots the single-linked freelist
of indices if maintained in the slots memory.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/util/vector_with_freelist.hh | 89 +++++++++++++++
tests/unit/CMakeLists.txt | 3 +
tests/unit/vector_with_freelist_test.cc | 109 +++++++++++++++++++
3 files changed, 201 insertions(+)
create mode 100644 include/seastar/util/vector_with_freelist.hh
create mode 100644 tests/unit/vector_with_freelist_test.cc

diff --git a/include/seastar/util/vector_with_freelist.hh b/include/seastar/util/vector_with_freelist.hh
new file mode 100644
index 00000000..17bf0c5d
--- /dev/null
+++ b/include/seastar/util/vector_with_freelist.hh
@@ -0,0 +1,89 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2020 ScyllaDB
+ */
+
+#pragma once
+
+#include <vector>
+
+namespace seastar {
+
+template <typename T, typename Idx = unsigned int>
+SEASTAR_CONCEPT(
+ requires std::is_nothrow_default_constructible_v<T> && (sizeof(T) >= sizeof(Idx))
+)
+class vector_with_freelist {
+ Idx _free, _used;
+ union value_or_index {
+ T _value;
+ Idx _idx;
+ value_or_index() noexcept {}
+ ~value_or_index() noexcept {}
+ };
+ std::vector<value_or_index> _storage;
+public:
+ vector_with_freelist() noexcept : _free(0), _used(0), _storage() {}
+ vector_with_freelist(vector_with_freelist&&) noexcept = default;
+ vector_with_freelist(const vector_with_freelist&) = delete;
+ ~vector_with_freelist() {
+ assert(_used == 0);
+ }
+
+ Idx alloc() {
+ if (_free < _storage.size()) {
+ Idx ret = _free;
+ _free = _storage[ret]._idx;
+ _used++;
+ return ret;
+ }
+
+ _storage.emplace_back(); // just to allocate the memory
+ _used++;
+ return _free++;
+ }
+
+ void free(Idx i) noexcept {
+ assert(i < _storage.size());
+ _storage[i]._idx = _free;
+ _free = i;
+ _used--;
+ }
+
+ void release(Idx i) noexcept {
+ assert(i < _storage.size());
+ _storage[i]._value.~T();
+ free(i);
+ }
+
+ T& operator[](Idx i) noexcept {
+ assert(i < _storage.size());
+ return _storage[i]._value;
+ }
+
+ const T& operator[](Idx i) const noexcept {
+ assert(i < _storage.size());
+ return _storage[i]._value;
+ }
+
+ size_t size() const noexcept { return _used; }
+ size_t capacity() const noexcept { return _storage.size(); }
+};
+
+} // namespace seastar
diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt
index fd2f9036..c7a4356f 100644
--- a/tests/unit/CMakeLists.txt
+++ b/tests/unit/CMakeLists.txt
@@ -410,6 +410,9 @@ seastar_add_test (uname
KIND BOOST
SOURCES uname_test.cc)

+seastar_add_test (vector_with_freelist
+ SOURCES vector_with_freelist_test.cc)
+
function(seastar_add_certgen name)
cmake_parse_arguments(CERT
""
diff --git a/tests/unit/vector_with_freelist_test.cc b/tests/unit/vector_with_freelist_test.cc
new file mode 100644
index 00000000..78ba538c
--- /dev/null
+++ b/tests/unit/vector_with_freelist_test.cc
@@ -0,0 +1,109 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2020 ScyllaDB
+ */
+
+#include <seastar/testing/test_case.hh>
+
+#include <seastar/util/vector_with_freelist.hh>
+
+using namespace seastar;
+
+SEASTAR_TEST_CASE(test_alloc_and_free) {
+ vector_with_freelist<int, unsigned short> v;
+ unsigned short idx;
+
+ for (int i = 0; i < 8; i++) {
+ idx = v.alloc();
+ v[idx] = i + 10;
+ BOOST_REQUIRE(idx == i);
+ }
+
+ BOOST_REQUIRE(v.capacity() == 8);
+ for (int i = 0; i < 8; i++) {
+ BOOST_REQUIRE(v[i] == i + 10);
+ }
+
+ int rel[] = {2, 7, 5, 4};
+ static constexpr int rs = sizeof(rel) / sizeof(rel[0]);
+
+ auto released = [&] (int idx) {
+ for (int r = 0; r < rs; r++) {
+ if (rel[r] == idx) {
+ return r;
+ }
+ }
+ return rs;
+ };
+
+ for (int r = 0; r < rs; r++) {
+ v.release(rel[r]);
+ }
+
+ BOOST_REQUIRE(v.capacity() == 8);
+ for (int i = 0; i < 8; i++) {
+ if (released(i) == rs) {
+ BOOST_REQUIRE(v[i] == i + 10);
+ }
+ }
+
+ for (int i = 0; i < rs; i++) {
+ idx = v.alloc();
+ v[idx] = i + 20;
+ BOOST_REQUIRE(idx == rel[rs - i - 1]);
+ }
+
+ BOOST_REQUIRE(v.capacity() == 8);
+ for (int i = 0; i < 8; i++) {
+ int ri = released(i);
+ if (ri == rs) {
+ BOOST_REQUIRE(v[i] == i + 10);
+ } else {
+ BOOST_REQUIRE(v[i] == (rs - ri - 1) + 20);
+ }
+ }
+
+ for (int i = 0; i < 8; i++) {
+ v.release(i);
+ }
+
+ BOOST_REQUIRE(v.capacity() == 8);
+
+ for (int i = 0; i < 8; i++) {
+ idx = v.alloc();
+ v[idx] = i + 30;
+ }
+
+ BOOST_REQUIRE(v.capacity() == 8);
+ for (int i = 0; i < 8; i++) {
+ int j;
+ for (j = 0; j < 8; j++) {
+ if (v[j] == i + 30) {
+ break;
+ }
+ }
+ BOOST_REQUIRE(j < 8);
+ }
+
+ for (int i = 0; i < 8; i++) {
+ v.release(i);
+ }
+
+ return make_ready_future<>();
+}
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 3, 2020, 1:58:11 PM12/3/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
The aio_storage_context::submit_work takes the reactor's queue of
pending requests and converts the head of it into iocb-s. Then
all the generated iocbs are submitted into the kernel AIO and only
then the pending queue's head is cleaned.

The last step can be made earlier. Once the request is converted
into iocb there's no sence in keeping it in the pending queue.
Doing this earlier helps localizing the pending queue drain code
which in turn helps writing io-flow unit test and, eventually,
io cancellation unit test.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
src/core/reactor_backend.cc | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/core/reactor_backend.cc b/src/core/reactor_backend.cc
index a159dfd2..97f10fb3 100644
--- a/src/core/reactor_backend.cc
+++ b/src/core/reactor_backend.cc
@@ -164,6 +164,7 @@ aio_storage_context::submit_work() {
}
_submission_queue.push_back(&io);
}
+ _r->_pending_io.erase(_r->_pending_io.begin(), _r->_pending_io.begin() + to_submit);

size_t submitted = 0;
while (to_submit > submitted) {
@@ -179,7 +180,6 @@ aio_storage_context::submit_work() {
did_work = true;
submitted += nr_consumed;
}
- _r->_pending_io.erase(_r->_pending_io.begin(), _r->_pending_io.begin() + submitted);

if (!_pending_aio_retry.empty()) {
schedule_retry();
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 3, 2020, 1:58:12 PM12/3/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
The goal is to encapsulate the pending io queue into a class
wich doesn't care about aio context iocbs.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
src/core/reactor_backend.cc | 30 ++++++++++++++++++++++++------
src/core/reactor_backend.hh | 4 ++++
2 files changed, 28 insertions(+), 6 deletions(-)

diff --git a/src/core/reactor_backend.cc b/src/core/reactor_backend.cc
index 97f10fb3..12c12364 100644
--- a/src/core/reactor_backend.cc
+++ b/src/core/reactor_backend.cc
@@ -147,15 +147,33 @@ aio_storage_context::handle_aio_error(linux_abi::iocb* iocb, int ec) {

extern bool aio_nowait_supported;

+template <typename Fn>
+size_t aio_storage_context::drain_pending_queue(Fn&& fn) {
+ size_t pending = _r->_pending_io.size();
+ size_t drained = 0;
+
+ while ((pending > drained)) {
+ auto& req = _r->_pending_io[drained];
+ if (!fn(req)) {
+ break;
+ }
+ drained++;
+ }
+
+ _r->_pending_io.erase(_r->_pending_io.begin(), _r->_pending_io.begin() + drained);
+ return drained;
+}
+
bool
aio_storage_context::submit_work() {
- size_t pending = _r->_pending_io.size();
- size_t to_submit = 0;
bool did_work = false;

_submission_queue.resize(0);
- while ((pending > to_submit) && _iocb_pool.has_capacity()) {
- auto& req = _r->_pending_io[to_submit++];
+ size_t to_submit = drain_pending_queue([this] (internal::io_request& req) -> bool {
+ if (!_iocb_pool.has_capacity()) {
+ return false;
+ }
+
auto& io = _iocb_pool.get_one();
prepare_iocb(req, io);

@@ -163,8 +181,8 @@ aio_storage_context::submit_work() {
set_eventfd_notification(io, _r->_aio_eventfd->get_fd());
}
_submission_queue.push_back(&io);
- }
- _r->_pending_io.erase(_r->_pending_io.begin(), _r->_pending_io.begin() + to_submit);
+ return true;
+ });

size_t submitted = 0;
while (to_submit > submitted) {
diff --git a/src/core/reactor_backend.hh b/src/core/reactor_backend.hh
index 89268cc9..cf707660 100644
--- a/src/core/reactor_backend.hh
+++ b/src/core/reactor_backend.hh
@@ -77,6 +77,10 @@ class aio_storage_context {
using pending_aio_retry_t = boost::container::static_vector<internal::linux_abi::iocb*, max_aio>;
pending_aio_retry_t _pending_aio_retry;
internal::linux_abi::io_event _ev_buffer[max_aio];
+
+ template <typename Fn>
+ size_t drain_pending_queue(Fn&& fn);
+
public:
explicit aio_storage_context(reactor* r);
~aio_storage_context();
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 3, 2020, 1:58:14 PM12/3/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
Turn the reactor::_pending_io into a class with two methods:
submit and drain. The former adds a request to the sink, the
latter drains them one by one into what caller wants.

Temporarily keep the reactor::_io_sink public to let io_queue
code submit requests into it.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/internal/io_sink.hh | 54 ++++++++++++++++++++++++
include/seastar/core/reactor.hh | 7 ++-
src/core/io_queue.cc | 12 +++++-
src/core/reactor.cc | 12 +-----
src/core/reactor_backend.cc | 19 +--------
src/core/reactor_backend.hh | 3 --
6 files changed, 72 insertions(+), 35 deletions(-)
create mode 100644 include/seastar/core/internal/io_sink.hh

diff --git a/include/seastar/core/internal/io_sink.hh b/include/seastar/core/internal/io_sink.hh
new file mode 100644
index 00000000..92184de0
--- /dev/null
+++ b/include/seastar/core/internal/io_sink.hh
@@ -0,0 +1,54 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2020 ScyllaDB
+ */
+
+#pragma once
+#include <seastar/core/circular_buffer.hh>
+#include <seastar/core/internal/io_request.hh>
+
+namespace seastar {
+
+class io_completion;
+
+class io_sink {
+ circular_buffer<internal::io_request> _pending_io;
+public:
+ void submit(io_completion* desc, internal::io_request req) noexcept;
+
+ template <typename Fn>
+ size_t drain(Fn&& fn) {
+ size_t pending = _pending_io.size();
+ size_t drained = 0;
+
+ while (pending > drained) {
+ internal::io_request& req = _pending_io[drained];
+
+ if (!fn(req)) {
+ break;
+ }
+ drained++;
+ }
+
+ _pending_io.erase(_pending_io.begin(), _pending_io.begin() + drained);
+ return drained;
+ }
+};
+
+} // namespace seastar
diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index 22419324..8d11fef1 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -81,6 +81,7 @@
#include <seastar/core/scheduling_specific.hh>
#include <seastar/core/smp.hh>
#include <seastar/core/internal/io_request.hh>
+#include <seastar/core/internal/io_sink.hh>
#include <seastar/core/make_task.hh>
#include "internal/pollable_fd.hh"
#include "internal/poll.hh"
@@ -262,6 +263,10 @@ class reactor {
std::vector<std::unique_ptr<io_queue>> my_io_queues;
std::unordered_map<dev_t, io_queue*> _io_queues;

+public:
+ io_sink _io_sink;
+
+private:
std::vector<noncopyable_function<future<> ()>> _exit_funcs;
unsigned _id = 0;
bool _stopping = false;
@@ -314,7 +319,6 @@ class reactor {
void register_stats();
};

- circular_buffer<internal::io_request> _pending_io;
boost::container::static_vector<std::unique_ptr<task_queue>, max_scheduling_groups()> _task_queues;
internal::scheduling_group_specific_thread_local_data _scheduling_group_specific_data;
int64_t _last_vruntime = 0;
@@ -521,7 +525,6 @@ class reactor {
// In the following three methods, prepare_io is not guaranteed to execute in the same processor
// in which it was generated. Therefore, care must be taken to avoid the use of objects that could
// be destroyed within or at exit of prepare_io.
- void submit_io(io_completion* desc, internal::io_request req) noexcept;
future<size_t> submit_io_read(io_queue* ioq,
const io_priority_class& priority_class,
size_t len,
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 4fb4b5b9..4759e84f 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -27,6 +27,7 @@
#include <seastar/core/metrics.hh>
#include <seastar/core/linux-aio.hh>
#include <seastar/core/internal/io_desc.hh>
+#include <seastar/core/internal/io_sink.hh>
#include <seastar/util/log.hh>
#include <chrono>
#include <mutex>
@@ -288,7 +289,7 @@ io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_re
pclass.bytes += len;
pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
io_log.trace("dev {} : req {} submit", _config.devid, fmt::ptr(&*d));
- engine().submit_io(d.release(), std::move(req));
+ engine()._io_sink.submit(d.release(), std::move(req));
});
pclass.nr_queued++;
_queued_requests++;
@@ -314,4 +315,13 @@ io_queue::rename_priority_class(io_priority_class pc, sstring new_name) {
}
}

+void io_sink::submit(io_completion* desc, internal::io_request req) noexcept {
+ req.attach_kernel_completion(desc);
+ try {
+ _pending_io.push_back(std::move(req));
+ } catch (...) {
+ desc->set_exception(std::current_exception());
+ }
+}
+
}
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 11a9125e..43c6953a 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1517,16 +1517,6 @@ void io_completion::complete_with(ssize_t res) {
}
}

-void
-reactor::submit_io(io_completion* desc, io_request req) noexcept {
- req.attach_kernel_completion(desc);
- try {
- _pending_io.push_back(std::move(req));
- } catch (...) {
- desc->set_exception(std::current_exception());
- }
-}
-
bool
reactor::flush_pending_aio() {
for (auto& ioq : my_io_queues) {
@@ -1971,7 +1961,7 @@ reactor::fdatasync(int fd) noexcept {
auto desc = new fsync_io_desc;
auto fut = desc->get_future();
auto req = io_request::make_fdatasync(fd);
- submit_io(desc, std::move(req));
+ _io_sink.submit(desc, std::move(req));
return fut;
});
}
diff --git a/src/core/reactor_backend.cc b/src/core/reactor_backend.cc
index 12c12364..a4973e4b 100644
--- a/src/core/reactor_backend.cc
+++ b/src/core/reactor_backend.cc
@@ -147,29 +147,12 @@ aio_storage_context::handle_aio_error(linux_abi::iocb* iocb, int ec) {

extern bool aio_nowait_supported;

-template <typename Fn>
-size_t aio_storage_context::drain_pending_queue(Fn&& fn) {
- size_t pending = _r->_pending_io.size();
- size_t drained = 0;
-
- while ((pending > drained)) {
- auto& req = _r->_pending_io[drained];
- if (!fn(req)) {
- break;
- }
- drained++;
- }
-
- _r->_pending_io.erase(_r->_pending_io.begin(), _r->_pending_io.begin() + drained);
- return drained;
-}
-
bool
aio_storage_context::submit_work() {
bool did_work = false;

_submission_queue.resize(0);
- size_t to_submit = drain_pending_queue([this] (internal::io_request& req) -> bool {
+ size_t to_submit = _r->_io_sink.drain([this] (internal::io_request& req) -> bool {
if (!_iocb_pool.has_capacity()) {
return false;
}
diff --git a/src/core/reactor_backend.hh b/src/core/reactor_backend.hh
index cf707660..5aadd38f 100644
--- a/src/core/reactor_backend.hh
+++ b/src/core/reactor_backend.hh
@@ -78,9 +78,6 @@ class aio_storage_context {
pending_aio_retry_t _pending_aio_retry;
internal::linux_abi::io_event _ev_buffer[max_aio];

- template <typename Fn>
- size_t drain_pending_queue(Fn&& fn);
-

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 3, 2020, 1:58:15 PM12/3/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
Instead of submitting the requests into sink through reactor,
submit them directly into sink. This is the rerequisite for
io_queue unit tests.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/io_queue.hh | 4 +++-
include/seastar/core/reactor.hh | 3 ---
src/core/io_queue.cc | 5 +++--
src/core/reactor.cc | 2 +-
4 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
index dd7c1b1a..1c7f5799 100644
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -58,6 +58,7 @@ struct iocb;
using shard_id = unsigned;

class io_priority_class;
+class io_sink;

class io_queue {
private:
@@ -76,6 +77,7 @@ class io_queue {

std::vector<std::vector<std::unique_ptr<priority_class_data>>> _priority_classes;
fair_queue _fq;
+ io_sink& _io_sink;

static constexpr unsigned _max_classes = 2048;
static std::mutex _register_lock;
@@ -119,7 +121,7 @@ class io_queue {
sstring mountpoint = "undefined";
};

- io_queue(config cfg);
+ io_queue(io_sink& sink, config cfg);
~io_queue();

future<size_t>
diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index 8d11fef1..adf3c2c7 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -262,11 +262,8 @@ class reactor {
// be stored here.
std::vector<std::unique_ptr<io_queue>> my_io_queues;
std::unordered_map<dev_t, io_queue*> _io_queues;
-
-public:
io_sink _io_sink;

-private:
std::vector<noncopyable_function<future<> ()>> _exit_funcs;
unsigned _id = 0;
bool _stopping = false;
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 4759e84f..bffa61e5 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -88,9 +88,10 @@ fair_queue::config io_queue::make_fair_queue_config(config iocfg) {
return cfg;
}

-io_queue::io_queue(io_queue::config cfg)
+io_queue::io_queue(io_sink& sink, io_queue::config cfg)
: _priority_classes()
, _fq(make_fair_queue_config(cfg))
+ , _io_sink(sink)
, _config(std::move(cfg)) {
}

@@ -289,7 +290,7 @@ io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_re
pclass.bytes += len;
pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
io_log.trace("dev {} : req {} submit", _config.devid, fmt::ptr(&*d));
- engine()._io_sink.submit(d.release(), std::move(req));
+ _io_sink.submit(d.release(), std::move(req));
});
pclass.nr_queued++;
_queued_requests++;
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 43c6953a..8140305b 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -3879,7 +3879,7 @@ void smp::configure(boost::program_options::variables_map configuration, reactor
cfg.coordinator = cid;
assert(vec_idx < all_io_queues[id].size());
assert(!all_io_queues[id][vec_idx]);
- all_io_queues[id][vec_idx] = new io_queue(std::move(cfg));
+ all_io_queues[id][vec_idx] = new io_queue(engine()._io_sink, std::move(cfg));
}
};

--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 3, 2020, 1:58:16 PM12/3/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
For now it just checks that requests go through io_queue
and io_sink pair and get executed.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
tests/unit/CMakeLists.txt | 3 ++
tests/unit/io_queue_test.cc | 67 +++++++++++++++++++++++++++++++++++++
2 files changed, 70 insertions(+)
create mode 100644 tests/unit/io_queue_test.cc

diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt
index c7a4356f..e42af8c0 100644
--- a/tests/unit/CMakeLists.txt
+++ b/tests/unit/CMakeLists.txt
@@ -283,6 +283,9 @@ seastar_add_test (execution_stage
seastar_add_test (expiring_fifo
SOURCES expiring_fifo_test.cc)

+seastar_add_test (io_queue
+ SOURCES io_queue_test.cc)
+
seastar_add_test (fair_queue
SOURCES fair_queue_test.cc)

diff --git a/tests/unit/io_queue_test.cc b/tests/unit/io_queue_test.cc
new file mode 100644
index 00000000..3f3b48c4
--- /dev/null
+++ b/tests/unit/io_queue_test.cc
@@ -0,0 +1,67 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Copyright (C) 2020 ScyllaDB
+ */
+
+#include <seastar/core/thread.hh>
+#include <seastar/testing/test_case.hh>
+#include <seastar/testing/thread_test_case.hh>
+#include <seastar/testing/test_runner.hh>
+#include <seastar/core/smp.hh>
+#include <seastar/core/file.hh>
+#include <seastar/core/io_queue.hh>
+#include <seastar/core/internal/io_request.hh>
+#include <seastar/core/internal/io_sink.hh>
+
+using namespace seastar;
+
+template <size_t Len>
+struct fake_file {
+ int data[Len] = {};
+
+ static internal::io_request make_write_req(size_t idx, int val) {
+ int* buf = new int(val);
+ return internal::io_request::make_write(0, idx, buf, 1);
+ }
+
+ void execute_write_req(internal::io_request& rq) {
+ data[rq.pos()] = *(reinterpret_cast<int*>(rq.address()));
+ rq.get_kernel_completion()->complete_with(rq.size());
+ }
+};
+
+SEASTAR_THREAD_TEST_CASE(test_basic_flow) {
+ io_sink sink;
+ io_queue q(sink, io_queue::config{0, this_shard_id()});
+ fake_file<1> file;
+
+ auto f = q.queue_request(default_priority_class(), 0, file.make_write_req(0, 42))
+ .then([&file] (size_t len) {
+ BOOST_REQUIRE(file.data[0] == 42);
+ });
+
+ q.poll_io_queue();
+ sink.drain([&file] (internal::io_request& rq) -> bool {
+ file.execute_write_req(rq);
+ return true;
+ });
+
+ f.get();
+}
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 3, 2020, 1:58:17 PM12/3/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
It is already used like that, but prevent future copies. The request
will keep a non-copyable handle on IO intent.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/internal/io_request.hh | 25 +++++++++++++++++++++
1 file changed, 25 insertions(+)

diff --git a/include/seastar/core/internal/io_request.hh b/include/seastar/core/internal/io_request.hh
index f8d3c9af..1ecae9fc 100644
--- a/include/seastar/core/internal/io_request.hh
+++ b/include/seastar/core/internal/io_request.hh
@@ -120,7 +120,32 @@ class io_request {
{
_ptr.addr = ptr;
}
+
public:
+ io_request(io_request&& other) noexcept
+ : _op(other._op)
+ , _fd(other._fd)
+ , _attr(std::move(other._attr))
+ , _ptr(std::move(other._ptr))
+ , _size(std::move(other._size))
+ , _kernel_completion(other._kernel_completion)
+ {}
+
+ io_request& operator=(io_request&& other) noexcept {
+ if (&other != this) {
+ _op = other._op;
+ _fd = other._fd;
+ _attr = std::move(other._attr);
+ _ptr = std::move(other._ptr);
+ _size = std::move(other._size);
+ _kernel_completion = other._kernel_completion;
+ }
+ return *this;
+ }
+
+ io_request(const io_request&) = delete;
+ io_request& operator=(const io_request&) = delete;
+
bool is_read() const {
switch (_op) {
case operation::read:
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 3, 2020, 1:58:18 PM12/3/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
By default enum classes are 32bit. The plan is to fill first 8
bytes with operation (16b), intent handle (16b) and fd (32b),
so we need a gap of 16 bits after it.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/internal/io_request.hh | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/include/seastar/core/internal/io_request.hh b/include/seastar/core/internal/io_request.hh
index 1ecae9fc..36132030 100644
--- a/include/seastar/core/internal/io_request.hh
+++ b/include/seastar/core/internal/io_request.hh
@@ -32,7 +32,7 @@ namespace internal {

class io_request {
public:
- enum class operation { read, readv, write, writev, fdatasync, recv, recvmsg, send, sendmsg, accept, connect, poll_add, poll_remove, cancel };
+ enum class operation : uint16_t { read, readv, write, writev, fdatasync, recv, recvmsg, send, sendmsg, accept, connect, poll_add, poll_remove, cancel };
private:
operation _op;
int _fd;
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 3, 2020, 1:58:19 PM12/3/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
Cancelled requests will resolve the future into exceptional one
with the help of this.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
src/core/io_queue.cc | 13 +++++++++++++
1 file changed, 13 insertions(+)

diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index bffa61e5..5c4fc632 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -75,6 +75,19 @@ class io_desc_read_write final : public io_completion {
}
};

+class canceled_error : public std::exception {
+public:
+ virtual const char* what() const noexcept {
+ return "canceled";
+ }
+};
+
+struct default_io_exception_factory {
+ static auto canceled() {
+ return canceled_error();
+ }
+};
+
void
io_queue::notify_requests_finished(fair_queue_ticket& desc) noexcept {
_requests_executing--;
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 3, 2020, 1:58:22 PM12/3/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
Queue three requests. Cancel the 2nd first, then make the
queue feed requests into sink, then cancel the 3rd. Then
drain the sink.

At the end only the 1st one should pass.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
tests/unit/io_queue_test.cc | 58 +++++++++++++++++++++++++++++++++++++
1 file changed, 58 insertions(+)

diff --git a/tests/unit/io_queue_test.cc b/tests/unit/io_queue_test.cc
index cc6a977b..2d067e91 100644
--- a/tests/unit/io_queue_test.cc
+++ b/tests/unit/io_queue_test.cc
@@ -24,6 +24,7 @@
#include <seastar/testing/test_case.hh>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/testing/test_runner.hh>
+#include <seastar/core/future-util.hh>
#include <seastar/core/smp.hh>
#include <seastar/core/file.hh>
#include <seastar/core/io_queue.hh>
@@ -67,3 +68,60 @@ SEASTAR_THREAD_TEST_CASE(test_basic_flow) {

f.get();
}
+
+SEASTAR_THREAD_TEST_CASE(test_io_cancellation) {
+ io_sink sink;
+ io_queue q(sink, io_queue::config{0, this_shard_id()});
+ fake_file<3> file;
+ io_intent intent;
+
+ auto f1 = q.queue_request(default_priority_class(), 0, file.make_write_req(0, 42), intent)
+ .then([&file] (size_t len) {
+ BOOST_REQUIRE(file.data[0] == 42);
+ });
+
+ io_intent* cancel = new io_intent();
+
+ auto f2 = q.queue_request(default_priority_class(), 0, file.make_write_req(1, 53), *cancel)
+ .then_wrapped([] (auto&& f) {
+ try {
+ f.get();
+ BOOST_REQUIRE(false);
+ } catch (...) {
+ }
+ return make_ready_future<>();
+ })
+ .then([&file] {
+ BOOST_REQUIRE(file.data[1] == 0);
+ });
+
+ delete cancel;
+
+ cancel = new io_intent();
+
+ auto f3 = q.queue_request(default_priority_class(), 0, file.make_write_req(2, 65), *cancel)
+ .then_wrapped([] (auto&& f) {
+ try {
+ f.get();
+ BOOST_REQUIRE(false);
+ } catch (...) {
+ }
+ return make_ready_future<>();
+ })
+ .then([&file] {
+ BOOST_REQUIRE(file.data[2] == 0);
+ });
+
+ q.poll_io_queue();
+
+ delete cancel;
+
+ size_t drained = sink.drain([&file] (internal::io_request& rq) -> bool {
+ file.execute_write_req(rq);
+ return true;
+ });
+
+ BOOST_REQUIRE(drained == 1);
+
+ when_all_succeed(std::move(f1), std::move(f2), std::move(f3)).get();
+}
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 3, 2020, 1:58:22 PM12/3/20
to seastar-dev@googlegroups.com, Pavel Emelyanov
To perform cancellable IO one should

1. Create an io_intent object (non-copyable one)
2. Queue a request with this object passed as argument

While the intent is alive all IO goes as it used to. Once the
intent is destroyed, all the requests that were created with
one will be eventually dropped from processing (unless they've
already been submitted to kernel) without doing real IO.

To make this all work requests "pin" the intent when being
queued and in critical IO-submission places they check for the
intent to be still valid. If it's not, the further processing
stops.

Since the intent iself cannot be referenced by requests, a per
shard store of request descriptors is maintained and both --
requests and the intents themselves -- hold handlers to the
descriptors.

An intent activates an "owned" handler in the store when being
pinned for the first time. All requests that pin the intent do
it with the help of reference counter on intent's descriptor.

When the intent dies it deactivates the descriptor by clearing
the "owned" mark. Once the descriptor loses all users (i.e.
the reference counter and owner become 0) it's released and
becomes available for activation by another intent.

Not to bloat the memory with descriptors and handles both are
16-bit values. The highest bit of the descriptor is the "owned"
mark, the remainder is the reference counter. This gives 64k
descriptors pinned by 32k requests each. The "unused" descriptor
is described by value of 0 which is also qute handy.

The existing file's API push the perpetual intent into IO queue,
it's never destroyed, so the "legacy" IO works as before.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/internal/io_request.hh | 17 ++++
include/seastar/core/internal/io_sink.hh | 14 +++-
include/seastar/core/io_intent.hh | 68 ++++++++++++++++
include/seastar/core/io_queue.hh | 5 +-
include/seastar/core/reactor.hh | 5 ++
src/core/io_queue.cc | 87 ++++++++++++++++++++-
src/core/reactor.cc | 14 +++-
tests/unit/io_queue_test.cc | 4 +-
8 files changed, 204 insertions(+), 10 deletions(-)
create mode 100644 include/seastar/core/io_intent.hh

diff --git a/include/seastar/core/internal/io_request.hh b/include/seastar/core/internal/io_request.hh
index 36132030..e29f4fa2 100644
--- a/include/seastar/core/internal/io_request.hh
+++ b/include/seastar/core/internal/io_request.hh
@@ -24,6 +24,7 @@
#include <seastar/core/sstring.hh>
#include <seastar/core/linux-aio.hh>
#include <seastar/core/internal/io_desc.hh>
+#include <seastar/core/io_intent.hh>
#include <sys/types.h>
#include <sys/socket.h>

@@ -35,6 +36,7 @@ class io_request {
enum class operation : uint16_t { read, readv, write, writev, fdatasync, recv, recvmsg, send, sendmsg, accept, connect, poll_add, poll_remove, cancel };
private:
operation _op;
+ io_intent::handle _intent;
int _fd;
union {
uint64_t pos;
@@ -63,6 +65,7 @@ class io_request {

explicit io_request(operation op, int fd, int flags, ::msghdr* msg)
: _op(op)
+ , _intent(io_intent::no_handle)
, _fd(fd)
{
_attr.flags = flags;
@@ -71,6 +74,7 @@ class io_request {

explicit io_request(operation op, int fd, sockaddr* sa, socklen_t sl)
: _op(op)
+ , _intent(io_intent::no_handle)
, _fd(fd)
{
_ptr.sockaddr = sa;
@@ -79,6 +83,7 @@ class io_request {

explicit io_request(operation op, int fd, int flags, sockaddr* sa, socklen_t* sl)
: _op(op)
+ , _intent(io_intent::no_handle)
, _fd(fd)
{
_attr.flags = flags;
@@ -87,6 +92,7 @@ class io_request {
}
explicit io_request(operation op, int fd, uint64_t pos, char* ptr, size_t size)
: _op(op)
+ , _intent(io_intent::no_handle)
, _fd(fd)
{
_attr.pos = pos;
@@ -96,6 +102,7 @@ class io_request {

explicit io_request(operation op, int fd, uint64_t pos, iovec* ptr, size_t size)
: _op(op)
+ , _intent(io_intent::no_handle)
, _fd(fd)
{
_attr.pos = pos;
@@ -105,10 +112,12 @@ class io_request {

explicit io_request(operation op, int fd)
: _op(op)
+ , _intent(io_intent::no_handle)
, _fd(fd)
{}
explicit io_request(operation op, int fd, int events)
: _op(op)
+ , _intent(io_intent::no_handle)
, _fd(fd)
{
_attr.events = events;
@@ -116,6 +125,7 @@ class io_request {

explicit io_request(operation op, int fd, char *ptr)
: _op(op)
+ , _intent(io_intent::no_handle)
, _fd(fd)
{
_ptr.addr = ptr;
@@ -124,6 +134,7 @@ class io_request {
public:
io_request(io_request&& other) noexcept
: _op(other._op)
+ , _intent(std::exchange(other._intent, io_intent::no_handle))
, _fd(other._fd)
, _attr(std::move(other._attr))
, _ptr(std::move(other._ptr))
@@ -134,6 +145,7 @@ class io_request {
io_request& operator=(io_request&& other) noexcept {
if (&other != this) {
_op = other._op;
+ _intent = std::exchange(other._intent, io_intent::no_handle);
_fd = other._fd;
_attr = std::move(other._attr);
_ptr = std::move(other._ptr);
@@ -146,6 +158,11 @@ class io_request {
io_request(const io_request&) = delete;
io_request& operator=(const io_request&) = delete;

+ ~io_request();
+
+ io_intent::handle intent() const noexcept { return _intent; }
+ void set_intent(io_intent::handle h) noexcept { _intent = h; }
+
bool is_read() const {
switch (_op) {
case operation::read:
diff --git a/include/seastar/core/internal/io_sink.hh b/include/seastar/core/internal/io_sink.hh
index 92184de0..41ca37e4 100644
--- a/include/seastar/core/internal/io_sink.hh
+++ b/include/seastar/core/internal/io_sink.hh
@@ -26,28 +26,34 @@
namespace seastar {

class io_completion;
+namespace internal { class io_request; }

class io_sink {
circular_buffer<internal::io_request> _pending_io;
+ bool check_request_intent(internal::io_request& req) noexcept;
public:
void submit(io_completion* desc, internal::io_request req) noexcept;

template <typename Fn>
size_t drain(Fn&& fn) {
size_t pending = _pending_io.size();
- size_t drained = 0;
+ size_t drained = 0, skipped = 0;

while (pending > drained) {
internal::io_request& req = _pending_io[drained];

- if (!fn(req)) {
- break;
+ if (!check_request_intent(req)) {
+ skipped++;
+ } else {
+ if (!fn(req)) {
+ break;
+ }
}
drained++;
}

_pending_io.erase(_pending_io.begin(), _pending_io.begin() + drained);
- return drained;
+ return drained - skipped;
}
};

diff --git a/include/seastar/core/io_intent.hh b/include/seastar/core/io_intent.hh
new file mode 100644
index 00000000..154d7fa8
--- /dev/null
+++ b/include/seastar/core/io_intent.hh
@@ -0,0 +1,68 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2020 ScyllaDB
+ */
+
+#pragma once
+#include <seastar/util/vector_with_freelist.hh>
+
+namespace seastar {
+
+class io_intent_store;
+
+class io_intent {
+ friend class io_intent_store;
+
+public:
+ using handle = uint16_t;
+ static constexpr handle no_handle = 0xFFFF;
+ static constexpr handle max_handle = 0x7FFF;
+
+ io_intent() noexcept = default;
+ io_intent(io_intent&& other) noexcept
+ : _h(std::exchange(other._h, no_handle))
+ , _coordinator(std::exchange(other._coordinator, -1))
+ {}
+ io_intent(const io_intent&) = delete;
+ ~io_intent();
+
+ static bool check_handle(handle h) noexcept;
+
+private:
+ handle _h = no_handle;
+ int _coordinator = -1;
+};
+
+class io_intent_store {
+ using descriptor = uint16_t;
+ static constexpr descriptor owned_desc = 0x8000;
+ static constexpr descriptor unused_desc = 0x0000;
+
+ vector_with_freelist<descriptor, io_intent::handle> _descs;
+ void maybe_release(io_intent::handle h) noexcept;
+
+public:
+ io_intent_store() noexcept = default;
+ io_intent::handle pin(io_intent& intent);
+ void unpin(io_intent::handle h) noexcept;
+ void deactivate(io_intent::handle h) noexcept;
+ bool check(io_intent::handle h) const noexcept;
+};
+
+} // namespace seastar
diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
index 1c7f5799..f12d455e 100644
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -32,6 +32,7 @@
namespace seastar {

class io_priority_class;
+class io_intent;

/// Renames an io priority class
///
@@ -125,7 +126,7 @@ class io_queue {
~io_queue();

future<size_t>
- queue_request(const io_priority_class& pc, size_t len, internal::io_request req) noexcept;
+ queue_request(const io_priority_class& pc, size_t len, internal::io_request req, io_intent& intent) noexcept;

[[deprecated("modern I/O queues should use a property file")]] size_t capacity() const {
return _config.capacity;
@@ -167,6 +168,8 @@ class io_queue {
private:
config _config;
static fair_queue::config make_fair_queue_config(config cfg);
+
+ void note_request_intent(internal::io_request& req, io_intent& intent);
};

}
diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index adf3c2c7..871be674 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -83,6 +83,7 @@
#include <seastar/core/internal/io_request.hh>
#include <seastar/core/internal/io_sink.hh>
#include <seastar/core/make_task.hh>
+#include <seastar/core/io_intent.hh>
#include "internal/pollable_fd.hh"
#include "internal/poll.hh"

@@ -263,6 +264,7 @@ class reactor {
std::vector<std::unique_ptr<io_queue>> my_io_queues;
std::unordered_map<dev_t, io_queue*> _io_queues;
io_sink _io_sink;
+ io_intent_store _io_intents;

std::vector<noncopyable_function<future<> ()>> _exit_funcs;
unsigned _id = 0;
@@ -469,6 +471,9 @@ class reactor {
}
}

+ io_intent_store& io_intents() noexcept { return _io_intents; }
+ const io_intent_store& io_intents() const noexcept { return _io_intents; }
+
io_priority_class register_one_priority_class(sstring name, uint32_t shares);

/// \brief Updates the current amount of shares for a given priority class
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index 5c4fc632..f49a103e 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -26,6 +26,7 @@
#include <seastar/core/reactor.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/linux-aio.hh>
+#include <seastar/core/io_intent.hh>
#include <seastar/core/internal/io_desc.hh>
#include <seastar/core/internal/io_sink.hh>
#include <seastar/util/log.hh>
@@ -284,21 +285,32 @@ fair_queue_ticket io_queue::request_fq_ticket(const internal::io_request& req, s
return fair_queue_ticket(weight, size);
}

+
+void io_queue::note_request_intent(internal::io_request& req, io_intent& intent) {
+ req.set_intent(engine().io_intents().pin(intent));
+}
+
future<size_t>
-io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req) noexcept {
+io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req, io_intent& intent) noexcept {
auto start = std::chrono::steady_clock::now();
- return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = this_shard_id(), this] () mutable {
+ return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = this_shard_id(), this, &intent] () mutable {
// First time will hit here, and then we create the class. It is important
// that we create the shared pointer in the same shard it will be used at later.
auto& pclass = find_or_create_class(pc, owner);
fair_queue_ticket fq_ticket = request_fq_ticket(req, len);
auto desc = std::make_unique<io_desc_read_write>(this, fq_ticket);
auto fut = desc->get_future();
+ note_request_intent(req, intent);
io_log.trace("dev {} : req {} queue len {} ticket {}", _config.devid, fmt::ptr(&*desc), len, fq_ticket);
_fq.queue(pclass.ptr, std::move(fq_ticket), [&pclass, start, req = std::move(req), d = std::move(desc), len, this] () mutable noexcept {
_queued_requests--;
_requests_executing++;
pclass.nr_queued--;
+ if (!io_intent::check_handle(req.intent())) {
+ d->set_exception(std::make_exception_ptr(default_io_exception_factory::canceled()));
+ return;
+ }
+
pclass.ops++;
pclass.bytes += len;
pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
@@ -329,6 +341,17 @@ io_queue::rename_priority_class(io_priority_class pc, sstring new_name) {
}
}

+bool io_sink::check_request_intent(internal::io_request& req) noexcept {
+ io_intent::handle h = req.intent();
+ if (h == io_intent::no_handle || io_intent::check_handle(h)) {
+ return true;
+ }
+
+ io_completion* d = dynamic_cast<io_completion*>(req.get_kernel_completion());
+ d->set_exception(std::make_exception_ptr(default_io_exception_factory::canceled()));
+ return false;
+}
+
void io_sink::submit(io_completion* desc, internal::io_request req) noexcept {
req.attach_kernel_completion(desc);
try {
@@ -338,4 +361,64 @@ void io_sink::submit(io_completion* desc, internal::io_request req) noexcept {
}
}

+io_intent::handle io_intent_store::pin(io_intent& intent) {
+ io_intent::handle h = intent._h;
+
+ if (h == io_intent::no_handle) {
+ assert(intent._coordinator == -1);
+ h = _descs.alloc();
+ assert(h < io_intent::max_handle);
+ _descs[h] = owned_desc;
+ intent._h = h;
+ intent._coordinator = this_shard_id();
+ }
+
+ _descs[h]++;
+ return h;
+}
+
+void io_intent_store::maybe_release(io_intent::handle h) noexcept {
+ if (_descs[h] == unused_desc) {
+ _descs.release(h);
+ }
+}
+
+void io_intent_store::unpin(io_intent::handle h) noexcept {
+ _descs[h]--;
+ maybe_release(h);
+}
+
+void io_intent_store::deactivate(io_intent::handle h) noexcept {
+ assert(_descs[h] & owned_desc);
+ _descs[h] &= ~owned_desc;
+ maybe_release(h);
+}
+
+bool io_intent_store::check(io_intent::handle h) const noexcept {
+ assert(h != io_intent::no_handle);
+ return _descs[h] & owned_desc;
+}
+
+
+bool io_intent::check_handle(handle h) noexcept {
+ return engine().io_intents().check(h);
+}
+
+io_intent::~io_intent() {
+ if (_h != io_intent::no_handle) {
+ assert(_coordinator != -1);
+ (void)smp::submit_to(_coordinator, [h = _h] () noexcept {
+ engine().io_intents().deactivate(h);
+ });
+ }
+}
+
+namespace internal {
+io_request::~io_request() {
+ if (_intent != io_intent::no_handle) {
+ engine().io_intents().unpin(_intent);
+ }
+}
+}
+
}
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 8140305b..927f432a 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -50,6 +50,7 @@
#include <seastar/core/thread_cputime_clock.hh>
#include <seastar/core/abort_on_ebadf.hh>
#include <seastar/core/io_queue.hh>
+#include <seastar/core/io_intent.hh>
#include <seastar/core/internal/io_desc.hh>
#include <seastar/core/internal/buffer_allocator.hh>
#include <seastar/core/scheduling_specific.hh>
@@ -920,6 +921,8 @@ reactor::reactor(unsigned id, reactor_backend_selector rbs, reactor_config cfg)
});
}

+static thread_local io_intent perpetual_intent = io_intent();
+
reactor::~reactor() {
sigset_t mask;
sigemptyset(&mask);
@@ -956,6 +959,13 @@ reactor::~reactor() {
}
}
}
+
+ // This is to make x destroy and remove its intent
+ // from the intent store. The same cannot be done
+ // via perpetual_intent destructor itself, as it
+ // will be destroyed _after_ reactor and the intent
+ // store will be already free by that time
+ io_intent x(std::move(perpetual_intent));
}

future<> reactor::readable(pollable_fd_state& fd) {
@@ -1541,14 +1551,14 @@ future<size_t>
reactor::submit_io_read(io_queue* ioq, const io_priority_class& pc, size_t len, io_request req) noexcept {
++_io_stats.aio_reads;
_io_stats.aio_read_bytes += len;
- return ioq->queue_request(pc, len, std::move(req));
+ return ioq->queue_request(pc, len, std::move(req), perpetual_intent);
}

future<size_t>
reactor::submit_io_write(io_queue* ioq, const io_priority_class& pc, size_t len, io_request req) noexcept {
++_io_stats.aio_writes;
_io_stats.aio_write_bytes += len;
- return ioq->queue_request(pc, len, std::move(req));
+ return ioq->queue_request(pc, len, std::move(req), perpetual_intent);
}

namespace internal {
diff --git a/tests/unit/io_queue_test.cc b/tests/unit/io_queue_test.cc
index 3f3b48c4..cc6a977b 100644
--- a/tests/unit/io_queue_test.cc
+++ b/tests/unit/io_queue_test.cc
@@ -27,6 +27,7 @@
#include <seastar/core/smp.hh>
#include <seastar/core/file.hh>
#include <seastar/core/io_queue.hh>
+#include <seastar/core/io_intent.hh>
#include <seastar/core/internal/io_request.hh>
#include <seastar/core/internal/io_sink.hh>

@@ -51,8 +52,9 @@ SEASTAR_THREAD_TEST_CASE(test_basic_flow) {
io_sink sink;
io_queue q(sink, io_queue::config{0, this_shard_id()});
fake_file<1> file;
+ io_intent intent;

- auto f = q.queue_request(default_priority_class(), 0, file.make_write_req(0, 42))
+ auto f = q.queue_request(default_priority_class(), 0, file.make_write_req(0, 42), intent)
.then([&file] (size_t len) {
BOOST_REQUIRE(file.data[0] == 42);
});
--
2.20.1

Raphael S. Carvalho

<raphaelsc@scylladb.com>
unread,
Dec 3, 2020, 2:16:27 PM12/3/20
to Pavel Emelyanov, seastar-dev
we can't do that this way because of aio error handling, take a look
at handle_aio_error(). moving the erase call there could incorrectly
remove a request that failed with EAGAIN, for example. if you really
need to erase the io requests earlier, then you probably have to
reinsert the ones which failed with EAGAIN.

>
> if (!_pending_aio_retry.empty()) {
> schedule_retry();
> --
> 2.20.1
>
> --
> You received this message because you are subscribed to the Google Groups "seastar-dev" group.
> To unsubscribe from this group and stop receiving emails from it, send an email to seastar-dev...@googlegroups.com.
> To view this discussion on the web visit https://groups.google.com/d/msgid/seastar-dev/20201203185819.2537-3-xemul%40scylladb.com.

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 3, 2020, 3:07:23 PM12/3/20
to Raphael S. Carvalho, seastar-dev
Why? I remove the io_request from _pending_io queue, the respective
iocb remains in its place in _submission_queue and will be submitted
again on the very next loop iteration.

Raphael S. Carvalho

<raphaelsc@scylladb.com>
unread,
Dec 3, 2020, 3:53:21 PM12/3/20
to Pavel Emelyanov, seastar-dev
oops. looks like I am no longer familiar with this code. you're right,
we can expect to_submit to equal submitted by the end of the
iteration, so it doesn't matter if we erase from _pending_io after or
before the loop starts.

Piotr Sarna

<sarna@scylladb.com>
unread,
Dec 4, 2020, 7:46:33 AM12/4/20
to Pavel Emelyanov, seastar-dev@googlegroups.com

I'm not deeply familiar with the design discussions of this feature, so maybe it was already answered somewhere, but I'll just ask here. This perpetual intent sounds like it adds quite a bit of overhead for each legacy operation. Since it looks like it's only called from two places, maybe it makes sense to provide a intent-less version of queue_request(), used by these two?

E.g. instead of

io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req, io_intent& intent) noexcept {

You could make a legacy flavor of it:

template<bool UseIntents = true>
io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req, io_intent& intent) noexcept {

, and wrap the intent-related code in `if constexpr (UseIntents) {}` blocks. That would remove the needless back-and-forth of bumping and checking the reference count of a dummy intent which is already known to have static lifetime. And then, these two calls which need legacy behaviour would simply call `ioq->queue_request<false>()` instead of `ioq->queue_request()`. Of course, instead of a bool parameter it will probably look prettier to use a named tag, I used bool for the sake of an example. That's all under the assumption that we want to squeeze as much as possible from the reactor code, so every branch counts.

Also, I left a small nitpick wrt. a dynamic cast a couple of hundred lines down.

If you're assuming that this cast is always correct (judging by the line below), it's better to use static_cast and avoid needless runtime checks.

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 4, 2020, 8:26:13 AM12/4/20
to Piotr Sarna, seastar-dev@googlegroups.com
There's the same thing in io_sink code and there's no way to find out if the
submission was with perpetual intent or no.

> That would remove the needless back-and-forth of bumping and checking the reference count of a dummy intent which is already known to have static lifetime.

I wanted to do the other way -- pin the intent with request _before_ calling
the queue_request(). The checker would see that the request's handle is missing
and just pass through. Request destructor already checks the handle anyway, so
"legacy" requests only have one 'if' overhead.

But this can only be done after IO groups patchset, because pinning (if it
happens), unpinning and checking must be on the same shard, and with current
io-shards design it's not so.

> And then, these two calls which need legacy behaviour would simply call `ioq->queue_request<false>()` instead of `ioq->queue_request()`. Of course, instead of a bool parameter it will probably look prettier to use a named tag, I used bool for the sake of an example. That's all under the assumption that we want to squeeze as much as possible from the reactor code, so every branch counts.
>
> Also, I left a small nitpick wrt. a dynamic cast a couple of hundred lines down.

>> +bool io_sink::check_request_intent(internal::io_request& req) noexcept {
>> + io_intent::handle h = req.intent();
>> + if (h == io_intent::no_handle || io_intent::check_handle(h)) {
>> + return true;
>> + }
>> +
>> + io_completion* d = dynamic_cast<io_completion*>(req.get_kernel_completion());
>
> If you're assuming that this cast is always correct (judging by the line below), it's better to use static_cast and avoid needless runtime checks.

Agree. Will fix in v2.

-- Pavel

Piotr Sarna

<sarna@scylladb.com>
unread,
Dec 4, 2020, 8:33:58 AM12/4/20
to Pavel Emelyanov, seastar-dev@googlegroups.com
Ok, if that's the case then my suggestion makes no sense indeed.
>
>> That would remove the needless back-and-forth of bumping and checking
>> the reference count of a dummy intent which is already known to have
>> static lifetime.
>
> I wanted to do the other way -- pin the intent with request _before_
> calling
> the queue_request(). The checker would see that the request's handle
> is missing
> and just pass through. Request destructor already checks the handle
> anyway, so
> "legacy" requests only have one 'if' overhead.
>
> But this can only be done after IO groups patchset, because pinning
> (if it
> happens), unpinning and checking must be on the same shard, and with
> current
> io-shards design it's not so.
In that case, let's add a short TODO entry somewhere that describes the
idea. And we can drop it when the infrastructure is ready.

Avi Kivity

<avi@scylladb.com>
unread,
Dec 6, 2020, 6:39:29 AM12/6/20
to Pavel Emelyanov, seastar-dev@googlegroups.com
How will the vector know how to move value_or_index?


I think the reason this compiles is that we build without
-Wdeprecated-copy -Wdeprecated-copy-dtor. But it shouldn't really, since
T and Idx have different move/copy operations.


> +public:
> + vector_with_freelist() noexcept : _free(0), _used(0), _storage() {}
> + vector_with_freelist(vector_with_freelist&&) noexcept = default;
> + vector_with_freelist(const vector_with_freelist&) = delete;
> + ~vector_with_freelist() {
> + assert(_used == 0);
> + }
> +
> + Idx alloc() {
> + if (_free < _storage.size()) {
> + Idx ret = _free;
> + _free = _storage[ret]._idx;


Pedantically, you should invoke Idx::~Idx() on _storage[ret], since it
is no longer used to store an Idx, and will be soon used to store a T.


> + _used++;
> + return ret;
> + }
> +
> + _storage.emplace_back(); // just to allocate the memory
> + _used++;
> + return _free++;
> + }
> +
> + void free(Idx i) noexcept {
> + assert(i < _storage.size());
> + _storage[i]._idx = _free;


Pedantically, you would invoke the constructor here, to transform the
untyped _storage[i] into an Idx: new (&_storage[idx]) Idx(i). (C++20 has
the nicer construct_at()).
I imagine this will not work if T has a custom move constructor (it
won't be called).

Avi Kivity

<avi@scylladb.com>
unread,
Dec 6, 2020, 6:45:04 AM12/6/20
to Pavel Emelyanov, seastar-dev@googlegroups.com

On 03/12/2020 20.58, Pavel Emelyanov wrote:
> The goal is to encapsulate the pending io queue into a class
> wich doesn't care about aio context iocbs.
>
> Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
> ---
> src/core/reactor_backend.cc | 30 ++++++++++++++++++++++++------
> src/core/reactor_backend.hh | 4 ++++
> 2 files changed, 28 insertions(+), 6 deletions(-)
>
> diff --git a/src/core/reactor_backend.cc b/src/core/reactor_backend.cc
> index 97f10fb3..12c12364 100644
> --- a/src/core/reactor_backend.cc
> +++ b/src/core/reactor_backend.cc
> @@ -147,15 +147,33 @@ aio_storage_context::handle_aio_error(linux_abi::iocb* iocb, int ec) {
>
> extern bool aio_nowait_supported;
>
> +template <typename Fn>
> +size_t aio_storage_context::drain_pending_queue(Fn&& fn) {


Please constrain Fn (with a SEASTAR_CONCEPT) for documentation purposes.

Avi Kivity

<avi@scylladb.com>
unread,
Dec 6, 2020, 6:51:52 AM12/6/20
to Pavel Emelyanov, seastar-dev@googlegroups.com

On 03/12/2020 20.58, Pavel Emelyanov wrote:
Please put this into the internal namespace, so users aren't tempted to
use it.


> + circular_buffer<internal::io_request> _pending_io;
> +public:
> + void submit(io_completion* desc, internal::io_request req) noexcept;
> +
> + template <typename Fn>
> + size_t drain(Fn&& fn) {


Constraint to document (also explain the bool result, since nobody can
guess what it means). I'd call it drain_into() to hint that data goes
into the lambda supplied, not an intrinsic destination known to io_sink.


Edit: I see you inherited the lack of constraint and name from the
existing code. Still, it would be nice to improve it a little.

Avi Kivity

<avi@scylladb.com>
unread,
Dec 6, 2020, 6:53:46 AM12/6/20
to Pavel Emelyanov, seastar-dev@googlegroups.com

On 03/12/2020 20.58, Pavel Emelyanov wrote:
> It is already used like that, but prevent future copies. The request
> will keep a non-copyable handle on IO intent.
>
> Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
> ---
> include/seastar/core/internal/io_request.hh | 25 +++++++++++++++++++++
> 1 file changed, 25 insertions(+)
>
> diff --git a/include/seastar/core/internal/io_request.hh b/include/seastar/core/internal/io_request.hh
> index f8d3c9af..1ecae9fc 100644
> --- a/include/seastar/core/internal/io_request.hh
> +++ b/include/seastar/core/internal/io_request.hh
> @@ -120,7 +120,32 @@ class io_request {
> {
> _ptr.addr = ptr;
> }
> +
> public:
> + io_request(io_request&& other) noexcept
> + : _op(other._op)
> + , _fd(other._fd)
> + , _attr(std::move(other._attr))
> + , _ptr(std::move(other._ptr))
> + , _size(std::move(other._size))
> + , _kernel_completion(other._kernel_completion)
> + {}


Using "= default" would work in this simple case.


> +
> + io_request& operator=(io_request&& other) noexcept {
> + if (&other != this) {
> + _op = other._op;
> + _fd = other._fd;
> + _attr = std::move(other._attr);
> + _ptr = std::move(other._ptr);
> + _size = std::move(other._size);
> + _kernel_completion = other._kernel_completion;
> + }
> + return *this;
> + }
> +


Here too.

Avi Kivity

<avi@scylladb.com>
unread,
Dec 6, 2020, 7:25:56 AM12/6/20
to Pavel Emelyanov, seastar-dev@googlegroups.com

On 03/12/2020 20.58, Pavel Emelyanov wrote:
> To perform cancellable IO one should
>
> 1. Create an io_intent object (non-copyable one)
> 2. Queue a request with this object passed as argument
>
> While the intent is alive all IO goes as it used to. Once the
> intent is destroyed, all the requests that were created with
> one will be eventually dropped from processing (unless they've
> already been submitted to kernel) without doing real IO.
>
> To make this all work requests "pin" the intent when being
> queued and in critical IO-submission places they check for the
> intent to be still valid. If it's not, the further processing
> stops.


This passive (from the point of view of the intent) way has a downside -
we can't push the change of the intent state across an API boundary.


There are two:

 - user/kernel: we can't call io_cancel (or equivalent io_uring) when
an intent dies

 - shard/shard: we can't tell the other shard the intent died (unless
we use std::atomic, legitimate here)


The first is not very important, io_cancel() is very bad, and what's
more important we always submit too few requests to the kernel for
queues to form, so cancellation will do nothing.


The second is also not very important, considering the other patchset.



Actually there are more: if a request waits in a long queue, and we
remove it actively from the queue instead of waiting for it to drain
away, then we reclaim the memory it holds immediately. This can be
useful if we cancel a request in order to reclaim memory.
Perhaps: enum class handle : uint16_t {};


This prevents accidental conversion of integers to handles and back (but
makes the intentional conversion harder).


Moving it to namespace scope (io_intent_handle) will make it easier to
forward-declare, don't know if that's useful. Can alias it here if needed.
Let's rename it in a way that explains what true/false mean, e.g.
was_cancelled().

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 7, 2020, 3:37:24 AM12/7/20
to Avi Kivity, seastar-dev@googlegroups.com
Yes, but it's a preparation for next patch, which adds a non-trivial move
of a new field to this list.

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 7, 2020, 3:39:34 AM12/7/20
to Avi Kivity, seastar-dev@googlegroups.com
Yeah, as you've mentioned below if the T has non-trivial move-constructor
it won't be called. Actually it's impossible to know from the _storage[i]
itself if it's T of Idx.

So I need to constraint this with "is_trivial_move_constructible" or alike.

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 7, 2020, 3:50:52 AM12/7/20
to Avi Kivity, seastar-dev@googlegroups.com


On 06.12.2020 15:25, Avi Kivity wrote:
>
> On 03/12/2020 20.58, Pavel Emelyanov wrote:
>> To perform cancellable IO one should
>>
>> 1. Create an io_intent object (non-copyable one)
>> 2. Queue a request with this object passed as argument
>>
>> While the intent is alive all IO goes as it used to. Once the
>> intent is destroyed, all the requests that were created with
>> one will be eventually dropped from processing (unless they've
>> already been submitted to kernel) without doing real IO.
>>
>> To make this all work requests "pin" the intent when being
>> queued and in critical IO-submission places they check for the
>> intent to be still valid. If it's not, the further processing
>> stops.
>
>
> This passive (from the point of view of the intent) way has a downside - we can't push the change of the intent state across an API boundary.
>
>
> There are two:
>
>  - user/kernel: we can't call io_cancel (or equivalent io_uring) when an intent dies

We can't call, that's true, but if we could, it would still look like submitting something
into reactor and waiting for the relevant IO-poller (dedicated or existing) to do the
io_cancel() and resolve the promise. But I din't think that anybody really needs to wait
for the cancellation to complete, so it will be reduced to submitting the cancellation and
letting the poller to pick it up some time in the future. This would work even in passive
design.

>  - shard/shard: we can't tell the other shard the intent died (unless we use std::atomic, legitimate here)

This will go away with IO-groups.

> The first is not very important, io_cancel() is very bad, and what's more important we always submit too few requests to the kernel for queues to form, so cancellation will do nothing.
>
>
> The second is also not very important, considering the other patchset.
>
>
>
> Actually there are more: if a request waits in a long queue, and we remove it actively from the queue instead of waiting for it to drain away, then we reclaim the memory it holds immediately. This can be useful if we cancel a request in order to reclaim memory.

Even though it's impossible now (circular buffer doesn't allow for such intrusion), what's
wrong with removing a request from a queue in the intent destructor?
OK

Avi Kivity

<avi@scylladb.com>
unread,
Dec 7, 2020, 8:15:24 AM12/7/20
to Pavel Emelyanov, seastar-dev@googlegroups.com
Yes, it will work with trivial T.


Avi Kivity

<avi@scylladb.com>
unread,
Dec 7, 2020, 8:21:51 AM12/7/20
to Pavel Emelyanov, seastar-dev@googlegroups.com
How will it work? Nobody will check if the request is alive, since we
handed it to the kernel.



>
>>   - shard/shard: we can't tell the other shard the intent died
>> (unless we use std::atomic, legitimate here)
>
> This will go away with IO-groups.
>
>> The first is not very important, io_cancel() is very bad, and what's
>> more important we always submit too few requests to the kernel for
>> queues to form, so cancellation will do nothing.
>>
>>
>> The second is also not very important, considering the other patchset.
>>
>>
>>
>> Actually there are more: if a request waits in a long queue, and we
>> remove it actively from the queue instead of waiting for it to drain
>> away, then we reclaim the memory it holds immediately. This can be
>> useful if we cancel a request in order to reclaim memory.
>
> Even though it's impossible now (circular buffer doesn't allow for
> such intrusion), what's
> wrong with removing a request from a queue in the intent destructor?


It's not wrong, but it requires an active design, no?


btw, a simple use case for intents is read-ahead in
file_data_source_impl. When the stream is closed, we can try to cancel
outstanding read-aheads.


A more complicated one is to pass intents through
file_input_stream_options, so we can cancel non-readaheads too.

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 7, 2020, 10:33:25 AM12/7/20
to Avi Kivity, seastar-dev@googlegroups.com
True, but while it in the kenrel it still has some "id" in the seastar (nowadays
this is io_desc which serves as user_data on iocb) using which it can be canceled.

So to make it work the aio ctx would have to

- bump the reference on the intent descriptor, as the io_request is destroyed after io_submit
- keep the handle:iocb (or whatever will be used to point the request in the kernel
to cancel) set of pairs. Since we submit few requests into kernel, it's not going to be big
- when polled/kicked check the vector and push the bunch of cancels in the kernel

>>>   - shard/shard: we can't tell the other shard the intent died (unless we use std::atomic, legitimate here)
>>
>> This will go away with IO-groups.
>>
>>> The first is not very important, io_cancel() is very bad, and what's more important we always submit too few requests to the kernel for queues to form, so cancellation will do nothing.
>>>
>>>
>>> The second is also not very important, considering the other patchset.
>>>
>>>
>>>
>>> Actually there are more: if a request waits in a long queue, and we remove it actively from the queue instead of waiting for it to drain away, then we reclaim the memory it holds immediately. This can be useful if we cancel a request in order to reclaim memory.
>>
>> Even though it's impossible now (circular buffer doesn't allow for such intrusion), what's
>> wrong with removing a request from a queue in the intent destructor?
>
>
> It's not wrong, but it requires an active design, no?

From my perspective the task "given the intent at hands, go and release all the requests pinning
it ASAP" calls for the rework of the collection used to keep requests in the first place. Once
such a collection is ready, we can either call the removal from intent destructor, or, if it's
for any reason turns out to be a sleeping operation, intruduce a future<> intent::cancel_gently()
that will have to be called explicitly.

I can do it now, but it will mean walking the whole circular_buffer and completing requests with
exceptions.

Avi Kivity

<avi@scylladb.com>
unread,
Dec 7, 2020, 11:22:32 AM12/7/20
to Pavel Emelyanov, seastar-dev@googlegroups.com
This means that every poll() we check every active request. With the
active approach we need do nothing, it's the canceled request that pays
the price.
We're talking about priority_class::_queue, yes?


I think we do want it, given the intent is to kill a memory hog. I guess
we can keep the request in the queue, but zero the ticket and func. This
avoids shuffling around data.


We'll need to add a parameter to func to tell it whether we want to
execute or cancel.

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 7, 2020, 12:57:02 PM12/7/20
to Avi Kivity, seastar-dev@googlegroups.com
Every request to be canceled. I'm not talking about keeping "canceled" bit on
objects from _submission_queue.

> With the active approach we need do nothing, it's the canceled request that pays the price.

But it pays one by either doing a thread-pool call or by still filling some
data for the poller. The same can be done with current design, the only
difference I see is that in destructor it's impossible to wait for the
operation to finish. To be waitable there should be a method, but destructor
won't go away, it will have to deactivate the descriptor in case that method
was somewhy not called.

>>>>>   - shard/shard: we can't tell the other shard the intent died (unless we use std::atomic, legitimate here)
>>>>
>>>> This will go away with IO-groups.
>>>>
>>>>> The first is not very important, io_cancel() is very bad, and what's more important we always submit too few requests to the kernel for queues to form, so cancellation will do nothing.
>>>>>
>>>>>
>>>>> The second is also not very important, considering the other patchset.
>>>>>
>>>>>
>>>>>
>>>>> Actually there are more: if a request waits in a long queue, and we remove it actively from the queue instead of waiting for it to drain away, then we reclaim the memory it holds immediately. This can be useful if we cancel a request in order to reclaim memory.
>>>>
>>>> Even though it's impossible now (circular buffer doesn't allow for such intrusion), what's
>>>> wrong with removing a request from a queue in the intent destructor?
>>>
>>>
>>> It's not wrong, but it requires an active design, no?
>>
>> From my perspective the task "given the intent at hands, go and release all the requests pinning
>> it ASAP" calls for the rework of the collection used to keep requests in the first place. Once
>> such a collection is ready, we can either call the removal from intent destructor, or, if it's
>> for any reason turns out to be a sleeping operation, intruduce a future<> intent::cancel_gently()
>> that will have to be called explicitly.
>>
>> I can do it now, but it will mean walking the whole circular_buffer and completing requests with
>> exceptions.
>>
>
> We're talking about priority_class::_queue, yes?

And the io_sink::_pending_io, yes.

> I think we do want it, given the intent is to kill a memory hog.

Isn't the primary intent not to do IO for time-outed client requests?
Anyway, I can include this in v2 and check how to make the _queue/_pending_io
a bit better.

> I guess we can keep the request in the queue, but zero the ticket and func. This avoids shuffling around data.

Would it be ... blameworthy to replace the func with a dummy one durng cancellation?

Avi Kivity

<avi@scylladb.com>
unread,
Dec 7, 2020, 1:22:38 PM12/7/20
to Pavel Emelyanov, seastar-dev@googlegroups.com
Then that's the active design, no? If the io_intent destructor triggers
some action, then that's fine, and if no request is cancelled (common
case) nothing is done.


>
>> With the active approach we need do nothing, it's the canceled
>> request that pays the price.
>
> But it pays one by either doing a thread-pool call or by still filling
> some
> data for the poller. The same can be done with current design, the only
> difference I see is that in destructor it's impossible to wait for the
> operation to finish. To be waitable there should be a method, but
> destructor
> won't go away, it will have to deactivate the descriptor in case that
> method
> was somewhy not called.


We don't need to wait for cancellation, since the caller already waits
for the original request. It's a way to say "make this request complete
quickly, even if it means that it completes with an error". For the
caller, the interesting point is when the original request completes
(successfully or not), since that's when they can recover resources
allocated for it (like memory buffers).
That even more primary than what I wrote.


> Anyway, I can include this in v2 and check how to make the
> _queue/_pending_io
> a bit better.




>
>> I guess we can keep the request in the queue, but zero the ticket and
>> func. This avoids shuffling around data.
>
> Would it be ... blameworthy to replace the func with a dummy one durng
> cancellation?


That is a neat idea. In general I hate if statements.

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 7, 2020, 1:34:48 PM12/7/20
to Avi Kivity, seastar-dev@googlegroups.com
Active "mode" maybe? Anyway...

>>
>>> With the active approach we need do nothing, it's the canceled request that pays the price.
>>
>> But it pays one by either doing a thread-pool call or by still filling some
>> data for the poller. The same can be done with current design, the only
>> difference I see is that in destructor it's impossible to wait for the
>> operation to finish. To be waitable there should be a method, but destructor
>> won't go away, it will have to deactivate the descriptor in case that method
>> was somewhy not called.
>
>
> We don't need to wait for cancellation, since the caller already waits for the original request. It's a way to say "make this request complete quickly, even if it means that it completes with an error". For the caller, the interesting point is when the original request completes (successfully or not), since that's when they can recover resources allocated for it (like memory buffers).

Who is the cancellation trigger then? If it's not the intent holder, then how
will it find out a) the intent itself and b) when to stop cutting the IO?

Avi Kivity

<avi@scylladb.com>
unread,
Dec 8, 2020, 9:03:35 AM12/8/20
to Pavel Emelyanov, seastar-dev@googlegroups.com, Piotr Sarna
I think we're miscommunicating. I'll restate my view:


Active design:

 - io_intent and requests track each other (e.g. via anchorless_list,
or allocating a stable ID as in current patches)

     - if io_intent is moved, the requests know where it moved

     - if requests are moved or new requests are created, the io_intent
knows about them

 - io_intent destruction causes something to happen to requests

     - if a request is in a queue, it is removed, and completes (with
failure)

     - if it's in the kernel, it is cancelled (not really needed)


Passive design:

 - requests track io_intent, but not vice versa

     - if io_intent is moved, requests know where to find it

 - io_intent destruction just raises a flag

      - requests poll for the flag at strategic points, and complete
(with failure)

      - can incur latency between io_intent destruction and completion

      - cannot cancel in kernel (but not really needed)


The trigger does not have to be destruction of io_intent, it can be a
method call, from the implementation point of view it is the same. But
perhaps a method is easier. The caller holds both the future returned by
the I/O call, and an intent handle. If it decides it doesn't want the
I/O any more, it pulls the handle and the requests complete quickly with
a failure (or success, if the handle was pulled too late).


example:


future<stuff> read_stuff(lowres_clock::time_point timeout) {

    io_intent handle;

    timer<> tmr([&] { handle.cancel(); });

    tmr.arm(timeout);

    char buffer[4096];

    auto data = co_await g_file.read(0, 4096, buffer, handle);

    co_return parse_stuff(data);

}


<scylla>

btw, I remembered something: we cache readers across cql requests, but
each cql page request has its own timeout. But I think we can have a
timeout for all pages, instead of for each page individually, when ALTER
SESSION is in.


Piotr, I think OLTP sessions should interpret timeout for the entire
query, and OLAP sessions should interpret timeout per page. It does not
make sense to extend the timeout just because the result was long.

</scylla>

Pavel Emelyanov

<xemul@scylladb.com>
unread,
Dec 8, 2020, 9:32:55 AM12/8/20
to Avi Kivity, seastar-dev@googlegroups.com, Piotr Sarna
Yes, we're in the same boat about this.

I'm a bit confused about your idea of using the IO cancellation to release the
memory. Is it

"cancelling the IO should be fast, so that the memory doesn't hang around for too long"

or

"we could teach the memory reclaiming code to shut some IO-s down to free memory"

?

Avi Kivity

<avi@scylladb.com>
unread,
Dec 8, 2020, 9:50:02 AM12/8/20
to Pavel Emelyanov, seastar-dev@googlegroups.com, Piotr Sarna
It's both. The memory limiter (reader_concurrency_semaphore in Scylla)
notices that memory limits are exceeded. It tracks all requests (scylla
level, not I/O). It tells requests to cancel. The requests pull the
cancellation handle, and their I/Os complete quickly, with an error.
Then the scylla requests let go of their memory as they get destroyed.
They can't let go of memory earlier because it is pinned by I/O.
Reply all
Reply to author
Forward
0 new messages