[PATCH v2 0/4] Abort RPC connection on send/flush failures


Pavel Emelyanov

<xemul@scylladb.com>
Aug 25, 2022, 8:37:15 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
When sending a message via a connected_socket's stream the write/flush
may fail. In this case the connection may get stuck, either because the
receive loop is not woken up, or because batch-flush hides the error
and doesn't report it up the chain.

This set fixes both and includes an error-injection test that makes
sure the connection doesn't get stuck if sending a message by either
side fails.

branch: https://github.com/xemul/seastar/tree/br-rpc-connection-glitches-2
tests: unit(dev)

Pavel Emelyanov (4):
rpc: Abort connection if send_entry() fails
output_stream: Handle batch-flush exception synchronously
output_stream: Indentation fix after previous patch
rpc_test: Test rpc send glitches

include/seastar/core/iostream-impl.hh | 24 +++--------
include/seastar/core/iostream.hh | 9 +++-
include/seastar/net/posix-stack.hh | 1 +
src/core/reactor.cc | 4 ++
src/net/native-stack-impl.hh | 3 ++
src/net/posix-stack.cc | 4 ++
src/net/tls.cc | 4 ++
src/rpc/rpc.cc | 6 ++-
tests/unit/loopback_socket.hh | 7 +++
tests/unit/rpc_test.cc | 61 +++++++++++++++++++++++++++
10 files changed, 102 insertions(+), 21 deletions(-)

--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Aug 25, 2022, 8:37:16 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
If we fail to send for any reason we mark the connection as failed.
We also need to wake up the receiver side so that it sees the
connection should be closed.

fixes: #1146

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
src/rpc/rpc.cc | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc
index 75bab545..b15635af 100644
--- a/src/rpc/rpc.cc
+++ b/src/rpc/rpc.cc
@@ -300,8 +300,10 @@ namespace rpc {

p->uncancellable();
return send_entry(*p).then_wrapped([this, p = std::move(p)] (auto f) mutable {
- _error |= f.failed();
- f.ignore_ready_future();
+ if (f.failed()) {
+ f.ignore_ready_future();
+ abort();
+ }
p->done.set_value();
});
});
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Aug 25, 2022, 8:37:17 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
When output_stream::flush() is called with .batch_flushes on, the flush()
itself resolves immediately with a ready future. The real flush happens
later, when a dedicated poller gets to it. If that real flush resolves
with an exception the poller just keeps the exception on the stream and
does nothing more.

The described behavior can deadlock if the user operates in the

while (!aborted()) {
    co_await ostream.send()
    co_await ostream.flush()
    co_await istream.recv()
    co_await handle()
}

manner. Even if the ostream's real flush fails some time later, the loop
is not aware of it, as it's sitting on the other stream's recv().
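
For illustration, a minimal sketch of such a loop in Seastar coroutine style
(request_loop(), the "ping" payload and the aborted flag are hypothetical
placeholders, not code from this series):

    #include <seastar/core/coroutine.hh>
    #include <seastar/core/iostream.hh>

    // With batch flushes enabled, out.flush() below returns a ready future
    // even when the real flush later fails in the background, so the
    // coroutine keeps waiting in in.read() on an effectively dead connection.
    seastar::future<> request_loop(seastar::input_stream<char>& in,
                                   seastar::output_stream<char>& out,
                                   bool& aborted) {
        while (!aborted) {
            co_await out.write("ping", 4);  // stand-in for "ostream.send()"
            co_await out.flush();           // ready immediately with batch flushes
            auto buf = co_await in.read();  // may never wake up after a failed flush
            if (buf.size() == 0) {
                break;                      // peer closed the connection
            }
            // ... handle the reply here ...
        }
    }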

The proposed fix is to make the batch-flush poller let the stream's
data_sink handle the background flush exception itself. By default (e.g. for
files, which don't use batch flushing) it's a never-called no-op. Sinks
created by sockets abort the connection the way they prefer.

fixes: #1150

(indentation is left broken)

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/iostream-impl.hh | 16 ++--------------
include/seastar/core/iostream.hh | 9 ++++++++-
include/seastar/net/posix-stack.hh | 1 +
src/core/reactor.cc | 4 ++++
src/net/native-stack-impl.hh | 3 +++
src/net/posix-stack.cc | 4 ++++
src/net/tls.cc | 4 ++++
tests/unit/loopback_socket.hh | 7 +++++++
8 files changed, 33 insertions(+), 15 deletions(-)

diff --git a/include/seastar/core/iostream-impl.hh b/include/seastar/core/iostream-impl.hh
index 8b9baa4e..b62ca891 100644
--- a/include/seastar/core/iostream-impl.hh
+++ b/include/seastar/core/iostream-impl.hh
@@ -426,16 +426,11 @@ output_stream<CharType>::flush() noexcept {
});
}
} else {
- if (_ex) {
- // flush is a good time to deliver outstanding errors
- return make_exception_future<>(std::move(_ex));
- } else {
_flush = true;
if (!_in_batch) {
add_to_flush_poller(*this);
_in_batch = promise<>();
}
- }
}
return make_ready_future<>();
}
@@ -483,10 +478,8 @@ output_stream<CharType>::poll_flush() noexcept {
(void)f.then([this] {
return _fd.flush();
}).then_wrapped([this] (future<> f) {
- try {
- f.get();
- } catch (...) {
- _ex = std::current_exception();
+ if (f.failed()) {
+ _fd.on_batch_flush_error(f.get_exception());
}
// if flush() was called while flushing flush once more
poll_flush();
@@ -502,11 +495,6 @@ output_stream<CharType>::close() noexcept {
} else {
return make_ready_future();
}
- }).then([this] {
- // report final exception as close error
- if (_ex) {
- std::rethrow_exception(_ex);
- }
}).finally([this] {
return _fd.close();
});
diff --git a/include/seastar/core/iostream.hh b/include/seastar/core/iostream.hh
index 233b93fd..25e6845c 100644
--- a/include/seastar/core/iostream.hh
+++ b/include/seastar/core/iostream.hh
@@ -113,6 +113,10 @@ class data_sink_impl {
}
virtual future<> close() = 0;

+ // Emergency close. When batch flush fails "in the background" it
+ // only has the data_sink at hand to forward the failure to
+ virtual void abort_write(std::exception_ptr eptr) noexcept;
+
// The method should return the maximum buffer size that's acceptable by
// the sink. It's used when the output stream is constructed without any
// specific buffer size. In this case the stream accepts this value as its
@@ -169,6 +173,10 @@ class data_sink {
}
}

+ void on_batch_flush_error(std::exception_ptr eptr) noexcept {
+ _dsi->abort_write(std::move(eptr));
+ }
+
size_t buffer_size() const noexcept { return _dsi->buffer_size(); }
};

@@ -360,7 +368,6 @@ class output_stream final {
std::optional<promise<>> _in_batch;
bool _flush = false;
bool _flushing = false;
- std::exception_ptr _ex;
bi::slist_member_hook<> _in_poller;

private:
diff --git a/include/seastar/net/posix-stack.hh b/include/seastar/net/posix-stack.hh
index e76d7aff..46044b43 100644
--- a/include/seastar/net/posix-stack.hh
+++ b/include/seastar/net/posix-stack.hh
@@ -128,6 +128,7 @@ class posix_data_sink_impl : public data_sink_impl {
future<> put(packet p) override;
future<> put(temporary_buffer<char> buf) override;
future<> close() override;
+ void abort_write(std::exception_ptr eptr) noexcept override;
};

class posix_ap_server_socket_impl : public server_socket_impl {
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 100eb80f..2a42ceb1 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -2452,6 +2452,10 @@ class reactor::signal_pollfn final : public reactor::pollfn {
}
};

+void data_sink_impl::abort_write(std::exception_ptr eptr) noexcept {
+ on_internal_error_noexcept(seastar_logger, format("batch-flush reports error for not applicable data_sink: {}", eptr));
+}
+
class reactor::batch_flush_pollfn final : public simple_pollfn<true> {
reactor& _r;
public:
diff --git a/src/net/native-stack-impl.hh b/src/net/native-stack-impl.hh
index e5972958..023c56a0 100644
--- a/src/net/native-stack-impl.hh
+++ b/src/net/native-stack-impl.hh
@@ -195,6 +195,9 @@ class native_connected_socket_impl<Protocol>::native_data_sink_impl final
_conn->close_write();
return make_ready_future<>();
}
+ virtual void abort_write(std::exception_ptr) noexcept override {
+ _conn->close_write();
+ }
};

template <typename Protocol>
diff --git a/src/net/posix-stack.cc b/src/net/posix-stack.cc
index 7a55b9fb..90bac6da 100644
--- a/src/net/posix-stack.cc
+++ b/src/net/posix-stack.cc
@@ -648,6 +648,10 @@ posix_data_sink_impl::close() {
return make_ready_future<>();
}

+void posix_data_sink_impl::abort_write(std::exception_ptr) noexcept {
+ shutdown_socket_fd(_fd, SHUT_WR);
+}
+
posix_network_stack::posix_network_stack(const program_options::option_group& opts, std::pmr::polymorphic_allocator<char>* allocator)
: _reuseport(engine().posix_reuseport_available()), _allocator(allocator) {
}
diff --git a/src/net/tls.cc b/src/net/tls.cc
index 53da2cee..feef9339 100644
--- a/src/net/tls.cc
+++ b/src/net/tls.cc
@@ -1670,6 +1670,10 @@ class tls_connected_socket_impl::sink_impl: public data_sink_impl, public sessio
_session->close();
return make_ready_future<>();
}
+
+ virtual void abort_write(std::exception_ptr) noexcept override {
+ _session->close();
+ }
};

class server_session : public net::server_socket_impl {
diff --git a/tests/unit/loopback_socket.hh b/tests/unit/loopback_socket.hh
index 0e164e77..e4e3f1b7 100644
--- a/tests/unit/loopback_socket.hh
+++ b/tests/unit/loopback_socket.hh
@@ -118,6 +118,13 @@ class loopback_data_sink_impl : public data_sink_impl {
});
});
}
+
+ void abort_write(std::exception_ptr eptr) noexcept override {
+ // Mimic the shutdown(SHUT_WR)
+ (void)smp::submit_to(_buffer.get_owner_shard(), [this] {
+ _buffer->shutdown();
+ });
+ }
};

class loopback_data_source_impl : public data_source_impl {
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Aug 25, 2022, 8:37:18 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/iostream-impl.hh | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/include/seastar/core/iostream-impl.hh b/include/seastar/core/iostream-impl.hh
index b62ca891..b9a18f7e 100644
--- a/include/seastar/core/iostream-impl.hh
+++ b/include/seastar/core/iostream-impl.hh
@@ -426,11 +426,11 @@ output_stream<CharType>::flush() noexcept {
});
}
} else {
- _flush = true;
- if (!_in_batch) {
- add_to_flush_poller(*this);
- _in_batch = promise<>();
- }
+ _flush = true;
+ if (!_in_batch) {
+ add_to_flush_poller(*this);
+ _in_batch = promise<>();
+ }
}
return make_ready_future<>();
}
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Aug 25, 2022, 8:37:19 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
This test runs a simple client->server loop of 4 synchronous calls and
tries to inject one-shot glitches at increasing limits. Once all stages
pass, the test passes. The current expectation is that the connection
won't hang.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
tests/unit/rpc_test.cc | 61 ++++++++++++++++++++++++++++++++++++++++++
1 file changed, 61 insertions(+)

diff --git a/tests/unit/rpc_test.cc b/tests/unit/rpc_test.cc
index 617ddcf2..058de2ea 100644
--- a/tests/unit/rpc_test.cc
+++ b/tests/unit/rpc_test.cc
@@ -36,6 +36,7 @@
#include <seastar/util/log.hh>
#include <seastar/util/closeable.hh>
#include <seastar/util/noncopyable_function.hh>
+#include <seastar/util/later.hh>

using namespace seastar;

@@ -663,6 +664,66 @@ SEASTAR_TEST_CASE(test_stream_negotiation_error) {
});
}

+static future<> test_rpc_connection_send_glitch(bool on_client) {
+ struct context {
+ int limit;
+ bool no_failures;
+ bool on_client;
+ };
+
+ return do_with(context{0, false, on_client}, [] (auto& ctx) {
+ return do_until([&ctx] {
+ return ctx.no_failures;
+ }, [&ctx] {
+ fmt::print("======== 8< ========\n");
+ fmt::print(" Checking {} limit\n", ctx.limit);
+ rpc_test_config cfg;
+ rpc_loopback_error_injector::config ecfg;
+ if (ctx.on_client) {
+ ecfg.client_snd.limit = ctx.limit;
+ ecfg.client_snd.kind = loopback_error_injector::error::one_shot;
+ } else {
+ ecfg.server_snd.limit = ctx.limit;
+ ecfg.server_snd.kind = loopback_error_injector::error::one_shot;
+ }
+ cfg.inject_error = ecfg;
+ return rpc_test_env<>::do_with_thread(cfg, [&ctx] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
+ env.register_handler(1, [] {
+ fmt::print(" reply\n");
+ return seastar::sleep(std::chrono::milliseconds(100)).then([] {
+ return make_ready_future<unsigned>(13);
+ });
+ }).get();
+
+ ctx.no_failures = true;
+
+ for (int i = 0; i < 4; i++) {
+ auto call = env.proto().make_client<unsigned ()>(1);
+ fmt::print(" call {}\n", i);
+ try {
+ auto id = call(c1).get0();
+ fmt::print(" response: {}\n", id);
+ } catch (...) {
+ fmt::print(" responce: ex {}\n", std::current_exception());
+ ctx.no_failures = false;
+ ctx.limit++;
+ break;
+ }
+ seastar::yield().get();
+ }
+ });
+ });
+ });
+}
+
+SEASTAR_TEST_CASE(test_rpc_client_send_glitch) {
+ return test_rpc_connection_send_glitch(true);
+}
+
+SEASTAR_TEST_CASE(test_rpc_server_send_glitch) {
+ return test_rpc_connection_send_glitch(false);
+}
+
SEASTAR_TEST_CASE(test_rpc_scheduling) {
return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
auto sg = create_scheduling_group("rpc", 100).get0();
--
2.20.1

Benny Halevy

<bhalevy@scylladb.com>
Aug 31, 2022, 9:49:12 AM
to Gleb Natapov, Pavel Emelyanov, seastar-dev@googlegroups.com
Gleb, can you please review?

Gleb Natapov

<gleb@scylladb.com>
Sep 1, 2022, 4:24:15 AM
to Pavel Emelyanov, seastar-dev@googlegroups.com
So does it mean that if a stream user wants to know the reason for an
error it has to implement its own data source?
How does it solve your example above though? You shut down the write side,
but does it wake up the receiver?

--
Gleb.

Pavel Emelyanov

<xemul@scylladb.com>
Sep 1, 2022, 4:31:38 AM
to Gleb Natapov, seastar-dev@googlegroups.com
Source? It's a sink method. Even so -- I don't get the question; why would
the user need to implement its own sink? The eptr is here, I just don't see a
good way to push it up to the user. Returning it back on .close() is not a
good idea from my POV.
It closes the write side, the peer notices it (yes, some time later) and
closes its connection, and the receiver (again, after some more time) notices
this and wakes up. There's no receiver end here to close.

Gleb Natapov

<gleb@scylladb.com>
Sep 1, 2022, 4:39:16 AM
to Pavel Emelyanov, seastar-dev@googlegroups.com
Yes, sink, not source of course. The eptr is here, but the only way for the
user code to get it is to override abort_write in its own
implementation. We do not even log this exception anywhere now.
So we rely on the peer side to close its sending end to abort our
receiver. But it is not guaranteed that the peer will do that, no? For our
RPC implementation we know it will, but this code may communicate with a
non-RPC or non-Seastar application as well.
--
Gleb.

Pavel Emelyanov

<xemul@scylladb.com>
Sep 1, 2022, 4:55:32 AM
to Gleb Natapov, seastar-dev@googlegroups.com
True. The eptr is effectively ignored. However ... (see below)
AFAIK, for an established TCP socket shutdown(SHUT_WR) only differs from close()
in that the descriptor remains alive. Other than that the protocol behaves the
same -- it sends FIN and enters the FIN_WAIT1 state.

> For our
> RPC implementation we know it will, but this code may communicate with non
> RPC or non seastar application as well.

We rely here on the peer TCP socket either handling the FIN "correctly" and
closing its sending side, or on the local TCP timing out FIN_WAIT1 eventually.
In any case -- the local connection would see the "connection closed by peer"
error, not the original eptr from above. Maybe logging it is indeed a good idea.
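
For reference, a plain-POSIX sketch of the half-close behavior discussed here
(illustrative only, not Seastar code):

    #include <sys/socket.h>
    #include <unistd.h>
    #include <cstdio>

    // shutdown(SHUT_WR) sends FIN and moves the socket to FIN_WAIT1, just
    // like close() would, but the descriptor stays open so we can still
    // read whatever the peer sends before it closes its own side.
    void half_close_and_drain(int fd) {
        if (shutdown(fd, SHUT_WR) < 0) {
            perror("shutdown");
            return;
        }
        char buf[4096];
        ssize_t n;
        // The peer's read()/recv() returns 0 (EOF) once the FIN arrives.
        while ((n = read(fd, buf, sizeof(buf))) > 0) {
            // drain the remaining data from the peer
        }
        close(fd);
    }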

Gleb Natapov

<gleb@scylladb.com>
Sep 1, 2022, 5:29:33 AM
to Pavel Emelyanov, seastar-dev@googlegroups.com
IIRC a half-closed TCP connection is a thing (I need to dust off my Stevens
books, but here is a link:
https://superuser.com/questions/298919/what-is-tcp-half-open-connection-and-tcp-half-closed-connection).
Shutting down one side only means that this side has nothing more to send,
but the other side may continue sending.
--
Gleb.

Pavel Emelyanov

<xemul@scylladb.com>
Sep 1, 2022, 6:05:28 AM
to Gleb Natapov, seastar-dev@googlegroups.com
If the local socket failed flushing, it means that the peer was supposed to read
the data. Now the peer's read channel is closed, so it will bail out. Not that
this guarantees it will close the connection, of course. Is that your concern?

Gleb Natapov

<gleb@scylladb.com>
Sep 1, 2022, 6:16:06 AM
to Pavel Emelyanov, seastar-dev@googlegroups.com
Yes. The peer may not know that the sender failed to send something. It
has no indication. Then it gets the FIN and knows that the sender has
nothing more to send, but that does not mean the peer has nothing to send
and wants to close its side of the connection. In our RPC that of course
cannot happen, since an error on the sender or the receiver side aborts
both ends.

--
Gleb.

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:25 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
When sending a message via a connected_socket's stream the write/flush
may fail. In this case the connection may get stuck, either because the
receive loop is not woken up, or because batch-flush hides the error
and doesn't report it up the chain.

This set fixes both and includes an error-injection test that makes
sure the connection doesn't get stuck if sending a message by either
side fails.

branch: https://github.com/xemul/seastar/tree/br-rpc-connection-glitches-3
tests: unit(dev)

v3:
- Make batch-flush call user-provided callback on background flush error
- Turn off batch-flushes for those who don't provide this callback
- Fix tests that mis-use output_stream .flush/.close API

Pavel Emelyanov (9):
output_stream: Detach batch-flush context into a separate object
tests: Dont shutdown websocket before closing
tests: Close socket output stream
tests: Ignore httpd client errors
output_stream: Introduce on-batch-flush-error callback
output_stream: Indentation fix after previous patch
rpc: Add batch-flush error callback (and turn b.f. back ON)
rpc: Abort connection if send_entry() fails
rpc_test: Test rpc send glitches

include/seastar/core/iostream-impl.hh | 55 ++++++++----------
include/seastar/core/iostream.hh | 83 ++++++++++++++++++++++-----
include/seastar/net/api.hh | 8 +++
src/core/reactor.cc | 4 +-
src/net/stack.cc | 7 ++-
src/rpc/rpc.cc | 11 +++-
tests/unit/httpd_test.cc | 2 +
tests/unit/rpc_test.cc | 61 ++++++++++++++++++++
tests/unit/socket_test.cc | 4 +-
tests/unit/websocket_test.cc | 2 -
10 files changed, 182 insertions(+), 55 deletions(-)

--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:26 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
On close() the websocket tries to send the closing message to the peer. If
the socket is already shut down this resolves into an exception. However,
close() calls shutdown() itself, so the test can avoid doing it.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
tests/unit/websocket_test.cc | 2 --
1 file changed, 2 deletions(-)

diff --git a/tests/unit/websocket_test.cc b/tests/unit/websocket_test.cc
index ae97c526..a9c91710 100644
--- a/tests/unit/websocket_test.cc
+++ b/tests/unit/websocket_test.cc
@@ -52,7 +52,6 @@ SEASTAR_TEST_CASE(test_websocket_handshake) {
websocket::connection conn(dummy, acceptor.get0().connection);
future<> serve = conn.process();
auto close = defer([&conn, &input, &output, &serve] () noexcept {
- conn.shutdown();
conn.close().get();
input.close().get();
output.close().get();
@@ -118,7 +117,6 @@ SEASTAR_TEST_CASE(test_websocket_handler_registration) {
future<> serve = conn.process();

auto close = defer([&conn, &input, &output, &serve] () noexcept {
- conn.shutdown();
conn.close().get();
input.close().get();
output.close().get();
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:26 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
There are several fields on output_stream that control the behavior of
batch flushing. This set is only used by some sockets; files don't set
the batch_flushes option and thus all this memory is wasted. The next
patches will add more stuff to support batch-flushing, so this waste is
going to increase.

This patch moves most of the batch-flushing control bits into a separate
object owned by a unique_ptr. Since stream creation is noexcept, the
constructor cannot allocate the new object by itself and should get it from
the caller. Because of this the output_stream_options.batch_flushes flag can
be marked as deprecated right away.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/iostream-impl.hh | 45 ++++++++-------
include/seastar/core/iostream.hh | 79 ++++++++++++++++++++++-----
src/core/reactor.cc | 4 +-
src/net/stack.cc | 3 +-
4 files changed, 92 insertions(+), 39 deletions(-)

diff --git a/include/seastar/core/iostream-impl.hh b/include/seastar/core/iostream-impl.hh
index 1c2a9ef7..4894697f 100644
--- a/include/seastar/core/iostream-impl.hh
+++ b/include/seastar/core/iostream-impl.hh
@@ -79,8 +79,9 @@ output_stream<CharType>::zero_copy_put(net::packet p) noexcept {
// if flush is scheduled, disable it, so it will not try to write in parallel
_flush = false;
if (_flushing) {
+ assert(_batch_flushes);
// flush in progress, wait for it to end before continuing
- return _in_batch.value().get_future().then([this, p = std::move(p)] () mutable {
+ return _batch_flushes->in_batch.value().get_future().then([this, p = std::move(p)] () mutable {
return _fd.put(std::move(p));
});
} else {
@@ -433,14 +434,14 @@ output_stream<CharType>::flush() noexcept {
if (!_batch_flushes) {
return do_flush();
} else {
- if (_ex) {
+ if (_batch_flushes->ex) {
// flush is a good time to deliver outstanding errors
- return make_exception_future<>(std::move(_ex));
+ return make_exception_future<>(std::move(_batch_flushes->ex));
} else {
_flush = true;
- if (!_in_batch) {
+ if (!_batch_flushes->in_batch) {
add_to_flush_poller(*this);
- _in_batch = promise<>();
+ _batch_flushes->in_batch = promise<>();
}
}
}
@@ -453,8 +454,9 @@ output_stream<CharType>::put(temporary_buffer<CharType> buf) noexcept {
// if flush is scheduled, disable it, so it will not try to write in parallel
_flush = false;
if (_flushing) {
+ assert(_batch_flushes);
// flush in progress, wait for it to end before continuing
- return _in_batch.value().get_future().then([this, buf = std::move(buf)] () mutable {
+ return _batch_flushes->in_batch.value().get_future().then([this, buf = std::move(buf)] () mutable {
return _fd.put(std::move(buf));
});
} else {
@@ -463,28 +465,29 @@ output_stream<CharType>::put(temporary_buffer<CharType> buf) noexcept {
}

template <typename CharType>
-void
-output_stream<CharType>::poll_flush() noexcept {
- if (!_flush) {
+void output_stream<CharType>::batch_flush_context::poll() noexcept {
+ assert(stream != nullptr);
+
+ if (!stream->_flush) {
// flush was canceled, do nothing
- _flushing = false;
- _in_batch.value().set_value();
- _in_batch = std::nullopt;
+ stream->_flushing = false;
+ in_batch.value().set_value();
+ in_batch = std::nullopt;
return;
}

- _flush = false;
- _flushing = true; // make whoever wants to write into the fd to wait for flush to complete
+ stream->_flush = false;
+ stream->_flushing = true; // make whoever wants to write into the fd to wait for flush to complete

// FIXME: future is discarded
- (void)do_flush().then_wrapped([this] (future<> f) {
+ (void)stream->do_flush().then_wrapped([this] (future<> f) {
try {
f.get();
} catch (...) {
- _ex = std::current_exception();
+ ex = std::current_exception();
}
// if flush() was called while flushing flush once more
- poll_flush();
+ poll();
});
}

@@ -492,15 +495,15 @@ template <typename CharType>
future<>
output_stream<CharType>::close() noexcept {
return flush().finally([this] {
- if (_in_batch) {
- return _in_batch.value().get_future();
+ if (_batch_flushes && _batch_flushes->in_batch) {
+ return _batch_flushes->in_batch.value().get_future();
} else {
return make_ready_future();
}
}).then([this] {
// report final exception as close error
- if (_ex) {
- std::rethrow_exception(_ex);
+ if (_batch_flushes && _batch_flushes->ex) {
+ std::rethrow_exception(_batch_flushes->ex);
}
}).finally([this] {
return _fd.close();
diff --git a/include/seastar/core/iostream.hh b/include/seastar/core/iostream.hh
index 59de8545..89103251 100644
--- a/include/seastar/core/iostream.hh
+++ b/include/seastar/core/iostream.hh
@@ -332,7 +332,13 @@ class input_stream final {
struct output_stream_options {
bool trim_to_size = false; ///< Make sure that buffers put into sink haven't
///< grown larger than the configured size
- bool batch_flushes = false; ///< Try to merge flushes with each other
+ [[deprecated("use batch_flush_context")]]
+ bool batch_flushes; ///< Try to merge flushes with each other
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+ constexpr output_stream_options() noexcept : batch_flushes(false) { }
+#pragma GCC diagnostic pop
};

/// Facilitates data buffering before it's handed over to data_sink.
@@ -348,6 +354,16 @@ struct output_stream_options {
/// resolved.
template <typename CharType>
class output_stream final {
+public:
+ struct batch_flush_context {
+ std::optional<promise<>> in_batch;
+ std::exception_ptr ex;
+ bi::slist_member_hook<> in_poller;
+ output_stream* stream = nullptr;
+ void poll() noexcept;
+ };
+
+private:
static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
data_sink _fd;
temporary_buffer<CharType> _buf;
@@ -356,19 +372,15 @@ class output_stream final {
size_t _begin = 0;
size_t _end = 0;
bool _trim_to_size = false;
- bool _batch_flushes = false;
- std::optional<promise<>> _in_batch;
bool _flush = false;
bool _flushing = false;
- std::exception_ptr _ex;
- bi::slist_member_hook<> _in_poller;
+ std::unique_ptr<batch_flush_context> _batch_flushes;

private:
size_t available() const noexcept { return _end - _begin; }
size_t possibly_available() const noexcept { return _size - _begin; }
future<> split_and_put(temporary_buffer<CharType> buf) noexcept;
future<> put(temporary_buffer<CharType> buf) noexcept;
- void poll_flush() noexcept;
future<> do_flush() noexcept;
future<> zero_copy_put(net::packet p) noexcept;
future<> zero_copy_split_and_put(net::packet p) noexcept;
@@ -377,18 +389,56 @@ class output_stream final {
public:
using char_type = CharType;
output_stream() noexcept = default;
- output_stream(data_sink fd, size_t size, output_stream_options opts = {}) noexcept
- : _fd(std::move(fd)), _size(size), _trim_to_size(opts.trim_to_size), _batch_flushes(opts.batch_flushes) {}
+ output_stream(data_sink fd, size_t size, output_stream_options opts = {}, std::unique_ptr<batch_flush_context> ctx = {}) noexcept
+ : _fd(std::move(fd)), _size(size), _trim_to_size(opts.trim_to_size), _batch_flushes(std::move(ctx))
+ {
+ if (_batch_flushes) {
+ _batch_flushes->stream = this;
+ }
+ }
[[deprecated("use output_stream_options instead of booleans")]]
output_stream(data_sink fd, size_t size, bool trim_to_size, bool batch_flushes = false) noexcept
- : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size), _batch_flushes(batch_flushes) {}
+ : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size) {}
output_stream(data_sink fd) noexcept
: _fd(std::move(fd)), _size(_fd.buffer_size()), _trim_to_size(true) {}
- output_stream(output_stream&&) noexcept = default;
- output_stream& operator=(output_stream&&) noexcept = default;
+ output_stream(output_stream&& o) noexcept
+ : _fd(std::move(o._fd))
+ , _buf(std::move(o._buf))
+ , _zc_bufs(std::move(o._zc_bufs))
+ , _size(std::exchange(o._size, 0))
+ , _begin(std::exchange(o._begin, 0))
+ , _end(std::exchange(o._end, 0))
+ , _trim_to_size(o._trim_to_size)
+ , _flush(o._flush)
+ , _flushing(o._flushing)
+ , _batch_flushes(std::move(o._batch_flushes))
+ {
+ if (_batch_flushes) {
+ _batch_flushes->stream = this;
+ }
+ }
+ output_stream& operator=(output_stream&& o) noexcept {
+ if (this != &o) {
+ _fd = std::move(o._fd);
+ _buf = std::move(o._buf);
+ _zc_bufs = std::move(o._zc_bufs);
+ _size = std::exchange(o._size, 0);
+ _begin = std::exchange(o._begin, 0);
+ _end = std::exchange(o._end, 0);
+ _trim_to_size = o._trim_to_size;
+ _flush = o._flush;
+ _flushing = o._flushing;
+ _batch_flushes = std::move(o._batch_flushes);
+ if (_batch_flushes) {
+ _batch_flushes->stream = this;
+ }
+ }
+ return *this;
+ }
+
~output_stream() {
if (_batch_flushes) {
- assert(!_in_batch && "Was this stream properly closed?");
+ assert(!_batch_flushes->in_batch && "Was this stream properly closed?");
} else {
assert(!_end && !_zc_bufs && "Was this stream properly closed?");
}
@@ -423,9 +473,10 @@ class output_stream final {
/// \returns the data_sink
data_sink detach() &&;

- using batch_flush_list_t = bi::slist<output_stream,
+ friend void add_to_flush_poller(output_stream<char>& os) noexcept;
+ using batch_flush_list_t = bi::slist<batch_flush_context,
bi::constant_time_size<false>, bi::cache_last<true>,
- bi::member_hook<output_stream, bi::slist_member_hook<>, &output_stream::_in_poller>>;
+ bi::member_hook<batch_flush_context, bi::slist_member_hook<>, &batch_flush_context::in_poller>>;
private:
friend class reactor;
};
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 100eb80f..1fbe5c52 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -2364,7 +2364,7 @@ reactor::flush_tcp_batches() {
while (!_flush_batching.empty()) {
auto& os = _flush_batching.front();
_flush_batching.pop_front();
- os.poll_flush();
+ os.poll();
}
return work;
}
@@ -4331,7 +4331,7 @@ future<> later() noexcept {
}

void add_to_flush_poller(output_stream<char>& os) noexcept {
- engine()._flush_batching.push_back(os);
+ engine()._flush_batching.push_back(*os._batch_flushes);
}

steady_clock_type::duration reactor::total_idle_time() {
diff --git a/src/net/stack.cc b/src/net/stack.cc
index 1e7bc88a..7a179795 100644
--- a/src/net/stack.cc
+++ b/src/net/stack.cc
@@ -102,9 +102,8 @@ input_stream<char> connected_socket::input(connected_socket_input_stream_config

output_stream<char> connected_socket::output(size_t buffer_size) {
output_stream_options opts;
- opts.batch_flushes = true;
// TODO: allow user to determine buffer size etc
- return output_stream<char>(_csi->sink(), buffer_size, opts);
+ return output_stream<char>(_csi->sink(), buffer_size, opts, std::make_unique<output_stream<char>::batch_flush_context>());
}

void connected_socket::set_nodelay(bool nodelay) {
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:28 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
When the client stops it shouldn't leave its output stream unclosed.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
tests/unit/socket_test.cc | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/tests/unit/socket_test.cc b/tests/unit/socket_test.cc
index 2f9ad2fb..10c1e05a 100644
--- a/tests/unit/socket_test.cc
+++ b/tests/unit/socket_test.cc
@@ -87,7 +87,9 @@ SEASTAR_TEST_CASE(socket_skip_test) {
abort_source as;
auto client = async([&as] {
connected_socket socket = connect(ipv4_addr("127.0.0.1", 1234)).get();
- socket.output().write("abc").get();
+ auto out = socket.output();
+ out.write("abc").get();
+ out.close().get();
socket.shutdown_output();
try {
sleep_abortable(std::chrono::seconds(10), as).get();
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:29 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
At the end of this helper the server is stopped. If the client resolves with
an exception the server's stop() is not called and the http_server's tasks
gate is destroyed without being closed.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
tests/unit/httpd_test.cc | 2 ++
1 file changed, 2 insertions(+)

diff --git a/tests/unit/httpd_test.cc b/tests/unit/httpd_test.cc
index 664ab59c..60f246be 100644
--- a/tests/unit/httpd_test.cc
+++ b/tests/unit/httpd_test.cc
@@ -952,6 +952,8 @@ future<> check_http_reply (std::vector<sstring>&& req_parts, std::vector<std::st

input.close().get();
output.close().get();
+ }).handle_exception([] (std::exception_ptr e) {
+ // ignore
});

server._routes.put(GET, "/test", handl);
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:29 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
When output_stream::flush() is called with batch flushes ON, the flush()
itself resolves immediately with a ready future. The real flush happens
later, when a dedicated poller gets to it. If that real flush resolves
with an exception the poller just keeps the exception on the stream and
does nothing more.

The described behavior can deadlock if the user operates in the

while (!aborted()) {
    co_await ostream.send()
    co_await ostream.flush()
    co_await istream.recv()
    co_await handle()
}

manner. Even if the ostream's real flush fails some time later, the loop
is not aware of it, as it's sitting on the other stream's recv().

This patch adds a callback to the batch-flush context that's called when
a flush fails in the background. The callback is to be provided by the
stream creator; if it isn't, batch-flushes are left OFF. This slightly
changes the API -- all current callers of connected_socket::output() now
get synchronous flushing.

refs: #1150

(indentation is deliberately left broken)

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/iostream-impl.hh | 16 ++--------------
include/seastar/core/iostream.hh | 6 +++++-
include/seastar/net/api.hh | 8 ++++++++
src/net/stack.cc | 8 +++++++-
4 files changed, 22 insertions(+), 16 deletions(-)

diff --git a/include/seastar/core/iostream-impl.hh b/include/seastar/core/iostream-impl.hh
index 4894697f..46de5e80 100644
--- a/include/seastar/core/iostream-impl.hh
+++ b/include/seastar/core/iostream-impl.hh
@@ -434,16 +434,11 @@ output_stream<CharType>::flush() noexcept {
if (!_batch_flushes) {
return do_flush();
} else {
- if (_batch_flushes->ex) {
- // flush is a good time to deliver outstanding errors
- return make_exception_future<>(std::move(_batch_flushes->ex));
- } else {
_flush = true;
if (!_batch_flushes->in_batch) {
add_to_flush_poller(*this);
_batch_flushes->in_batch = promise<>();
}
- }
}
return make_ready_future<>();
}
@@ -481,10 +476,8 @@ void output_stream<CharType>::batch_flush_context::poll() noexcept {

// FIXME: future is discarded
(void)stream->do_flush().then_wrapped([this] (future<> f) {
- try {
- f.get();
- } catch (...) {
- ex = std::current_exception();
+ if (f.failed()) {
+ on_error(f.get_exception());
}
// if flush() was called while flushing flush once more
poll();
@@ -500,11 +493,6 @@ output_stream<CharType>::close() noexcept {
} else {
return make_ready_future();
}
- }).then([this] {
- // report final exception as close error
- if (_batch_flushes && _batch_flushes->ex) {
- std::rethrow_exception(_batch_flushes->ex);
- }
}).finally([this] {
return _fd.close();
});
diff --git a/include/seastar/core/iostream.hh b/include/seastar/core/iostream.hh
index 89103251..7400ef78 100644
--- a/include/seastar/core/iostream.hh
+++ b/include/seastar/core/iostream.hh
@@ -40,6 +40,7 @@
#include <seastar/core/temporary_buffer.hh>
#include <seastar/core/scattered_message.hh>
#include <seastar/util/std-compat.hh>
+#include <seastar/util/noncopyable_function.hh>

namespace bi = boost::intrusive;

@@ -357,10 +358,13 @@ class output_stream final {
public:
struct batch_flush_context {
std::optional<promise<>> in_batch;
- std::exception_ptr ex;
+ noncopyable_function<void(std::exception_ptr) noexcept> on_error;
bi::slist_member_hook<> in_poller;
output_stream* stream = nullptr;
void poll() noexcept;
+ explicit batch_flush_context(noncopyable_function<void(std::exception_ptr) noexcept> fn) noexcept
+ : on_error(std::move(fn))
+ {}
};

private:
diff --git a/include/seastar/net/api.hh b/include/seastar/net/api.hh
index 666b58fc..75d9b0ce 100644
--- a/include/seastar/net/api.hh
+++ b/include/seastar/net/api.hh
@@ -194,6 +194,14 @@ class connected_socket {
/// Gets an object that sends data to the remote endpoint.
/// \param buffer_size how much data to buffer
output_stream<char> output(size_t buffer_size = 8192);
+
+ /// Gets the output stream.
+ ///
+ /// Gets an object that sends data to the remote endpoint.
+ /// \param on_batch_flush_error the batch-flush error callback
+ /// \param buffer_size how much data to buffer
+ output_stream<char> output(noncopyable_function<void(std::exception_ptr) noexcept> on_batch_flush_error, size_t buffer_size = 8192);
+
/// Sets the TCP_NODELAY option (disabling Nagle's algorithm)
void set_nodelay(bool nodelay);
/// Gets the TCP_NODELAY option (Nagle's algorithm)
diff --git a/src/net/stack.cc b/src/net/stack.cc
index 7a179795..3518de93 100644
--- a/src/net/stack.cc
+++ b/src/net/stack.cc
@@ -103,7 +103,13 @@ input_stream<char> connected_socket::input(connected_socket_input_stream_config
output_stream<char> connected_socket::output(size_t buffer_size) {
output_stream_options opts;
// TODO: allow user to determine buffer size etc
- return output_stream<char>(_csi->sink(), buffer_size, opts, std::make_unique<output_stream<char>::batch_flush_context>());
+ return output_stream<char>(_csi->sink(), buffer_size, opts);
+}
+
+output_stream<char> connected_socket::output(noncopyable_function<void(std::exception_ptr) noexcept> on_batch_flush_error, size_t buffer_size) {
+ output_stream_options opts;
+ auto flush = std::make_unique<output_stream<char>::batch_flush_context>(std::move(on_batch_flush_error));
+ return output_stream<char>(_csi->sink(), buffer_size, opts, std::move(flush));

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:30 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/iostream-impl.hh | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/include/seastar/core/iostream-impl.hh b/include/seastar/core/iostream-impl.hh
index 46de5e80..1d4b778a 100644
--- a/include/seastar/core/iostream-impl.hh
+++ b/include/seastar/core/iostream-impl.hh
@@ -434,11 +434,11 @@ output_stream<CharType>::flush() noexcept {
if (!_batch_flushes) {
return do_flush();
} else {
- _flush = true;
- if (!_batch_flushes->in_batch) {
- add_to_flush_poller(*this);
- _batch_flushes->in_batch = promise<>();
- }
+ _flush = true;
+ if (!_batch_flushes->in_batch) {
+ add_to_flush_poller(*this);
+ _batch_flushes->in_batch = promise<>();

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:31 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
When a background flush fails the connection should be aborted (and the
error logged).

fixes: #1150

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
src/rpc/rpc.cc | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc
index 75bab545..736914eb 100644
--- a/src/rpc/rpc.cc
+++ b/src/rpc/rpc.cc
@@ -203,7 +203,10 @@ namespace rpc {
}
_fd = std::move(fd);
_read_buf =_fd.input();
- _write_buf = _fd.output();
+ _write_buf = _fd.output([this] (std::exception_ptr eptr) noexcept {
+ log_exception(*this, log_level::error, "batch flush error, aborting connection", std::move(eptr));
+ abort();
+ });
_connected = true;
}

--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:32 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
If we fail to send for any reason we mark the connection as failed.
We also need to wake up the receiver side so that it sees the
connection should be closed.

fixes: #1146

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
src/rpc/rpc.cc | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc
index 736914eb..4e492641 100644
--- a/src/rpc/rpc.cc
+++ b/src/rpc/rpc.cc
@@ -303,8 +303,10 @@ namespace rpc {

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:33 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
This test runs a simple client->server loop of 4 synchronous calls and
tries to inject one-shot glitches at increasing limits. Once all stages
pass, the test passes. The current expectation is that the connection
won't hang.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---

Gleb Natapov

<gleb@scylladb.com>
Sep 6, 2022, 5:47:42 AM
to Pavel Emelyanov, seastar-dev@googlegroups.com
On Tue, Sep 06, 2022 at 11:41:13AM +0300, 'Pavel Emelyanov' via seastar-dev wrote:
> When output_strea::flush() is called with batch flushes ON the flush()
> itself resolves immediately with ready future. The flush itself happens
> later when a dedicated poller gets to it. If that real flush resolves
> with exception the poller just keeps the exception on the stream and
> does nothing more.
>
> The described behavior can dead-lock if the user operates in the
>
> while (!aborted()) {
> co_await ostream.send()
> co_await ostream.flush()
> co_await istream.recv()
> co_await handle()
> }
>
> manner. Even if ostream real flush fails some time later the whole loop
> is not aware of it as it's sitting in another stream.
>
> This patch adds a callback to batch flush context that's called when
> flush fails in the background. The callback is to be provided by the
> stream creator, if not the batch-flushes are left OFF. This slightly
> changes the API -- all current callers of connected_socket::output() now
> get the synchronous flushing.
>
Would the breakage be less intrusive if, instead of the callback, we
introduced a notification mechanism:

future<std::exception_ptr> output_stream::get_flush_error();

?

--
Gleb.

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 6:00:42 AM
to Gleb Natapov, seastar-dev@googlegroups.com


On 06.09.2022 12:47, Gleb Natapov wrote:
> On Tue, Sep 06, 2022 at 11:41:13AM +0300, 'Pavel Emelyanov' via seastar-dev wrote:
>> When output_strea::flush() is called with batch flushes ON the flush()
>> itself resolves immediately with ready future. The flush itself happens
>> later when a dedicated poller gets to it. If that real flush resolves
>> with exception the poller just keeps the exception on the stream and
>> does nothing more.
>>
>> The described behavior can dead-lock if the user operates in the
>>
>> while (!aborted()) {
>> co_await ostream.send()
>> co_await ostream.flush()
>> co_await istream.recv()
>> co_await handle()
>> }
>>
>> manner. Even if ostream real flush fails some time later the whole loop
>> is not aware of it as it's sitting in another stream.
>>
>> This patch adds a callback to batch flush context that's called when
>> flush fails in the background. The callback is to be provided by the
>> stream creator, if not the batch-flushes are left OFF. This slightly
>> changes the API -- all current callers of connected_socket::output() now
>> get the synchronous flushing.
>>
> Would the breakage less intrusive if instead of the callback we will
> introduce notification mechanism:
>
> future<std::exception_ptr> output_stream::get_flush_error();
>
> ?

It somewhat exists already -- if you call flush() a second time it will return
the exception it hit previously. But I don't see how it helps the mentioned loop;
some code still needs to call this get_flush_error(), and what code should that be?

Avi Kivity

<avi@scylladb.com>
Sep 6, 2022, 6:05:39 AM
to Pavel Emelyanov, seastar-dev@googlegroups.com

On 06/09/2022 11.41, 'Pavel Emelyanov' via seastar-dev wrote:
> There are several fields on output_stream that control the behavior of
> batch flushing. This set is only used by some sockets, files don't set
> the batch_flushes option and thus all this memory is wasted. Next
> patches will add more stuff to support batch-flushing, so this waste is
> going to increase.
>
> This patch moves most of the batch flushing control bits into a separate
> object owned by unique_ptr. Since stream creation is noexcept, its
> constructor cannot make the new object by itself and should get it from
> the caller. Because of this the output_stream_options.batch_flushes can
> be marked as deprecated right at once.
> diff --git a/include/seastar/core/iostream.hh b/include/seastar/core/iostream.hh
> index 59de8545..89103251 100644
> --- a/include/seastar/core/iostream.hh
> +++ b/include/seastar/core/iostream.hh
> @@ -332,7 +332,13 @@ class input_stream final {
> struct output_stream_options {
> bool trim_to_size = false; ///< Make sure that buffers put into sink haven't
> ///< grown larger than the configured size
> - bool batch_flushes = false; ///< Try to merge flushes with each other
> + [[deprecated("use batch_flush_context")]]
> + bool batch_flushes; ///< Try to merge flushes with each other
> +
> +#pragma GCC diagnostic push
> +#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
> + constexpr output_stream_options() noexcept : batch_flushes(false) { }
> +#pragma GCC diagnostic pop


Can we keep the initializer and use pragmas to suppress any warnings on
the initializer instead?


Adding a constructor can make the autogenerated constructors not work.
Better to keep this a struct.


btw, the whole batching thing is meant to be internal to socket->stream
and not exposed to the user, it's an API mistake. Let's put
batch_flush_context in an internal namespace so people aren't tempted to
play with it.
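
A rough sketch of what that could look like (an assumption about the
direction, not code from the posted series):

    #include <exception>
    #include <optional>
    #include <boost/intrusive/slist_hook.hpp>
    #include <seastar/core/future.hh>

    namespace seastar::internal {

    // Batching machinery kept out of the public API surface; output_stream
    // and the reactor would use it, but users are not expected to touch it.
    struct batch_flush_context {
        std::optional<seastar::promise<>> in_batch;
        std::exception_ptr ex;
        boost::intrusive::slist_member_hook<> in_poller;
    };

    } // namespace seastar::internal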



Gleb Natapov

<gleb@scylladb.com>
Sep 6, 2022, 6:11:02 AM
to Pavel Emelyanov, seastar-dev@googlegroups.com
After output_stream creation a user calls:

os.get_flush_error().then([this] (std::exception_ptr e) {
    log(e);
    abort();
});

This runs in the background and is called when a background error happens.

This is not much different from this patch, just a subscriber model
instead of a callback. I do not insist, it's just a different interface to
consider.
--
Gleb.

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 6:12:16 AM
to Avi Kivity, seastar-dev@googlegroups.com
I'm not sure I get this idea. The problem is that many places that declare an
output_stream_options local variable emit the deprecation warning, e.g. like this:

/home/xemul/src/seastar/src/net/stack.cc:104:27: warning: synthesized method «constexpr seastar::output_stream_options::output_stream_options()» first required here
104 | output_stream_options opts;

Even though they don't touch this bit in this line of code.

Avi Kivity

<avi@scylladb.com>
Sep 6, 2022, 6:14:57 AM
to Pavel Emelyanov, seastar-dev@googlegroups.com

On 06/09/2022 11.41, 'Pavel Emelyanov' via seastar-dev wrote:
> When output_strea::flush() is called with batch flushes ON the flush()
> itself resolves immediately with ready future. The flush itself happens
> later when a dedicated poller gets to it. If that real flush resolves
> with exception the poller just keeps the exception on the stream and
> does nothing more.
>
> The described behavior can dead-lock if the user operates in the
>
> while (!aborted()) {
> co_await ostream.send()
> co_await ostream.flush()
> co_await istream.recv()
> co_await handle()
> }
>
> manner. Even if ostream real flush fails some time later the whole loop
> is not aware of it as it's sitting in another stream.
>
> This patch adds a callback to batch flush context that's called when
> flush fails in the background. The callback is to be provided by the
> stream creator, if not the batch-flushes are left OFF. This slightly
> changes the API -- all current callers of connected_socket::output() now
> get the synchronous flushing.


The behavior of flush() matches sendmsg(2). sendmsg(2) can succeed but
the actual sending can fail later (for example:


user: sendmsg()

kernel: copies data

remote: resets connection

kernel: drops data, marks next read as EPIPE?)


So how does the code example above behave with sendmsg/recvmsg? Will the
error encountered by sendmsg(2) be propagated to recvmsg(2)? EPIPE?


If so, let's mimic it. I want to avoid expanding the API surface and
keep programming simple, it's already not easy. More ways to report
errors make it harder.

Avi Kivity

<avi@scylladb.com>
Sep 6, 2022, 6:25:34 AM
to Pavel Emelyanov, seastar-dev@googlegroups.com
I assumed it was the initializer causing problems, but no.


> The problem is that many placec that declare output_stream_options
> local variable emit the deprecation warning, e.g. like this
>
> /home/xemul/src/seastar/src/net/stack.cc:104:27: warning: synthesized
> method «constexpr
> seastar::output_stream_options::output_stream_options()» first
> required here
>   104 |     output_stream_options opts;
>
> Even though they don't touch this bit in this line of code.
>

I see. IMO it's a bug that the synthetic constructor issues the warning,
it should be only on user access (what about the destructor, doesn't it
also touch the deprecated variable?). I'll file bugs.


Meanwhile I don't see a better solution, so let's keep yours.

Avi Kivity

<avi@scylladb.com>
Sep 6, 2022, 6:27:43 AM
to Pavel Emelyanov, seastar-dev@googlegroups.com

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 6:42:45 AM
to Avi Kivity, seastar-dev@googlegroups.com
Note that this is because there's one API "endpoint" through which the user
does both -- read and write. In our case the problematic "endpoint" is the
output_stream, which is neither supposed to be read from, nor does it have any
legal link to anything that can be shut down for read. I say "legal" because
technically the stream has access to the pollable_fd which, in turn, can be
shut down for read, but that's weird.

> So how does the code example above behave with sendmsg/recvmsg? Will the error encountered by sendmsg(2) be propagated to recvmsg(2)? EPIPE?

Yes, that's exactly what happens -- when batch flush hits an error it calls
on_error(); in the case of RPC (and after this set it's only RPC that uses
batch flush) this results in connection::abort(), which shuts down its socket
for reading.

Avi Kivity

<avi@scylladb.com>
Sep 6, 2022, 7:25:09 AM
to Pavel Emelyanov, seastar-dev@googlegroups.com
We can share state between the data_source and data_sink.


But what kind of failures do we want to handle here? Most will be TCP
failures (and so will be communicated via the kernel naturally). What else,
bad_alloc?


>
>> So how does the code example above behave with sendmsg/recvmsg? Will
>> the error encountered by sendmsg(2) be propagated to recvmsg(2)? EPIPE?
>
> Yes, that's exactly what happens -- when batch flush hits an error it
> calls
> the on_error(), in case of RPC (and after this set it's only RPC that
> uses
> the batch flush)


That's a regression then. Normal TCP users want it too (like ScyllaDB's
port 9042).


> it results in connection::abort() that shutdowns its socket
> for reading.


With sendmsg/recvmsg there is no on_error(). So what happens there?

Gleb Natapov

<gleb@scylladb.com>
Sep 6, 2022, 7:33:16 AM
to Avi Kivity, Pavel Emelyanov, seastar-dev@googlegroups.com
I do not think those have async errors. If there is an error after the
syscall returns: if the socket is a datagram socket, the message can simply be
dropped; if it is a stream socket, the message is resent until it succeeds.

--
Gleb.

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 7:41:01 AM
to Avi Kivity, seastar-dev@googlegroups.com
Yes

>>
>>> So how does the code example above behave with sendmsg/recvmsg? Will the error encountered by sendmsg(2) be propagated to recvmsg(2)? EPIPE?
>>
>> Yes, that's exactly what happens -- when batch flush hits an error it calls
>> the on_error(), in case of RPC (and after this set it's only RPC that uses
>> the batch flush)
>
>
> That's a regression then. Normal tcp users want it too (like scylladb port 9042).

Flush batching is transparent from the API perspective. It may only affect
performance in some sense. Is it known that batch flushing helps the native transport?

>> it results in connection::abort() that shutdowns its socket
>> for reading.
>
>
> With sendmsg/recvmsg there is no on_error(). So what happens there?

It depends on the socket type. Datagram sockets just drop the message; stream
sockets try to resend until the protocol decides it doesn't work any longer and
aborts the connection. There's the sk_err field on struct sock that's set in such
cases and then checked in key places.

Avi Kivity

<avi@scylladb.com>
Sep 6, 2022, 7:41:04 AM
to Gleb Natapov, Pavel Emelyanov, seastar-dev@googlegroups.com
Or until the connection is reset (and then the error is propagated to
recvmsg).



Gleb Natapov

<gleb@scylladb.com>
Sep 6, 2022, 7:43:29 AM
to Avi Kivity, Pavel Emelyanov, seastar-dev@googlegroups.com
Why do you think that each internal error causes a connection reset?
Suppose a send path fails to allocate. Why would that cause a connection
reset?

--
Gleb.

Avi Kivity

<avi@scylladb.com>
Sep 6, 2022, 8:13:35 AM
to Gleb Natapov, Pavel Emelyanov, seastar-dev@googlegroups.com
I don't think so. That's why I wrote "or".


Gleb Natapov

<gleb@scylladb.com>
Sep 6, 2022, 8:16:56 AM
to Avi Kivity, Pavel Emelyanov, seastar-dev@googlegroups.com
Well, if this is a stream socket and send fails multiple times, eventually
we will get a local error on both the send and receive sides, I guess.

--
Gleb.

Avi Kivity

<avi@scylladb.com>
Sep 6, 2022, 8:18:15 AM
to Pavel Emelyanov, seastar-dev@googlegroups.com
We can elevate bad_alloc to connection failure. We can't really recover
from it.


>
>>>
>>>> So how does the code example above behave with sendmsg/recvmsg?
>>>> Will the error encountered by sendmsg(2) be propagated to
>>>> recvmsg(2)? EPIPE?
>>>
>>> Yes, that's exactly what happens -- when batch flush hits an error
>>> it calls
>>> the on_error(), in case of RPC (and after this set it's only RPC
>>> that uses
>>> the batch flush)
>>
>>
>> That's a regression then. Normal tcp users want it too (like scylladb
>> port 9042).
>
> Flush batching is transparent from the API perspective. It may only
> affect
> performance in some sense. Is it known that batch flushes helps native
> transport?


IIRC batch_flushes was motivated by 9042. Any protocol that multiplexes
transmission benefits from it -- otherwise we'd need a sendmsg per
response. This holds for 9042, RPC, HTTP with pipelining, and newer HTTP
that has multiplexing.


>
>>> it results in connection::abort() that shutdowns its socket
>>> for reading.
>>
>>
>> With sendmsg/recvmsg there is no on_error(). So what happens there?
>
> It depends on the socket type. Datagram sockets just drop the message,
> stream
> try to resend until protocol decides it doesn't work any longer and
> abort the
> connection. There the sk_err thing on struct sock that's set in such
> cases and
> then checked in key places.


Then we can do the same thing. Either via pollable_fd, or via shared
userspace state accessible via data_source/data_sink.


Adding an error handling API just makes it harder for everyone to get it
right.
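
A hedged sketch of the shared-state idea (all names here are hypothetical,
and the real data_source/data_sink plumbing is more involved):

    #include <memory>
    #include <seastar/core/future.hh>
    #include <seastar/core/temporary_buffer.hh>

    // The sink records a background-flush failure into state it shares with
    // the source; the source surfaces it on the next read, mimicking how a
    // kernel-side send error reaches the next recvmsg().
    struct shared_socket_error {
        std::exception_ptr ex;
    };

    class erroring_source {
        std::shared_ptr<shared_socket_error> _err;
    public:
        explicit erroring_source(std::shared_ptr<shared_socket_error> err) : _err(std::move(err)) {}

        seastar::future<seastar::temporary_buffer<char>> get() {
            if (_err->ex) {
                return seastar::make_exception_future<seastar::temporary_buffer<char>>(std::exception_ptr(_err->ex));
            }
            // Normal read path would go here; this sketch just returns EOF.
            return seastar::make_ready_future<seastar::temporary_buffer<char>>();
        }
    };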

Pavel Emelyanov

<xemul@scylladb.com>
Sep 12, 2022, 10:41:00 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
When sending a message via a connected_socket's stream the write/flush
may fail. In this case the connection may get stuck, either because the
receive loop is not woken up, or because batch-flush hides the error
and doesn't report it up the chain.

This set fixes both and includes an error-injection test that makes
sure the connection doesn't get stuck if sending a message by either
side fails.

branch: https://github.com/xemul/seastar/tree/br-rpc-connection-glitches-4
tests: unit(dev)

v4:
- Terminate connection on error by closing it in both directions
- Don't mess with deprecating batch-flush (yet)

v3:
- Make batch-flush call user-provided callback on background flush error
- Turn off batch-flushes for those who don't provide this callback
- Fix tests that mis-use output_stream .flush/.close API

Pavel Emelyanov (4):
rpc: Abort connection if send_entry() fails
output_stream: Handle batch-flush exception synchronously
iostream: Indentation fix after previous patch
rpc_test: Test rpc send glitches

include/seastar/core/iostream-impl.hh | 24 +++--------
include/seastar/core/iostream.hh | 6 ++-
include/seastar/net/posix-stack.hh | 1 +
include/seastar/net/tcp.hh | 6 +++
src/core/reactor.cc | 4 ++
src/net/native-stack-impl.hh | 3 ++
src/net/posix-stack.cc | 8 ++++
src/net/tls.cc | 4 ++
src/rpc/rpc.cc | 6 ++-
tests/unit/loopback_socket.hh | 16 +++++--
tests/unit/rpc_test.cc | 61 +++++++++++++++++++++++++++
11 files changed, 115 insertions(+), 24 deletions(-)

--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Sep 12, 2022, 10:41:01 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
If we fail to send for any reason we mark the connection as failed.
We also need to wake up the receiver side so that it sees the
connection should be closed.

fixes: #1146

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
src/rpc/rpc.cc | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc
index 75bab545..b15635af 100644
--- a/src/rpc/rpc.cc
+++ b/src/rpc/rpc.cc
@@ -300,8 +300,10 @@ namespace rpc {

Pavel Emelyanov

<xemul@scylladb.com>
Sep 12, 2022, 10:41:02 AM