There are several fields on output_stream that control the behavior of
batch flushing. These fields are only used by some sockets; files don't
set the batch_flushes option, so for them all this memory is wasted.
Subsequent patches will add more state to support batch flushing, so
this waste is going to increase.
This patch moves most of the batch-flushing control bits into a separate
object owned via a unique_ptr. Since stream creation is noexcept, the
stream constructor cannot allocate the new object itself and must get it
from the caller. Because of this, output_stream_options.batch_flushes
can be marked as deprecated right away.
include/seastar/core/iostream-impl.hh | 45 ++++++++-------
include/seastar/core/iostream.hh | 79 ++++++++++++++++++++++-----
src/core/reactor.cc | 4 +-
src/net/stack.cc | 3 +-
4 files changed, 92 insertions(+), 39 deletions(-)
diff --git a/include/seastar/core/iostream-impl.hh b/include/seastar/core/iostream-impl.hh
index 1c2a9ef7..4894697f 100644
--- a/include/seastar/core/iostream-impl.hh
+++ b/include/seastar/core/iostream-impl.hh
@@ -79,8 +79,9 @@ output_stream<CharType>::zero_copy_put(net::packet p) noexcept {
// if flush is scheduled, disable it, so it will not try to write in parallel
_flush = false;
if (_flushing) {
+ assert(_batch_flushes);
// flush in progress, wait for it to end before continuing
- return _in_batch.value().get_future().then([this, p = std::move(p)] () mutable {
+ return _batch_flushes->in_batch.value().get_future().then([this, p = std::move(p)] () mutable {
return _fd.put(std::move(p));
});
} else {
@@ -433,14 +434,14 @@ output_stream<CharType>::flush() noexcept {
if (!_batch_flushes) {
return do_flush();
} else {
- if (_ex) {
+ if (_batch_flushes->ex) {
// flush is a good time to deliver outstanding errors
- return make_exception_future<>(std::move(_ex));
+ return make_exception_future<>(std::move(_batch_flushes->ex));
} else {
_flush = true;
- if (!_in_batch) {
+ if (!_batch_flushes->in_batch) {
add_to_flush_poller(*this);
- _in_batch = promise<>();
+ _batch_flushes->in_batch = promise<>();
}
}
}
@@ -453,8 +454,9 @@ output_stream<CharType>::put(temporary_buffer<CharType> buf) noexcept {
// if flush is scheduled, disable it, so it will not try to write in parallel
_flush = false;
if (_flushing) {
+ assert(_batch_flushes);
// flush in progress, wait for it to end before continuing
- return _in_batch.value().get_future().then([this, buf = std::move(buf)] () mutable {
+ return _batch_flushes->in_batch.value().get_future().then([this, buf = std::move(buf)] () mutable {
return _fd.put(std::move(buf));
});
} else {
@@ -463,28 +465,29 @@ output_stream<CharType>::put(temporary_buffer<CharType> buf) noexcept {
}
template <typename CharType>
-void
-output_stream<CharType>::poll_flush() noexcept {
- if (!_flush) {
+void output_stream<CharType>::batch_flush_context::poll() noexcept {
+ assert(stream != nullptr);
+
+ if (!stream->_flush) {
// flush was canceled, do nothing
- _flushing = false;
- _in_batch.value().set_value();
- _in_batch = std::nullopt;
+ stream->_flushing = false;
+ in_batch.value().set_value();
+ in_batch = std::nullopt;
return;
}
- _flush = false;
- _flushing = true; // make whoever wants to write into the fd to wait for flush to complete
+ stream->_flush = false;
+ stream->_flushing = true; // make whoever wants to write into the fd to wait for flush to complete
// FIXME: future is discarded
- (void)do_flush().then_wrapped([this] (future<> f) {
+ (void)stream->do_flush().then_wrapped([this] (future<> f) {
try {
f.get();
} catch (...) {
- _ex = std::current_exception();
+ ex = std::current_exception();
}
// if flush() was called while flushing flush once more
- poll_flush();
+ poll();
});
}
@@ -492,15 +495,15 @@ template <typename CharType>
future<>
output_stream<CharType>::close() noexcept {
return flush().finally([this] {
- if (_in_batch) {
- return _in_batch.value().get_future();
+ if (_batch_flushes && _batch_flushes->in_batch) {
+ return _batch_flushes->in_batch.value().get_future();
} else {
return make_ready_future();
}
}).then([this] {
// report final exception as close error
- if (_ex) {
- std::rethrow_exception(_ex);
+ if (_batch_flushes && _batch_flushes->ex) {
+ std::rethrow_exception(_batch_flushes->ex);
}
}).finally([this] {
return _fd.close();
diff --git a/include/seastar/core/iostream.hh b/include/seastar/core/iostream.hh
index 59de8545..89103251 100644
--- a/include/seastar/core/iostream.hh
+++ b/include/seastar/core/iostream.hh
@@ -332,7 +332,13 @@ class input_stream final {
struct output_stream_options {
bool trim_to_size = false; ///< Make sure that buffers put into sink haven't
///< grown larger than the configured size
- bool batch_flushes = false; ///< Try to merge flushes with each other
+ [[deprecated("use batch_flush_context")]]
+ bool batch_flushes; ///< Try to merge flushes with each other
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+ constexpr output_stream_options() noexcept : batch_flushes(false) { }
+#pragma GCC diagnostic pop
};
/// Facilitates data buffering before it's handed over to data_sink.
@@ -348,6 +354,16 @@ struct output_stream_options {
/// resolved.
template <typename CharType>
class output_stream final {
+public:
+ struct batch_flush_context {
+ std::optional<promise<>> in_batch;
+ std::exception_ptr ex;
+ bi::slist_member_hook<> in_poller;
+ output_stream* stream = nullptr;
+ void poll() noexcept;
+ };
+
+private:
static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
data_sink _fd;
temporary_buffer<CharType> _buf;
@@ -356,19 +372,15 @@ class output_stream final {
size_t _begin = 0;
size_t _end = 0;
bool _trim_to_size = false;
- bool _batch_flushes = false;
- std::optional<promise<>> _in_batch;
bool _flush = false;
bool _flushing = false;
- std::exception_ptr _ex;
- bi::slist_member_hook<> _in_poller;
+ std::unique_ptr<batch_flush_context> _batch_flushes;
private:
size_t available() const noexcept { return _end - _begin; }
size_t possibly_available() const noexcept { return _size - _begin; }
future<> split_and_put(temporary_buffer<CharType> buf) noexcept;
future<> put(temporary_buffer<CharType> buf) noexcept;
- void poll_flush() noexcept;
future<> do_flush() noexcept;
future<> zero_copy_put(net::packet p) noexcept;
future<> zero_copy_split_and_put(net::packet p) noexcept;
@@ -377,18 +389,56 @@ class output_stream final {
public:
using char_type = CharType;
output_stream() noexcept = default;
- output_stream(data_sink fd, size_t size, output_stream_options opts = {}) noexcept
- : _fd(std::move(fd)), _size(size), _trim_to_size(opts.trim_to_size), _batch_flushes(opts.batch_flushes) {}
+ output_stream(data_sink fd, size_t size, output_stream_options opts = {}, std::unique_ptr<batch_flush_context> ctx = {}) noexcept
+ : _fd(std::move(fd)), _size(size), _trim_to_size(opts.trim_to_size), _batch_flushes(std::move(ctx))
+ {
+ if (_batch_flushes) {
+ _batch_flushes->stream = this;
+ }
+ }
[[deprecated("use output_stream_options instead of booleans")]]
output_stream(data_sink fd, size_t size, bool trim_to_size, bool batch_flushes = false) noexcept
- : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size), _batch_flushes(batch_flushes) {}
+ : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size) {}
output_stream(data_sink fd) noexcept
: _fd(std::move(fd)), _size(_fd.buffer_size()), _trim_to_size(true) {}
- output_stream(output_stream&&) noexcept = default;
- output_stream& operator=(output_stream&&) noexcept = default;
+ output_stream(output_stream&& o) noexcept
+ : _fd(std::move(o._fd))
+ , _buf(std::move(o._buf))
+ , _zc_bufs(std::move(o._zc_bufs))
+ , _size(std::exchange(o._size, 0))
+ , _begin(std::exchange(o._begin, 0))
+ , _end(std::exchange(o._end, 0))
+ , _trim_to_size(o._trim_to_size)
+ , _flush(o._flush)
+ , _flushing(o._flushing)
+ , _batch_flushes(std::move(o._batch_flushes))
+ {
+ if (_batch_flushes) {
+ _batch_flushes->stream = this;
+ }
+ }
+ output_stream& operator=(output_stream&& o) noexcept {
+ if (this != &o) {
+ _fd = std::move(o._fd);
+ _buf = std::move(o._buf);
+ _zc_bufs = std::move(o._zc_bufs);
+ _size = std::exchange(o._size, 0);
+ _begin = std::exchange(o._begin, 0);
+ _end = std::exchange(o._end, 0);
+ _trim_to_size = o._trim_to_size;
+ _flush = o._flush;
+ _flushing = o._flushing;
+ _batch_flushes = std::move(o._batch_flushes);
+ if (_batch_flushes) {
+ _batch_flushes->stream = this;
+ }
+ }
+ return *this;
+ }
+
~output_stream() {
if (_batch_flushes) {
- assert(!_in_batch && "Was this stream properly closed?");
+ assert(!_batch_flushes->in_batch && "Was this stream properly closed?");
} else {
assert(!_end && !_zc_bufs && "Was this stream properly closed?");
}
@@ -423,9 +473,10 @@ class output_stream final {
/// \returns the data_sink
data_sink detach() &&;
- using batch_flush_list_t = bi::slist<output_stream,
+ friend void add_to_flush_poller(output_stream<char>& os) noexcept;
+ using batch_flush_list_t = bi::slist<batch_flush_context,
bi::constant_time_size<false>, bi::cache_last<true>,
- bi::member_hook<output_stream, bi::slist_member_hook<>, &output_stream::_in_poller>>;
+ bi::member_hook<batch_flush_context, bi::slist_member_hook<>, &batch_flush_context::in_poller>>;
private:
friend class reactor;
};
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 100eb80f..1fbe5c52 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -2364,7 +2364,7 @@ reactor::flush_tcp_batches() {
while (!_flush_batching.empty()) {
auto& os = _flush_batching.front();
_flush_batching.pop_front();
- os.poll_flush();
+ os.poll();
}
return work;
}
@@ -4331,7 +4331,7 @@ future<> later() noexcept {
}
void add_to_flush_poller(output_stream<char>& os) noexcept {
- engine()._flush_batching.push_back(os);
+ engine()._flush_batching.push_back(*os._batch_flushes);
}
steady_clock_type::duration reactor::total_idle_time() {
diff --git a/src/net/stack.cc b/src/net/stack.cc
index 1e7bc88a..7a179795 100644
--- a/src/net/stack.cc
+++ b/src/net/stack.cc
@@ -102,9 +102,8 @@ input_stream<char> connected_socket::input(connected_socket_input_stream_config
output_stream<char> connected_socket::output(size_t buffer_size) {
output_stream_options opts;
- opts.batch_flushes = true;
// TODO: allow user to determine buffer size etc
- return output_stream<char>(_csi->sink(), buffer_size, opts);
+ return output_stream<char>(_csi->sink(), buffer_size, opts, std::make_unique<output_stream<char>::batch_flush_context>());
}
void connected_socket::set_nodelay(bool nodelay) {
--
2.20.1