[PATCH v2 0/4] Abort RPC connection on send/flush failures

Pavel Emelyanov

<xemul@scylladb.com>
Aug 25, 2022, 8:37:15 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
When sending a message via a connected_socket's stream, the write or flush
may fail. In this case the connection may get stuck, either because the
receive loop is not woken up or because batch-flush hides the error
and doesn't report it up the chain.

This set fixes both and includes an error-injection test that makes
sure the connection doesn't get stuck if sending a message by either
side fails.

branch: https://github.com/xemul/seastar/tree/br-rpc-connection-glitches-2
tests: unit(dev)

Pavel Emelyanov (4):
rpc: Abort connection if send_entry() fails
output_stream: Handle batch-flush exception synchronously
output_stream: Indentation fix after previous patch
rpc_test: Test rpc send glitches

include/seastar/core/iostream-impl.hh | 24 +++--------
include/seastar/core/iostream.hh | 9 +++-
include/seastar/net/posix-stack.hh | 1 +
src/core/reactor.cc | 4 ++
src/net/native-stack-impl.hh | 3 ++
src/net/posix-stack.cc | 4 ++
src/net/tls.cc | 4 ++
src/rpc/rpc.cc | 6 ++-
tests/unit/loopback_socket.hh | 7 +++
tests/unit/rpc_test.cc | 61 +++++++++++++++++++++++++++
10 files changed, 102 insertions(+), 21 deletions(-)

--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Aug 25, 2022, 8:37:16 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
If we fail to send for any reason, we mark the connection as failed.
We also need to wake up the receiver side so that it sees that the
connection should be closed.
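
The difference between only recording the error and actually aborting can be sketched outside of Seastar. This is a minimal standard-library model, not the rpc code: `toy_connection`, `park_receiver()`, `mark_failed()` and `receiver_waker` are hypothetical names introduced for illustration only.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical model of a connection whose receive loop is parked.
// None of these names exist in Seastar.
struct toy_connection {
    bool error = false;
    std::function<void()> receiver_waker;  // set while the receiver is waiting

    // The receive loop registers a waker before it goes to sleep.
    void park_receiver(std::function<void()> waker) {
        assert(!receiver_waker && "this sketch supports one parked receiver");
        receiver_waker = std::move(waker);
    }

    // Old behaviour in this model: the failure is recorded, nobody is woken.
    void mark_failed() noexcept { error = true; }

    // New behaviour in this model: record the failure and wake the receiver
    // so that it can observe the error and close the connection.
    void abort() noexcept {
        error = true;
        if (receiver_waker) {
            auto wake = std::exchange(receiver_waker, nullptr);
            wake();
        }
    }
};
```

In this model a receiver parked before `mark_failed()` stays parked forever, while `abort()` runs its waker exactly once.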

fixes: #1146

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
src/rpc/rpc.cc | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc
index 75bab545..b15635af 100644
--- a/src/rpc/rpc.cc
+++ b/src/rpc/rpc.cc
@@ -300,8 +300,10 @@ namespace rpc {

p->uncancellable();
return send_entry(*p).then_wrapped([this, p = std::move(p)] (auto f) mutable {
- _error |= f.failed();
- f.ignore_ready_future();
+ if (f.failed()) {
+ f.ignore_ready_future();
+ abort();
+ }
p->done.set_value();
});
});
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Aug 25, 2022, 8:37:17 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
When output_stream::flush() is called with .batch_flushes on, the flush()
itself resolves immediately with a ready future. The real flush happens
later, when a dedicated poller gets to it. If that real flush resolves
with an exception, the poller just keeps the exception on the stream and
does nothing more.

The described behavior can deadlock if the user operates in the

    while (!aborted()) {
        co_await ostream.send()
        co_await ostream.flush()
        co_await istream.recv()
        co_await handle()
    }

manner. Even if the real ostream flush fails some time later, the loop
is not aware of it because it is waiting on another stream.

The proposed fix is to make the batch-flush poller let the stream's sink
handle the background flush exception itself. By default (e.g. for files,
which don't use batch flushing) the handler is never expected to be called.
Streams created by sockets need to abort the connection the way they prefer.
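
The control flow described above can be modelled with the standard library alone. This is a rough sketch, not the Seastar implementation: `toy_sink`, `toy_poller`, `request_flush()` and the `fail_flush` knob are hypothetical, and only the name `abort_write()` mirrors the method added by this patch.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for a data sink; not Seastar's data_sink_impl.
struct toy_sink {
    bool fail_flush = false;     // error-injection knob for the sketch
    bool write_aborted = false;  // set once the sink was told to abort

    void flush() {
        if (fail_flush) {
            throw std::runtime_error("flush failed");
        }
    }

    // Called by the poller when a deferred flush throws.
    void abort_write(std::exception_ptr) noexcept { write_aborted = true; }
};

// Hypothetical stand-in for the batch-flush poller.
struct toy_poller {
    std::vector<toy_sink*> pending;

    // What the caller sees as flush(): the work is only queued, so this
    // always "succeeds" regardless of what the real flush does later.
    void request_flush(toy_sink& s) { pending.push_back(&s); }

    // The real flush runs here; a failure is forwarded to the sink
    // instead of being stored and forgotten.
    void poll() noexcept {
        for (toy_sink* s : pending) {
            assert(s != nullptr);
            try {
                s->flush();
            } catch (...) {
                s->abort_write(std::current_exception());
            }
        }
        pending.clear();
    }
};
```

In this model the caller of `request_flush()` never sees the failure directly; it only becomes visible through whatever `abort_write()` does to the connection.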

fixes: #1150

(indentation is left broken)

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/iostream-impl.hh | 16 ++--------------
include/seastar/core/iostream.hh | 9 ++++++++-
include/seastar/net/posix-stack.hh | 1 +
src/core/reactor.cc | 4 ++++
src/net/native-stack-impl.hh | 3 +++
src/net/posix-stack.cc | 4 ++++
src/net/tls.cc | 4 ++++
tests/unit/loopback_socket.hh | 7 +++++++
8 files changed, 33 insertions(+), 15 deletions(-)

diff --git a/include/seastar/core/iostream-impl.hh b/include/seastar/core/iostream-impl.hh
index 8b9baa4e..b62ca891 100644
--- a/include/seastar/core/iostream-impl.hh
+++ b/include/seastar/core/iostream-impl.hh
@@ -426,16 +426,11 @@ output_stream<CharType>::flush() noexcept {
});
}
} else {
- if (_ex) {
- // flush is a good time to deliver outstanding errors
- return make_exception_future<>(std::move(_ex));
- } else {
_flush = true;
if (!_in_batch) {
add_to_flush_poller(*this);
_in_batch = promise<>();
}
- }
}
return make_ready_future<>();
}
@@ -483,10 +478,8 @@ output_stream<CharType>::poll_flush() noexcept {
(void)f.then([this] {
return _fd.flush();
}).then_wrapped([this] (future<> f) {
- try {
- f.get();
- } catch (...) {
- _ex = std::current_exception();
+ if (f.failed()) {
+ _fd.on_batch_flush_error(f.get_exception());
}
// if flush() was called while flushing flush once more
poll_flush();
@@ -502,11 +495,6 @@ output_stream<CharType>::close() noexcept {
} else {
return make_ready_future();
}
- }).then([this] {
- // report final exception as close error
- if (_ex) {
- std::rethrow_exception(_ex);
- }
}).finally([this] {
return _fd.close();
});
diff --git a/include/seastar/core/iostream.hh b/include/seastar/core/iostream.hh
index 233b93fd..25e6845c 100644
--- a/include/seastar/core/iostream.hh
+++ b/include/seastar/core/iostream.hh
@@ -113,6 +113,10 @@ class data_sink_impl {
}
virtual future<> close() = 0;

+ // Emergency close. When batch flush fails "in the background" it
+ // only has the data_sink at hand to forward the failure to
+ virtual void abort_write(std::exception_ptr eptr) noexcept;
+
// The method should return the maximum buffer size that's acceptable by
// the sink. It's used when the output stream is constructed without any
// specific buffer size. In this case the stream accepts this value as its
@@ -169,6 +173,10 @@ class data_sink {
}
}

+ void on_batch_flush_error(std::exception_ptr eptr) noexcept {
+ _dsi->abort_write(std::move(eptr));
+ }
+
size_t buffer_size() const noexcept { return _dsi->buffer_size(); }
};

@@ -360,7 +368,6 @@ class output_stream final {
std::optional<promise<>> _in_batch;
bool _flush = false;
bool _flushing = false;
- std::exception_ptr _ex;
bi::slist_member_hook<> _in_poller;

private:
diff --git a/include/seastar/net/posix-stack.hh b/include/seastar/net/posix-stack.hh
index e76d7aff..46044b43 100644
--- a/include/seastar/net/posix-stack.hh
+++ b/include/seastar/net/posix-stack.hh
@@ -128,6 +128,7 @@ class posix_data_sink_impl : public data_sink_impl {
future<> put(packet p) override;
future<> put(temporary_buffer<char> buf) override;
future<> close() override;
+ void abort_write(std::exception_ptr eptr) noexcept override;
};

class posix_ap_server_socket_impl : public server_socket_impl {
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 100eb80f..2a42ceb1 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -2452,6 +2452,10 @@ class reactor::signal_pollfn final : public reactor::pollfn {
}
};

+void data_sink_impl::abort_write(std::exception_ptr eptr) noexcept {
+ on_internal_error_noexcept(seastar_logger, format("batch-flush reports error for not applicable data_sink: {}", eptr));
+}
+
class reactor::batch_flush_pollfn final : public simple_pollfn<true> {
reactor& _r;
public:
diff --git a/src/net/native-stack-impl.hh b/src/net/native-stack-impl.hh
index e5972958..023c56a0 100644
--- a/src/net/native-stack-impl.hh
+++ b/src/net/native-stack-impl.hh
@@ -195,6 +195,9 @@ class native_connected_socket_impl<Protocol>::native_data_sink_impl final
_conn->close_write();
return make_ready_future<>();
}
+ virtual void abort_write(std::exception_ptr) noexcept override {
+ _conn->close_write();
+ }
};

template <typename Protocol>
diff --git a/src/net/posix-stack.cc b/src/net/posix-stack.cc
index 7a55b9fb..90bac6da 100644
--- a/src/net/posix-stack.cc
+++ b/src/net/posix-stack.cc
@@ -648,6 +648,10 @@ posix_data_sink_impl::close() {
return make_ready_future<>();
}

+void posix_data_sink_impl::abort_write(std::exception_ptr) noexcept {
+ shutdown_socket_fd(_fd, SHUT_WR);
+}
+
posix_network_stack::posix_network_stack(const program_options::option_group& opts, std::pmr::polymorphic_allocator<char>* allocator)
: _reuseport(engine().posix_reuseport_available()), _allocator(allocator) {
}
diff --git a/src/net/tls.cc b/src/net/tls.cc
index 53da2cee..feef9339 100644
--- a/src/net/tls.cc
+++ b/src/net/tls.cc
@@ -1670,6 +1670,10 @@ class tls_connected_socket_impl::sink_impl: public data_sink_impl, public sessio
_session->close();
return make_ready_future<>();
}
+
+ virtual void abort_write(std::exception_ptr) noexcept override {
+ _session->close();
+ }
};

class server_session : public net::server_socket_impl {
diff --git a/tests/unit/loopback_socket.hh b/tests/unit/loopback_socket.hh
index 0e164e77..e4e3f1b7 100644
--- a/tests/unit/loopback_socket.hh
+++ b/tests/unit/loopback_socket.hh
@@ -118,6 +118,13 @@ class loopback_data_sink_impl : public data_sink_impl {
});
});
}
+
+ void abort_write(std::exception_ptr eptr) noexcept override {
+ // Mimic the shutdown(SHUT_WR)
+ (void)smp::submit_to(_buffer.get_owner_shard(), [this] {
+ _buffer->shutdown();
+ });
+ }
};

class loopback_data_source_impl : public data_source_impl {
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Aug 25, 2022, 8:37:18 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/iostream-impl.hh | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/include/seastar/core/iostream-impl.hh b/include/seastar/core/iostream-impl.hh
index b62ca891..b9a18f7e 100644
--- a/include/seastar/core/iostream-impl.hh
+++ b/include/seastar/core/iostream-impl.hh
@@ -426,11 +426,11 @@ output_stream<CharType>::flush() noexcept {
});
}
} else {
- _flush = true;
- if (!_in_batch) {
- add_to_flush_poller(*this);
- _in_batch = promise<>();
- }
+ _flush = true;
+ if (!_in_batch) {
+ add_to_flush_poller(*this);
+ _in_batch = promise<>();
+ }
}
return make_ready_future<>();
}
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Aug 25, 2022, 8:37:19 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
This test runs a simple client->server loop of 4 synchronous calls and
tries to inject one-shot glitches at increasing limits. Once all stages
pass, the test passes. The current expectation is that the connection
won't hang.
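
The "increasing limits" strategy can be illustrated with a simplified model. The sketch below assumes one send per call, which is not true of the real RPC exchange, and `one_shot_injector`, `run_stage()` and `find_clean_limit()` are hypothetical helpers rather than parts of the test suite.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical one-shot injector: the send with index `limit` (0-based)
// fails exactly once; every other send succeeds.
struct one_shot_injector {
    int limit;
    int sends = 0;
    bool fired = false;

    void on_send() {
        if (!fired && sends++ == limit) {
            fired = true;
            throw std::runtime_error("injected send glitch");
        }
    }
};

// Runs `calls` sends against a fresh injector.
// Returns true if the whole stage completed without a failure.
inline bool run_stage(int limit, int calls) {
    one_shot_injector inj{limit};
    for (int i = 0; i < calls; i++) {
        try {
            inj.on_send();
        } catch (const std::exception&) {
            return false;
        }
    }
    return true;
}

// Raises the limit until a stage sees no failure at all, which means the
// glitch was tried at every point the stage can reach.
inline int find_clean_limit(int calls) {
    assert(calls >= 0);
    int limit = 0;
    while (!run_stage(limit, calls)) {
        limit++;
    }
    return limit;
}
```

The outer loop terminates as soon as the injection point lies beyond the last send of a stage, which is the same stop condition the test uses through `ctx.no_failures`.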

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
tests/unit/rpc_test.cc | 61 ++++++++++++++++++++++++++++++++++++++++++
1 file changed, 61 insertions(+)

diff --git a/tests/unit/rpc_test.cc b/tests/unit/rpc_test.cc
index 617ddcf2..058de2ea 100644
--- a/tests/unit/rpc_test.cc
+++ b/tests/unit/rpc_test.cc
@@ -36,6 +36,7 @@
#include <seastar/util/log.hh>
#include <seastar/util/closeable.hh>
#include <seastar/util/noncopyable_function.hh>
+#include <seastar/util/later.hh>

using namespace seastar;

@@ -663,6 +664,66 @@ SEASTAR_TEST_CASE(test_stream_negotiation_error) {
});
}

+static future<> test_rpc_connection_send_glitch(bool on_client) {
+ struct context {
+ int limit;
+ bool no_failures;
+ bool on_client;
+ };
+
+ return do_with(context{0, false, on_client}, [] (auto& ctx) {
+ return do_until([&ctx] {
+ return ctx.no_failures;
+ }, [&ctx] {
+ fmt::print("======== 8< ========\n");
+ fmt::print(" Checking {} limit\n", ctx.limit);
+ rpc_test_config cfg;
+ rpc_loopback_error_injector::config ecfg;
+ if (ctx.on_client) {
+ ecfg.client_snd.limit = ctx.limit;
+ ecfg.client_snd.kind = loopback_error_injector::error::one_shot;
+ } else {
+ ecfg.server_snd.limit = ctx.limit;
+ ecfg.server_snd.kind = loopback_error_injector::error::one_shot;
+ }
+ cfg.inject_error = ecfg;
+ return rpc_test_env<>::do_with_thread(cfg, [&ctx] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
+ env.register_handler(1, [] {
+ fmt::print(" reply\n");
+ return seastar::sleep(std::chrono::milliseconds(100)).then([] {
+ return make_ready_future<unsigned>(13);
+ });
+ }).get();
+
+ ctx.no_failures = true;
+
+ for (int i = 0; i < 4; i++) {
+ auto call = env.proto().make_client<unsigned ()>(1);
+ fmt::print(" call {}\n", i);
+ try {
+ auto id = call(c1).get0();
+ fmt::print(" response: {}\n", id);
+ } catch (...) {
+ fmt::print(" responce: ex {}\n", std::current_exception());
+ ctx.no_failures = false;
+ ctx.limit++;
+ break;
+ }
+ seastar::yield().get();
+ }
+ });
+ });
+ });
+}
+
+SEASTAR_TEST_CASE(test_rpc_client_send_glitch) {
+ return test_rpc_connection_send_glitch(true);
+}
+
+SEASTAR_TEST_CASE(test_rpc_server_send_glitch) {
+ return test_rpc_connection_send_glitch(false);
+}
+
SEASTAR_TEST_CASE(test_rpc_scheduling) {
return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) {
auto sg = create_scheduling_group("rpc", 100).get0();
--
2.20.1

Benny Halevy

<bhalevy@scylladb.com>
Aug 31, 2022, 9:49:12 AM
to Gleb Natapov, Pavel Emelyanov, seastar-dev@googlegroups.com
Gleb, can you please review?

Gleb Natapov

<gleb@scylladb.com>
Sep 1, 2022, 4:24:15 AM
to Pavel Emelyanov, seastar-dev@googlegroups.com
So does it mean that if a stream user wants to know the reason for an
error it has to implement its own data source?
How does it solve your example above though? You shut down the write side,
but does that wake up the receiver?

--
Gleb.

Pavel Emelyanov

<xemul@scylladb.com>
Sep 1, 2022, 4:31:38 AM
to Gleb Natapov, seastar-dev@googlegroups.com
Source? It's a sink method. Even so, I don't get the question: why would
a user need to implement their own sink? The eptr is here, I just don't see a
good way to push it to the user. Returning it on .close() is not a
good idea from my POV.
It closes the write side, the peer notices it (yes, some time later) and
closes its connection, and the receiver (yes again, after some more time)
notices this and wakes up. There are no receiver ends here to close.

Gleb Natapov

<gleb@scylladb.com>
Sep 1, 2022, 4:39:16 AM
to Pavel Emelyanov, seastar-dev@googlegroups.com
Yes, sink, not source of course. eptr is here, but the only way for the
user code to get it is to override abort_write in its own
implementation. We do not even log this exception anywhere now.
So we rely on the peer side to close its sending end to abort our
receiver. But it is not guaranteed that the peer will do that, no? For our
RPC implementation we know it will, but this code may communicate with
non-RPC or non-Seastar applications as well.
--
Gleb.

Pavel Emelyanov

<xemul@scylladb.com>
Sep 1, 2022, 4:55:32 AM
to Gleb Natapov, seastar-dev@googlegroups.com
True. The eptr is effectively ignored. However ... (see below)
AFAIK for an established TCP socket shutdown(SHUT_WR) only differs from close()
in that the descriptor remains alive. Other than that the protocol behaves the
same -- it sends FIN and enters the FIN_WAIT1 state.

> For our
> RPC implementation we know it will, but this code may communicate with non
> RPC or non seastar application as well.

We rely here on either the peer TCP socket handling FIN "correctly" and closing
its sending side, or the local TCP timing out FIN_WAIT1 eventually. In any case
the local connection would see the "connection closed by peer" error, not the
original eptr from above. Maybe logging it is indeed a good idea.

Gleb Natapov

<gleb@scylladb.com>
Sep 1, 2022, 5:29:33 AM
to Pavel Emelyanov, seastar-dev@googlegroups.com
IIRC a half-closed TCP connection is a thing (I need to dust off my Stevens
books, but here is a link
https://superuser.com/questions/298919/what-is-tcp-half-open-connection-and-tcp-half-closed-connection).
Shutting down one side only means that this side has nothing more to send,
but the other side may continue sending.
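
The half-close semantics can be observed with plain POSIX calls. The sketch below is an illustration under assumptions: it uses a Unix-domain `socketpair()` instead of a real TCP connection to stay self-contained, and `demo_half_close()` and `half_close_result` are hypothetical names.

```cpp
#include <cassert>
#include <sys/socket.h>
#include <unistd.h>

struct half_close_result {
    ssize_t peer_read_after_shutdown;  // what the peer's read() returned
    ssize_t peer_write;                // bytes the peer could still send
    ssize_t local_read;                // bytes the local side still received
};

// Shuts down only the write direction of one end of a stream socket pair
// and reports what each end can still do afterwards.
inline half_close_result demo_half_close() {
    int fds[2];
    int rc = ::socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
    assert(rc == 0);
    (void)rc;

    // The local side (fds[0]) declares that it has nothing more to send.
    ::shutdown(fds[0], SHUT_WR);

    char buf[8];
    half_close_result r;
    // The peer sees end-of-stream on its read side...
    r.peer_read_after_shutdown = ::read(fds[1], buf, sizeof(buf));
    // ...but nothing forces it to stop sending in the other direction.
    r.peer_write = ::write(fds[1], "pong", 4);
    r.local_read = ::read(fds[0], buf, sizeof(buf));

    ::close(fds[0]);
    ::close(fds[1]);
    return r;
}
```

On Linux the peer's read is expected to report end-of-stream while the reverse direction keeps working, so a local receiver is only woken once the peer decides to close its own sending side.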
--
Gleb.

Pavel Emelyanov

<xemul@scylladb.com>
Sep 1, 2022, 6:05:28 AM
to Gleb Natapov, seastar-dev@googlegroups.com
If the local socket failed flushing, it means that the peer was supposed to read
that data. Now the peer's read channel is closed, so it will bail out. Not that
this guarantees that it will close the connection, of course. Is that your concern?

Gleb Natapov

<gleb@scylladb.com>
Sep 1, 2022, 6:16:06 AM
to Pavel Emelyanov, seastar-dev@googlegroups.com
Yes. The peer may not know that the sender failed to send something. It
has no indication. Then it gets FIN and knows that the sender has
nothing more to send, but that does not mean the peer has nothing to send
and wants to close its side of the connection. In our RPC that of course
cannot happen, since an error on the sender or the receiver side aborts
both ends.

--
Gleb.

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:25 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
When sending a message via a connected_socket's stream, the write or flush
may fail. In this case the connection may get stuck, either because the
receive loop is not woken up or because batch-flush hides the error
and doesn't report it up the chain.

This set fixes both and includes an error-injection test that makes
sure the connection doesn't get stuck if sending a message by either
side fails.

branch: https://github.com/xemul/seastar/tree/br-rpc-connection-glitches-3
tests: unit(dev)

v3:
- Make batch-flush call user-provided callback on background flush error
- Turn off batch-flushes for those who don't provide this callback
- Fix tests that mis-use output_stream .flush/.close API

Pavel Emelyanov (9):
output_stream: Detach batch-flush context into a separate object
tests: Dont shutdown websocket before closing
tests: Close socket output stream
tests: Ignore httpd client errors
output_stream: Introduce on-batch-flush-error callback
output_stream: Indentation fix after previous patch
rpc: Add batch-flush error callback (and turn b.f. back ON)
rpc: Abort connection if send_entry() fails
rpc_test: Test rpc send glitches

include/seastar/core/iostream-impl.hh | 55 ++++++++----------
include/seastar/core/iostream.hh | 83 ++++++++++++++++++++++-----
include/seastar/net/api.hh | 8 +++
src/core/reactor.cc | 4 +-
src/net/stack.cc | 7 ++-
src/rpc/rpc.cc | 11 +++-
tests/unit/httpd_test.cc | 2 +
tests/unit/rpc_test.cc | 61 ++++++++++++++++++++
tests/unit/socket_test.cc | 4 +-
tests/unit/websocket_test.cc | 2 -
10 files changed, 182 insertions(+), 55 deletions(-)

--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:26 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
On close() the websocket tries to send the closing message to the peer. If
the socket is already shut down, this resolves into an exception. However,
close() calls shutdown() itself, so the test can avoid doing it.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
tests/unit/websocket_test.cc | 2 --
1 file changed, 2 deletions(-)

diff --git a/tests/unit/websocket_test.cc b/tests/unit/websocket_test.cc
index ae97c526..a9c91710 100644
--- a/tests/unit/websocket_test.cc
+++ b/tests/unit/websocket_test.cc
@@ -52,7 +52,6 @@ SEASTAR_TEST_CASE(test_websocket_handshake) {
websocket::connection conn(dummy, acceptor.get0().connection);
future<> serve = conn.process();
auto close = defer([&conn, &input, &output, &serve] () noexcept {
- conn.shutdown();
conn.close().get();
input.close().get();
output.close().get();
@@ -118,7 +117,6 @@ SEASTAR_TEST_CASE(test_websocket_handler_registration) {
future<> serve = conn.process();

auto close = defer([&conn, &input, &output, &serve] () noexcept {
- conn.shutdown();
conn.close().get();
input.close().get();
output.close().get();
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:26 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
There are several fields on output_stream that control the behavior of
batch flushing. They are only used by some sockets; files don't set
the batch_flushes option, so for them all this memory is wasted. The next
patches will add more state to support batch flushing, so this waste is
going to increase.

This patch moves most of the batch-flushing control bits into a separate
object owned by a unique_ptr. Since stream creation is noexcept, its
constructor cannot allocate the new object itself and should get it from
the caller. Because of this, output_stream_options.batch_flushes can
be marked as deprecated right away.
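
The ownership scheme has one subtle point: the detached object keeps a back-pointer to its stream, so the stream can no longer use defaulted move operations. A minimal sketch of that pattern follows, with hypothetical names (`toy_stream`, `flush_context`) and none of the real buffering logic.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical stream that keeps optional state in a heap-allocated context.
class toy_stream {
public:
    struct flush_context {
        toy_stream* stream = nullptr;  // back-pointer to the current owner
        void poll() noexcept {
            assert(stream != nullptr);
            stream->_flushed = true;
        }
    };

    toy_stream() noexcept = default;

    // The caller allocates the context, so this constructor can stay noexcept.
    explicit toy_stream(std::unique_ptr<flush_context> ctx) noexcept
        : _ctx(std::move(ctx)) {
        if (_ctx) { _ctx->stream = this; }
    }

    // Moving transfers the context and re-points it at the new owner;
    // a defaulted move would leave it pointing at the moved-from stream.
    toy_stream(toy_stream&& o) noexcept
        : _flushed(o._flushed), _ctx(std::move(o._ctx)) {
        if (_ctx) { _ctx->stream = this; }
    }

    toy_stream& operator=(toy_stream&& o) noexcept {
        if (this != &o) {
            _flushed = o._flushed;
            _ctx = std::move(o._ctx);
            if (_ctx) { _ctx->stream = this; }
        }
        return *this;
    }

    bool batching() const noexcept { return bool(_ctx); }
    bool flushed() const noexcept { return _flushed; }
    flush_context* context() noexcept { return _ctx.get(); }

private:
    bool _flushed = false;
    std::unique_ptr<flush_context> _ctx;  // null when batching is off
};
```

A stream without a context pays only for one null pointer, and an external list can link the contexts directly because their addresses stay stable across moves of the owning stream.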

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/iostream-impl.hh | 45 ++++++++-------
include/seastar/core/iostream.hh | 79 ++++++++++++++++++++++-----
src/core/reactor.cc | 4 +-
src/net/stack.cc | 3 +-
4 files changed, 92 insertions(+), 39 deletions(-)

diff --git a/include/seastar/core/iostream-impl.hh b/include/seastar/core/iostream-impl.hh
index 1c2a9ef7..4894697f 100644
--- a/include/seastar/core/iostream-impl.hh
+++ b/include/seastar/core/iostream-impl.hh
@@ -79,8 +79,9 @@ output_stream<CharType>::zero_copy_put(net::packet p) noexcept {
// if flush is scheduled, disable it, so it will not try to write in parallel
_flush = false;
if (_flushing) {
+ assert(_batch_flushes);
// flush in progress, wait for it to end before continuing
- return _in_batch.value().get_future().then([this, p = std::move(p)] () mutable {
+ return _batch_flushes->in_batch.value().get_future().then([this, p = std::move(p)] () mutable {
return _fd.put(std::move(p));
});
} else {
@@ -433,14 +434,14 @@ output_stream<CharType>::flush() noexcept {
if (!_batch_flushes) {
return do_flush();
} else {
- if (_ex) {
+ if (_batch_flushes->ex) {
// flush is a good time to deliver outstanding errors
- return make_exception_future<>(std::move(_ex));
+ return make_exception_future<>(std::move(_batch_flushes->ex));
} else {
_flush = true;
- if (!_in_batch) {
+ if (!_batch_flushes->in_batch) {
add_to_flush_poller(*this);
- _in_batch = promise<>();
+ _batch_flushes->in_batch = promise<>();
}
}
}
@@ -453,8 +454,9 @@ output_stream<CharType>::put(temporary_buffer<CharType> buf) noexcept {
// if flush is scheduled, disable it, so it will not try to write in parallel
_flush = false;
if (_flushing) {
+ assert(_batch_flushes);
// flush in progress, wait for it to end before continuing
- return _in_batch.value().get_future().then([this, buf = std::move(buf)] () mutable {
+ return _batch_flushes->in_batch.value().get_future().then([this, buf = std::move(buf)] () mutable {
return _fd.put(std::move(buf));
});
} else {
@@ -463,28 +465,29 @@ output_stream<CharType>::put(temporary_buffer<CharType> buf) noexcept {
}

template <typename CharType>
-void
-output_stream<CharType>::poll_flush() noexcept {
- if (!_flush) {
+void output_stream<CharType>::batch_flush_context::poll() noexcept {
+ assert(stream != nullptr);
+
+ if (!stream->_flush) {
// flush was canceled, do nothing
- _flushing = false;
- _in_batch.value().set_value();
- _in_batch = std::nullopt;
+ stream->_flushing = false;
+ in_batch.value().set_value();
+ in_batch = std::nullopt;
return;
}

- _flush = false;
- _flushing = true; // make whoever wants to write into the fd to wait for flush to complete
+ stream->_flush = false;
+ stream->_flushing = true; // make whoever wants to write into the fd to wait for flush to complete

// FIXME: future is discarded
- (void)do_flush().then_wrapped([this] (future<> f) {
+ (void)stream->do_flush().then_wrapped([this] (future<> f) {
try {
f.get();
} catch (...) {
- _ex = std::current_exception();
+ ex = std::current_exception();
}
// if flush() was called while flushing flush once more
- poll_flush();
+ poll();
});
}

@@ -492,15 +495,15 @@ template <typename CharType>
future<>
output_stream<CharType>::close() noexcept {
return flush().finally([this] {
- if (_in_batch) {
- return _in_batch.value().get_future();
+ if (_batch_flushes && _batch_flushes->in_batch) {
+ return _batch_flushes->in_batch.value().get_future();
} else {
return make_ready_future();
}
}).then([this] {
// report final exception as close error
- if (_ex) {
- std::rethrow_exception(_ex);
+ if (_batch_flushes && _batch_flushes->ex) {
+ std::rethrow_exception(_batch_flushes->ex);
}
}).finally([this] {
return _fd.close();
diff --git a/include/seastar/core/iostream.hh b/include/seastar/core/iostream.hh
index 59de8545..89103251 100644
--- a/include/seastar/core/iostream.hh
+++ b/include/seastar/core/iostream.hh
@@ -332,7 +332,13 @@ class input_stream final {
struct output_stream_options {
bool trim_to_size = false; ///< Make sure that buffers put into sink haven't
///< grown larger than the configured size
- bool batch_flushes = false; ///< Try to merge flushes with each other
+ [[deprecated("use batch_flush_context")]]
+ bool batch_flushes; ///< Try to merge flushes with each other
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+ constexpr output_stream_options() noexcept : batch_flushes(false) { }
+#pragma GCC diagnostic pop
};

/// Facilitates data buffering before it's handed over to data_sink.
@@ -348,6 +354,16 @@ struct output_stream_options {
/// resolved.
template <typename CharType>
class output_stream final {
+public:
+ struct batch_flush_context {
+ std::optional<promise<>> in_batch;
+ std::exception_ptr ex;
+ bi::slist_member_hook<> in_poller;
+ output_stream* stream = nullptr;
+ void poll() noexcept;
+ };
+
+private:
static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
data_sink _fd;
temporary_buffer<CharType> _buf;
@@ -356,19 +372,15 @@ class output_stream final {
size_t _begin = 0;
size_t _end = 0;
bool _trim_to_size = false;
- bool _batch_flushes = false;
- std::optional<promise<>> _in_batch;
bool _flush = false;
bool _flushing = false;
- std::exception_ptr _ex;
- bi::slist_member_hook<> _in_poller;
+ std::unique_ptr<batch_flush_context> _batch_flushes;

private:
size_t available() const noexcept { return _end - _begin; }
size_t possibly_available() const noexcept { return _size - _begin; }
future<> split_and_put(temporary_buffer<CharType> buf) noexcept;
future<> put(temporary_buffer<CharType> buf) noexcept;
- void poll_flush() noexcept;
future<> do_flush() noexcept;
future<> zero_copy_put(net::packet p) noexcept;
future<> zero_copy_split_and_put(net::packet p) noexcept;
@@ -377,18 +389,56 @@ class output_stream final {
public:
using char_type = CharType;
output_stream() noexcept = default;
- output_stream(data_sink fd, size_t size, output_stream_options opts = {}) noexcept
- : _fd(std::move(fd)), _size(size), _trim_to_size(opts.trim_to_size), _batch_flushes(opts.batch_flushes) {}
+ output_stream(data_sink fd, size_t size, output_stream_options opts = {}, std::unique_ptr<batch_flush_context> ctx = {}) noexcept
+ : _fd(std::move(fd)), _size(size), _trim_to_size(opts.trim_to_size), _batch_flushes(std::move(ctx))
+ {
+ if (_batch_flushes) {
+ _batch_flushes->stream = this;
+ }
+ }
[[deprecated("use output_stream_options instead of booleans")]]
output_stream(data_sink fd, size_t size, bool trim_to_size, bool batch_flushes = false) noexcept
- : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size), _batch_flushes(batch_flushes) {}
+ : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size) {}
output_stream(data_sink fd) noexcept
: _fd(std::move(fd)), _size(_fd.buffer_size()), _trim_to_size(true) {}
- output_stream(output_stream&&) noexcept = default;
- output_stream& operator=(output_stream&&) noexcept = default;
+ output_stream(output_stream&& o) noexcept
+ : _fd(std::move(o._fd))
+ , _buf(std::move(o._buf))
+ , _zc_bufs(std::move(o._zc_bufs))
+ , _size(std::exchange(o._size, 0))
+ , _begin(std::exchange(o._begin, 0))
+ , _end(std::exchange(o._end, 0))
+ , _trim_to_size(o._trim_to_size)
+ , _flush(o._flush)
+ , _flushing(o._flushing)
+ , _batch_flushes(std::move(o._batch_flushes))
+ {
+ if (_batch_flushes) {
+ _batch_flushes->stream = this;
+ }
+ }
+ output_stream& operator=(output_stream&& o) noexcept {
+ if (this != &o) {
+ _fd = std::move(o._fd);
+ _buf = std::move(o._buf);
+ _zc_bufs = std::move(o._zc_bufs);
+ _size = std::exchange(o._size, 0);
+ _begin = std::exchange(o._begin, 0);
+ _end = std::exchange(o._end, 0);
+ _trim_to_size = o._trim_to_size;
+ _flush = o._flush;
+ _flushing = o._flushing;
+ _batch_flushes = std::move(o._batch_flushes);
+ if (_batch_flushes) {
+ _batch_flushes->stream = this;
+ }
+ }
+ return *this;
+ }
+
~output_stream() {
if (_batch_flushes) {
- assert(!_in_batch && "Was this stream properly closed?");
+ assert(!_batch_flushes->in_batch && "Was this stream properly closed?");
} else {
assert(!_end && !_zc_bufs && "Was this stream properly closed?");
}
@@ -423,9 +473,10 @@ class output_stream final {
/// \returns the data_sink
data_sink detach() &&;

- using batch_flush_list_t = bi::slist<output_stream,
+ friend void add_to_flush_poller(output_stream<char>& os) noexcept;
+ using batch_flush_list_t = bi::slist<batch_flush_context,
bi::constant_time_size<false>, bi::cache_last<true>,
- bi::member_hook<output_stream, bi::slist_member_hook<>, &output_stream::_in_poller>>;
+ bi::member_hook<batch_flush_context, bi::slist_member_hook<>, &batch_flush_context::in_poller>>;
private:
friend class reactor;
};
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 100eb80f..1fbe5c52 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -2364,7 +2364,7 @@ reactor::flush_tcp_batches() {
while (!_flush_batching.empty()) {
auto& os = _flush_batching.front();
_flush_batching.pop_front();
- os.poll_flush();
+ os.poll();
}
return work;
}
@@ -4331,7 +4331,7 @@ future<> later() noexcept {
}

void add_to_flush_poller(output_stream<char>& os) noexcept {
- engine()._flush_batching.push_back(os);
+ engine()._flush_batching.push_back(*os._batch_flushes);
}

steady_clock_type::duration reactor::total_idle_time() {
diff --git a/src/net/stack.cc b/src/net/stack.cc
index 1e7bc88a..7a179795 100644
--- a/src/net/stack.cc
+++ b/src/net/stack.cc
@@ -102,9 +102,8 @@ input_stream<char> connected_socket::input(connected_socket_input_stream_config

output_stream<char> connected_socket::output(size_t buffer_size) {
output_stream_options opts;
- opts.batch_flushes = true;
// TODO: allow user to determine buffer size etc
- return output_stream<char>(_csi->sink(), buffer_size, opts);
+ return output_stream<char>(_csi->sink(), buffer_size, opts, std::make_unique<output_stream<char>::batch_flush_context>());
}

void connected_socket::set_nodelay(bool nodelay) {
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:28 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
When the client stops, it shouldn't leave its output stream unclosed.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
tests/unit/socket_test.cc | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/tests/unit/socket_test.cc b/tests/unit/socket_test.cc
index 2f9ad2fb..10c1e05a 100644
--- a/tests/unit/socket_test.cc
+++ b/tests/unit/socket_test.cc
@@ -87,7 +87,9 @@ SEASTAR_TEST_CASE(socket_skip_test) {
abort_source as;
auto client = async([&as] {
connected_socket socket = connect(ipv4_addr("127.0.0.1", 1234)).get();
- socket.output().write("abc").get();
+ auto out = socket.output();
+ out.write("abc").get();
+ out.close().get();
socket.shutdown_output();
try {
sleep_abortable(std::chrono::seconds(10), as).get();
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:29 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
At the end of this helper the server is stopped. If the client resolves with
an exception, the server's stop() is not called and http_server's tasks gate
is destroyed unclosed.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
tests/unit/httpd_test.cc | 2 ++
1 file changed, 2 insertions(+)

diff --git a/tests/unit/httpd_test.cc b/tests/unit/httpd_test.cc
index 664ab59c..60f246be 100644
--- a/tests/unit/httpd_test.cc
+++ b/tests/unit/httpd_test.cc
@@ -952,6 +952,8 @@ future<> check_http_reply (std::vector<sstring>&& req_parts, std::vector<std::st

input.close().get();
output.close().get();
+ }).handle_exception([] (std::exception_ptr e) {
+ // ignore
});

server._routes.put(GET, "/test", handl);
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:29 AM9/6/22
to seastar-dev@googlegroups.com, Pavel Emelyanov
When output_stream::flush() is called with batch flushes ON, the flush()
itself resolves immediately with a ready future. The real flush happens
later, when a dedicated poller gets to it. If that real flush resolves
with an exception, the poller just keeps the exception on the stream and
does nothing more.

The described behavior can deadlock if the user operates in the

    while (!aborted()) {
        co_await ostream.send();
        co_await ostream.flush();
        co_await istream.recv();
        co_await handle();
    }

manner. Even if the real ostream flush fails some time later, the loop
is not aware of it because it is waiting on the other stream.

This patch adds a callback to the batch flush context that is called when
a flush fails in the background. The callback is to be provided by the
stream creator; if it is not provided, batch flushes are left OFF. This
slightly changes the API -- all current callers of
connected_socket::output() now get synchronous flushing.

refs: #1150

(indentation is deliberately left broken)

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/iostream-impl.hh | 16 ++--------------
include/seastar/core/iostream.hh | 6 +++++-
include/seastar/net/api.hh | 8 ++++++++
src/net/stack.cc | 8 +++++++-
4 files changed, 22 insertions(+), 16 deletions(-)

diff --git a/include/seastar/core/iostream-impl.hh b/include/seastar/core/iostream-impl.hh
index 4894697f..46de5e80 100644
--- a/include/seastar/core/iostream-impl.hh
+++ b/include/seastar/core/iostream-impl.hh
@@ -434,16 +434,11 @@ output_stream<CharType>::flush() noexcept {
if (!_batch_flushes) {
return do_flush();
} else {
- if (_batch_flushes->ex) {
- // flush is a good time to deliver outstanding errors
- return make_exception_future<>(std::move(_batch_flushes->ex));
- } else {
_flush = true;
if (!_batch_flushes->in_batch) {
add_to_flush_poller(*this);
_batch_flushes->in_batch = promise<>();
}
- }
}
return make_ready_future<>();
}
@@ -481,10 +476,8 @@ void output_stream<CharType>::batch_flush_context::poll() noexcept {

// FIXME: future is discarded
(void)stream->do_flush().then_wrapped([this] (future<> f) {
- try {
- f.get();
- } catch (...) {
- ex = std::current_exception();
+ if (f.failed()) {
+ on_error(f.get_exception());
}
// if flush() was called while flushing flush once more
poll();
@@ -500,11 +493,6 @@ output_stream<CharType>::close() noexcept {
} else {
return make_ready_future();
}
- }).then([this] {
- // report final exception as close error
- if (_batch_flushes && _batch_flushes->ex) {
- std::rethrow_exception(_batch_flushes->ex);
- }
}).finally([this] {
return _fd.close();
});
diff --git a/include/seastar/core/iostream.hh b/include/seastar/core/iostream.hh
index 89103251..7400ef78 100644
--- a/include/seastar/core/iostream.hh
+++ b/include/seastar/core/iostream.hh
@@ -40,6 +40,7 @@
#include <seastar/core/temporary_buffer.hh>
#include <seastar/core/scattered_message.hh>
#include <seastar/util/std-compat.hh>
+#include <seastar/util/noncopyable_function.hh>

namespace bi = boost::intrusive;

@@ -357,10 +358,13 @@ class output_stream final {
public:
struct batch_flush_context {
std::optional<promise<>> in_batch;
- std::exception_ptr ex;
+ noncopyable_function<void(std::exception_ptr) noexcept> on_error;
bi::slist_member_hook<> in_poller;
output_stream* stream = nullptr;
void poll() noexcept;
+ explicit batch_flush_context(noncopyable_function<void(std::exception_ptr) noexcept> fn) noexcept
+ : on_error(std::move(fn))
+ {}
};

private:
diff --git a/include/seastar/net/api.hh b/include/seastar/net/api.hh
index 666b58fc..75d9b0ce 100644
--- a/include/seastar/net/api.hh
+++ b/include/seastar/net/api.hh
@@ -194,6 +194,14 @@ class connected_socket {
/// Gets an object that sends data to the remote endpoint.
/// \param buffer_size how much data to buffer
output_stream<char> output(size_t buffer_size = 8192);
+
+ /// Gets the output stream.
+ ///
+ /// Gets an object that sends data to the remote endpoint.
+ /// \param on_batch_flush_error the batch-flush error callback
+ /// \param buffer_size how much data to buffer
+ output_stream<char> output(noncopyable_function<void(std::exception_ptr) noexcept> on_batch_flush_error, size_t buffer_size = 8192);
+
/// Sets the TCP_NODELAY option (disabling Nagle's algorithm)
void set_nodelay(bool nodelay);
/// Gets the TCP_NODELAY option (Nagle's algorithm)
diff --git a/src/net/stack.cc b/src/net/stack.cc
index 7a179795..3518de93 100644
--- a/src/net/stack.cc
+++ b/src/net/stack.cc
@@ -103,7 +103,13 @@ input_stream<char> connected_socket::input(connected_socket_input_stream_config
output_stream<char> connected_socket::output(size_t buffer_size) {
output_stream_options opts;
// TODO: allow user to determine buffer size etc
- return output_stream<char>(_csi->sink(), buffer_size, opts, std::make_unique<output_stream<char>::batch_flush_context>());
+ return output_stream<char>(_csi->sink(), buffer_size, opts);
+}
+
+output_stream<char> connected_socket::output(noncopyable_function<void(std::exception_ptr) noexcept> on_batch_flush_error, size_t buffer_size) {
+ output_stream_options opts;
+ auto flush = std::make_unique<output_stream<char>::batch_flush_context>(std::move(on_batch_flush_error));
+ return output_stream<char>(_csi->sink(), buffer_size, opts, std::move(flush));

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:30 AM9/6/22
to seastar-dev@googlegroups.com, Pavel Emelyanov
Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/iostream-impl.hh | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/include/seastar/core/iostream-impl.hh b/include/seastar/core/iostream-impl.hh
index 46de5e80..1d4b778a 100644
--- a/include/seastar/core/iostream-impl.hh
+++ b/include/seastar/core/iostream-impl.hh
@@ -434,11 +434,11 @@ output_stream<CharType>::flush() noexcept {
if (!_batch_flushes) {
return do_flush();
} else {
- _flush = true;
- if (!_batch_flushes->in_batch) {
- add_to_flush_poller(*this);
- _batch_flushes->in_batch = promise<>();
- }
+ _flush = true;
+ if (!_batch_flushes->in_batch) {
+ add_to_flush_poller(*this);
+ _batch_flushes->in_batch = promise<>();

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:31 AM9/6/22
to seastar-dev@googlegroups.com, Pavel Emelyanov
When a background flush fails the connection should be aborted (with the
error logged).

fixes: #1150

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
src/rpc/rpc.cc | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc
index 75bab545..736914eb 100644
--- a/src/rpc/rpc.cc
+++ b/src/rpc/rpc.cc
@@ -203,7 +203,10 @@ namespace rpc {
}
_fd = std::move(fd);
_read_buf =_fd.input();
- _write_buf = _fd.output();
+ _write_buf = _fd.output([this] (std::exception_ptr eptr) noexcept {
+ log_exception(*this, log_level::error, "batch flush error, aborting connection", std::move(eptr));
+ abort();
+ });
_connected = true;
}

--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:32 AM9/6/22
to seastar-dev@googlegroups.com, Pavel Emelyanov
If we fail to send for any reason we mark the connection as failed.
We also need to wake up the receiver side so that it sees that the
connection should be closed.

fixes: #1146

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
src/rpc/rpc.cc | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc
index 736914eb..4e492641 100644
--- a/src/rpc/rpc.cc
+++ b/src/rpc/rpc.cc
@@ -303,8 +303,10 @@ namespace rpc {

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 4:41:33 AM9/6/22
to seastar-dev@googlegroups.com, Pavel Emelyanov
This test runs a simple client->server loop of 4 synchronous calls and
tries to inject one-shot glitches at increasing limits. Once all stages
pass, the test passes. The current expectation is that the connection
won't hang.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---

Gleb Natapov

<gleb@scylladb.com>
Sep 6, 2022, 5:47:42 AM9/6/22
to Pavel Emelyanov, seastar-dev@googlegroups.com
On Tue, Sep 06, 2022 at 11:41:13AM +0300, 'Pavel Emelyanov' via seastar-dev wrote:
> When output_stream::flush() is called with batch flushes ON the flush()
> itself resolves immediately with ready future. The flush itself happens
> later when a dedicated poller gets to it. If that real flush resolves
> with exception the poller just keeps the exception on the stream and
> does nothing more.
>
> The described behavior can dead-lock if the user operates in the
>
> while (!aborted()) {
> co_await ostream.send()
> co_await ostream.flush()
> co_await istream.recv()
> co_await handle()
> }
>
> manner. Even if ostream real flush fails some time later the whole loop
> is not aware of it as it's sitting in another stream.
>
> This patch adds a callback to batch flush context that's called when
> flush fails in the background. The callback is to be provided by the
> stream creator, if not the batch-flushes are left OFF. This slightly
> changes the API -- all current callers of connected_socket::output() now
> get the synchronous flushing.
>
Would the breakage be less intrusive if, instead of the callback, we
introduced a notification mechanism:

future<std::exception_ptr> output_stream::get_flush_error();

?

--
Gleb.

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 6:00:42 AM9/6/22
to Gleb Natapov, seastar-dev@googlegroups.com


On 06.09.2022 12:47, Gleb Natapov wrote:
> On Tue, Sep 06, 2022 at 11:41:13AM +0300, 'Pavel Emelyanov' via seastar-dev wrote:
>> When output_stream::flush() is called with batch flushes ON the flush()
>> itself resolves immediately with ready future. The flush itself happens
>> later when a dedicated poller gets to it. If that real flush resolves
>> with exception the poller just keeps the exception on the stream and
>> does nothing more.
>>
>> The described behavior can dead-lock if the user operates in the
>>
>> while (!aborted()) {
>> co_await ostream.send()
>> co_await ostream.flush()
>> co_await istream.recv()
>> co_await handle()
>> }
>>
>> manner. Even if ostream real flush fails some time later the whole loop
>> is not aware of it as it's sitting in another stream.
>>
>> This patch adds a callback to batch flush context that's called when
>> flush fails in the background. The callback is to be provided by the
>> stream creator, if not the batch-flushes are left OFF. This slightly
>> changes the API -- all current callers of connected_socket::output() now
>> get the synchronous flushing.
>>
> Would the breakage be less intrusive if, instead of the callback, we
> introduced a notification mechanism:
>
> future<std::exception_ptr> output_stream::get_flush_error();
>
> ?

It somewhat exists already -- if you call flush() for the 2nd time it will return
the exception it faced previously. But I don't see how it helps the mentioned loop:
some code still needs to call this get_flush_error(). What should it be?
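
The pre-patch behavior described here can be illustrated with a small standalone model. This is only a sketch, not Seastar code: toy_batched_stream, poll() and flush_call_that_sees_error() are hypothetical names, and plain bool return values stand in for ready and failed futures. It shows that the stored error only surfaces on the second flush() call, which the send/flush/recv loop never reaches while it waits in recv().

```cpp
#include <exception>
#include <functional>
#include <stdexcept>
#include <utility>

// Toy stand-in for an output stream with batched flushes (hypothetical,
// not Seastar code).
class toy_batched_stream {
    std::function<void()> _real_flush;  // the work the poller does later
    std::exception_ptr _ex;             // error stored by the background flush
    bool _flush_pending = false;
public:
    explicit toy_batched_stream(std::function<void()> real_flush)
        : _real_flush(std::move(real_flush)) {}

    // Returns true for "resolved OK", false for "resolved with the stored error".
    bool flush() {
        if (_ex) {
            _ex = nullptr;  // deliver the outstanding error once
            return false;
        }
        _flush_pending = true;  // the real flush is deferred to poll()
        return true;
    }

    // What the poller runs some time after flush() has already returned.
    void poll() noexcept {
        if (!_flush_pending) {
            return;
        }
        _flush_pending = false;
        try {
            _real_flush();
        } catch (...) {
            _ex = std::current_exception();  // kept on the stream, nobody is told
        }
    }
};

// Runs flush, poll, flush against a sink that always fails and reports
// which of the two flush() calls surfaced the error (0 if neither did).
int flush_call_that_sees_error() {
    toy_batched_stream s([] { throw std::runtime_error("connection reset"); });
    bool first = s.flush();
    s.poll();
    bool second = s.flush();
    if (!first) {
        return 1;
    }
    if (!second) {
        return 2;
    }
    return 0;
}
```

In this model the first flush() reports success even though the real flush is about to fail, so a caller that goes on to wait for a reply has no way to learn about the failure.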

Avi Kivity

<avi@scylladb.com>
Sep 6, 2022, 6:05:39 AM9/6/22
to Pavel Emelyanov, seastar-dev@googlegroups.com

On 06/09/2022 11.41, 'Pavel Emelyanov' via seastar-dev wrote:
> There are several fields on output_stream that control the behavior of
> batch flushing. This set is only used by some sockets, files don't set
> the batch_flushes option and thus all this memory is wasted. Next
> patches will add more stuff to support batch-flushing, so this waste is
> going to increase.
>
> This patch moves most of the batch flushing control bits into a separate
> object owned by unique_ptr. Since stream creation is noexcept, its
> constructor cannot make the new object by itself and should get it from
> the caller. Because of this the output_stream_options.batch_flushes can
> be marked as deprecated right at once.
> diff --git a/include/seastar/core/iostream.hh b/include/seastar/core/iostream.hh
> index 59de8545..89103251 100644
> --- a/include/seastar/core/iostream.hh
> +++ b/include/seastar/core/iostream.hh
> @@ -332,7 +332,13 @@ class input_stream final {
> struct output_stream_options {
> bool trim_to_size = false; ///< Make sure that buffers put into sink haven't
> ///< grown larger than the configured size
> - bool batch_flushes = false; ///< Try to merge flushes with each other
> + [[deprecated("use batch_flush_context")]]
> + bool batch_flushes; ///< Try to merge flushes with each other
> +
> +#pragma GCC diagnostic push
> +#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
> + constexpr output_stream_options() noexcept : batch_flushes(false) { }
> +#pragma GCC diagnostic pop


Can we keep the initializer and use pragmas to suppress any warnings on
the initializer instead?


Adding a constructor can make the autogenerated constructors not work.
Better to keep this a struct.


btw, the whole batching thing is meant to be internal to socket->stream
and not exposed to the user, it's an API mistake. Let's put
batch_flush_context in an internal namespace so people aren't tempted to
play with it.



Gleb Natapov

<gleb@scylladb.com>
Sep 6, 2022, 6:11:02 AM9/6/22
to Pavel Emelyanov, seastar-dev@googlegroups.com
After output_stream creation a user calls:

os.get_flush_error().then([this] (std::exception_ptr e) {
log(e);
abort();
});

This runs in the background and is called when a background error happens.

This is not much different from this patch, just a subscriber model
instead of a callback. I do not insist, it is just a different interface
to consider.
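
A rough standalone sketch of the subscriber idea, to make the difference from the constructor-time callback concrete. Everything here is hypothetical: one_shot_error is a tiny single-threaded substitute for future<std::exception_ptr>, and toy_stream, poll() and count_abort_calls() are illustration names, not Seastar API.

```cpp
#include <exception>
#include <functional>
#include <stdexcept>
#include <utility>

// Minimal single-threaded one-shot "future" (hypothetical): a continuation
// attached with then() runs once the error is set, or immediately if the
// error is already there.
class one_shot_error {
    std::exception_ptr _value;
    std::function<void(std::exception_ptr)> _cont;
public:
    void then(std::function<void(std::exception_ptr)> cont) {
        if (_value) {
            cont(_value);
        } else {
            _cont = std::move(cont);
        }
    }
    void set(std::exception_ptr ex) {
        if (_value) {
            return;  // only the first error is reported
        }
        _value = std::move(ex);
        if (_cont) {
            _cont(_value);
        }
    }
};

// Toy stream: the subscription happens after construction, not in the
// constructor as with the callback from the patch.
class toy_stream {
    std::function<void()> _real_flush;
    one_shot_error _flush_error;
public:
    explicit toy_stream(std::function<void()> f) : _real_flush(std::move(f)) {}
    one_shot_error& get_flush_error() { return _flush_error; }
    // Background flush as the poller would run it.
    void poll() {
        try {
            _real_flush();
        } catch (...) {
            _flush_error.set(std::current_exception());
        }
    }
};

// Subscribes an "abort" continuation and counts how often it fires after
// the given number of failing background flushes.
int count_abort_calls(int polls) {
    int aborts = 0;
    toy_stream s([] { throw std::runtime_error("flush failed"); });
    s.get_flush_error().then([&aborts](std::exception_ptr) { ++aborts; });
    for (int i = 0; i < polls; ++i) {
        s.poll();
    }
    return aborts;
}
```

With this shape the stream can be created without any handler, which is the less intrusive part; the open question from the thread stays the same, namely who is responsible for subscribing.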
--
Gleb.

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 6:12:16 AM9/6/22
to Avi Kivity, seastar-dev@googlegroups.com
I'm not sure I get this idea. The problem is that many places that declare a local
output_stream_options variable emit the deprecation warning, e.g. like this

/home/xemul/src/seastar/src/net/stack.cc:104:27: warning: synthesized method «constexpr seastar::output_stream_options::output_stream_options()» first required here
104 | output_stream_options opts;

Even though they don't touch this bit in this line of code.

Avi Kivity

<avi@scylladb.com>
Sep 6, 2022, 6:14:57 AM9/6/22
to Pavel Emelyanov, seastar-dev@googlegroups.com

On 06/09/2022 11.41, 'Pavel Emelyanov' via seastar-dev wrote:
> When output_stream::flush() is called with batch flushes ON the flush()
> itself resolves immediately with ready future. The flush itself happens
> later when a dedicated poller gets to it. If that real flush resolves
> with exception the poller just keeps the exception on the stream and
> does nothing more.
>
> The described behavior can dead-lock if the user operates in the
>
> while (!aborted()) {
> co_await ostream.send()
> co_await ostream.flush()
> co_await istream.recv()
> co_await handle()
> }
>
> manner. Even if ostream real flush fails some time later the whole loop
> is not aware of it as it's sitting in another stream.
>
> This patch adds a callback to batch flush context that's called when
> flush fails in the background. The callback is to be provided by the
> stream creator, if not the batch-flushes are left OFF. This slightly
> changes the API -- all current callers of connected_socket::output() now
> get the synchronous flushing.


The behavior of flush() matches sendmsg(2). sendmsg(2) can succeed but
the actual sending can fail later, for example:

  user: sendmsg()
  kernel: copies data
  remote: resets connection
  kernel: drops data, marks next read as EPIPE?


So how does the code example above behave with sendmsg/recvmsg? Will the
error encountered by sendmsg(2) be propagated to recvmsg(2)? EPIPE?


If so, let's mimic it. I want to avoid expanding the API surface and
keep programming simple, it's already not easy. More ways to report
errors make it harder.

Avi Kivity

<avi@scylladb.com>
Sep 6, 2022, 6:25:34 AM9/6/22
to Pavel Emelyanov, seastar-dev@googlegroups.com
I assumed it was the initializer causing problems, but no.


> The problem is that many places that declare output_stream_options
> local variable emit the deprecation warning, e.g. like this
>
> /home/xemul/src/seastar/src/net/stack.cc:104:27: warning: synthesized
> method «constexpr
> seastar::output_stream_options::output_stream_options()» first
> required here
>   104 |     output_stream_options opts;
>
> Even though they don't touch this bit in this line of code.
>

I see. IMO it's a bug that the synthetic constructor issues the warning,
it should be only on user access (what about the destructor, doesn't it
also touch the deprecated variable?). I'll file bugs.


Meanwhile I don't see a better solution, so let's keep yours.
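
For reference, the workaround being kept can be reduced to a standalone sketch. It mirrors the quoted iostream.hh hunk, but toy_stream_options, legacy_batch_flushes() and default_options_are_quiet() are hypothetical names used only for illustration. The assumption is a GCC-compatible compiler that honors `#pragma GCC diagnostic`.

```cpp
// Options struct with a deprecated member (hypothetical copy of the shape
// in the quoted hunk, not the Seastar type itself).
struct toy_stream_options {
    bool trim_to_size = false;
    [[deprecated("use batch_flush_context")]]
    bool batch_flushes;

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
    // User-provided constructor: the one place that initializes the
    // deprecated member, so the warning is silenced here once instead of
    // at every declaration of a local options variable.
    constexpr toy_stream_options() noexcept : batch_flushes(false) {}
#pragma GCC diagnostic pop
};

// Reads the deprecated member with the warning suppressed; exists only so
// the default value can be checked in this sketch.
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
inline bool legacy_batch_flushes(const toy_stream_options& o) {
    return o.batch_flushes;
}
#pragma GCC diagnostic pop

// Declares a local options object the way stack.cc does and checks that
// both flags have their default values.
bool default_options_are_quiet() {
    toy_stream_options opts;
    return !opts.trim_to_size && !legacy_batch_flushes(opts);
}
```

The cost noted earlier in the thread still applies: with a user-provided constructor the type is no longer an aggregate, so brace-initialization by member stops working.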

Avi Kivity

<avi@scylladb.com>
Sep 6, 2022, 6:27:43 AM9/6/22
to Pavel Emelyanov, seastar-dev@googlegroups.com

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 6:42:45 AM9/6/22
to Avi Kivity, seastar-dev@googlegroups.com
Note that this is because there's one API "endpoint" through which the user
does both -- read and write. In our case the problematic "endpoint" is the
output_stream, which is not supposed to be read from, nor does it have any
legal links to anything that can be shut down for read. I say "legal" because
technically the stream has access to pollable_fd which, in turn, can be
shut down for read, but that's weird.

> So how does the code example above behave with sendmsg/recvmsg? Will the error encountered by sendmsg(2) be propagated to recvmsg(2)? EPIPE?

Yes, that's exactly what happens -- when a batch flush hits an error it calls
on_error(); in the case of RPC (and after this set it's only RPC that uses
batch flush) that results in connection::abort(), which shuts down its socket
for reading.
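
At the plain socket level, the effect of shutting a connection down can be probed with a short standalone program fragment. This is a sketch under assumptions: it targets Linux (MSG_NOSIGNAL, AF_UNIX stream sockets), it does not use Seastar, and shutdown_result and probe_after_shutdown() are hypothetical names. On Linux the expectation is that the read reports end-of-stream and the write fails with EPIPE.

```cpp
#include <cerrno>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

struct shutdown_result {
    ssize_t recv_ret;  // return value of recv() after the shutdown
    ssize_t send_ret;  // return value of send() after the shutdown
    int send_errno;    // errno left by that send()
};

// Shuts down one end of a connected AF_UNIX stream pair in both directions
// and reports what the next recv()/send() on that same end return.
shutdown_result probe_after_shutdown() {
    shutdown_result res{-2, -2, 0};  // -2 marks "socketpair() failed"
    int fds[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) {
        return res;
    }

    // Same call the posix sink would issue to tear the connection down
    // while keeping the file descriptor itself open.
    shutdown(fds[0], SHUT_RDWR);

    char buf[16];
    res.recv_ret = recv(fds[0], buf, sizeof(buf), 0);

    errno = 0;
    // MSG_NOSIGNAL keeps a failed write from raising SIGPIPE.
    res.send_ret = send(fds[0], "x", 1, MSG_NOSIGNAL);
    res.send_errno = errno;

    close(fds[0]);
    close(fds[1]);
    return res;
}
```

The sketch only checks calls issued after the shutdown; it does not demonstrate waking up a reader that is already blocked, which is the case the RPC receive loop cares about.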

Avi Kivity

<avi@scylladb.com>
Sep 6, 2022, 7:25:09 AM9/6/22
to Pavel Emelyanov, seastar-dev@googlegroups.com
We can share state between the data_source and data_sink.


But what kind of failures do we want to handle? Most will be tcp
failures (and so will be communicated via the kernel naturally). What
else, bad_alloc?


>
>> So how does the code example above behave with sendmsg/recvmsg? Will
>> the error encountered by sendmsg(2) be propagated to recvmsg(2)? EPIPE?
>
> Yes, that's exactly what happens -- when batch flush hits an error it
> calls
> the on_error(), in case of RPC (and after this set it's only RPC that
> uses
> the batch flush)


That's a regression then. Normal tcp users want it too (like scylladb
port 9042).


> it results in connection::abort() that shutdowns its socket
> for reading.


With sendmsg/recvmsg there is no on_error(). So what happens there?

Gleb Natapov

<gleb@scylladb.com>
Sep 6, 2022, 7:33:16 AM9/6/22
to Avi Kivity, Pavel Emelyanov, seastar-dev@googlegroups.com
I do not think those have async errors. If there is an error after the
syscall returns and the socket is a datagram one, the message can simply
be dropped. If it is a stream, the message is resent until it succeeds.

--
Gleb.

Pavel Emelyanov

<xemul@scylladb.com>
Sep 6, 2022, 7:41:01 AM9/6/22
to Avi Kivity, seastar-dev@googlegroups.com
Yes

>>
>>> So how does the code example above behave with sendmsg/recvmsg? Will the error encountered by sendmsg(2) be propagated to recvmsg(2)? EPIPE?
>>
>> Yes, that's exactly what happens -- when batch flush hits an error it calls
>> the on_error(), in case of RPC (and after this set it's only RPC that uses
>> the batch flush)
>
>
> That's a regression then. Normal tcp users want it too (like scylladb port 9042).

Flush batching is transparent from the API perspective. It may only affect
performance in some sense. Is it known that batch flushes help the native transport?

>> it results in connection::abort() that shutdowns its socket
>> for reading.
>
>
> With sendmsg/recvmsg there is no on_error(). So what happens there?

It depends on the socket type. Datagram sockets just drop the message; stream
sockets try to resend until the protocol decides it doesn't work any longer and
aborts the connection. There's the sk_err field on struct sock that's set in such
cases and then checked in key places.

Avi Kivity

<avi@scylladb.com>
Sep 6, 2022, 7:41:04 AM9/6/22
to Gleb Natapov, Pavel Emelyanov, seastar-dev@googlegroups.com
Or until the connection is reset (and then the error is propagated to
recvmsg).



Gleb Natapov

<gleb@scylladb.com>
Sep 6, 2022, 7:43:29 AM9/6/22
to Avi Kivity, Pavel Emelyanov, seastar-dev@googlegroups.com
Why do you think that each internal error causes connection reset?
Suppose a send path fails to allocate. Why would it cause connection
reset?

--
Gleb.

Avi Kivity

<avi@scylladb.com>
Sep 6, 2022, 8:13:35 AM9/6/22
to Gleb Natapov, Pavel Emelyanov, seastar-dev@googlegroups.com
I don't think so. That's why I wrote "or".


Gleb Natapov

<gleb@scylladb.com>
Sep 6, 2022, 8:16:56 AM9/6/22
to Avi Kivity, Pavel Emelyanov, seastar-dev@googlegroups.com
Well, if this is a stream socket and send fails multiple times,
eventually we will get a local error on both the send and receive
sides, I guess.

--
Gleb.

Avi Kivity

<avi@scylladb.com>
Sep 6, 2022, 8:18:15 AM9/6/22
to Pavel Emelyanov, seastar-dev@googlegroups.com
We can elevate bad_alloc to connection failure. We can't really recover
from it.


>
>>>
>>>> So how does the code example above behave with sendmsg/recvmsg?
>>>> Will the error encountered by sendmsg(2) be propagated to
>>>> recvmsg(2)? EPIPE?
>>>
>>> Yes, that's exactly what happens -- when batch flush hits an error
>>> it calls
>>> the on_error(), in case of RPC (and after this set it's only RPC
>>> that uses
>>> the batch flush)
>>
>>
>> That's a regression then. Normal tcp users want it too (like scylladb
>> port 9042).
>
> Flush batching is transparent from the API perspective. It may only
> affect
> performance in some sense. Is it known that batch flushes helps native
> transport?


IIRC batch_flushes was motivated by 9042. Any protocol that multiplexes
transmission benefits from it - otherwise we'd need a sendmsg per
response. This holds for 9042, rpc, http with pipelining, and newer http
that has multiplexing.


>
>>> it results in connection::abort() that shutdowns its socket
>>> for reading.
>>
>>
>> With sendmsg/recvmsg there is no on_error(). So what happens there?
>
> It depends on the socket type. Datagram sockets just drop the message,
> stream
> try to resend until protocol decides it doesn't work any longer and
> abort the
> connection. There the sk_err thing on struct sock that's set in such
> cases and
> then checked in key places.


Then we can do the same thing. Either via pollable_fd, or via shared
userspace state accessible via data_source/data_sink.


Adding an error handling API just makes it harder for everyone to get it
right.
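
The "shared userspace state" option can be sketched in a few lines of standalone C++. This is an assumption-laden illustration of the idea, not a proposal for the actual Seastar classes: toy_conn_state, toy_sink, toy_source and read_reports_send_failure() are hypothetical names, and the error field merely plays the role that sk_err plays in the kernel.

```cpp
#include <exception>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

// State shared by both halves of one toy connection (hypothetical).
struct toy_conn_state {
    std::exception_ptr error;  // plays the role of sk_err
    std::string inbox;         // bytes waiting to be read
};

class toy_sink {
    std::shared_ptr<toy_conn_state> _state;
public:
    explicit toy_sink(std::shared_ptr<toy_conn_state> s) : _state(std::move(s)) {}
    // Called by the background flush when it fails; the first error wins.
    void fail(std::exception_ptr ex) noexcept {
        if (!_state->error) {
            _state->error = std::move(ex);
        }
    }
};

class toy_source {
    std::shared_ptr<toy_conn_state> _state;
public:
    explicit toy_source(std::shared_ptr<toy_conn_state> s) : _state(std::move(s)) {}
    // Checks the shared error before handing out data, so a failed send is
    // reported through the read path without any extra error API.
    std::string get() {
        if (_state->error) {
            std::rethrow_exception(_state->error);
        }
        return std::exchange(_state->inbox, std::string());
    }
};

// Fails the write side in the "background" and checks that the next read
// reports that failure.
bool read_reports_send_failure() {
    auto state = std::make_shared<toy_conn_state>();
    toy_sink out(state);
    toy_source in(state);
    out.fail(std::make_exception_ptr(std::runtime_error("background flush failed")));
    try {
        in.get();
    } catch (const std::runtime_error&) {
        return true;
    }
    return false;
}
```

What this toy does not cover is a reader that is already suspended when the error is recorded; a real implementation would also have to wake it up.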

Pavel Emelyanov

<xemul@scylladb.com>
Sep 12, 2022, 10:41:00 AM9/12/22
to seastar-dev@googlegroups.com, Pavel Emelyanov
When sending a message via connected_socket's stream the write/flush
may fail. In this case the connection may get stuck because either
receive loop is not woken up, or because batch-flush hides this error
and doesn't report it up the chain.

This set fixes both and includes an error-injection test that makes
sure the connection doesn't get stuck if sending a message by either
side fails.

branch: https://github.com/xemul/seastar/tree/br-rpc-connection-glitches-4
tests: unit(dev)

v4:
- Terminate connection on error by closing it in both directions
- Don't mess with deprecating batch-flush (yet)

v3:
- Make batch-flush call user-provided callback on background flush error
- Turn off batch-flushes for those who don't provide this callback
- Fix tests that mis-use output_stream .flush/.close API

Pavel Emelyanov (4):
rpc: Abort connection if send_entry() fails
output_stream: Handle batch-flush exception synchronously
iostream: Indentation fix after previous patch
rpc_test: Test rpc send glitches

include/seastar/core/iostream-impl.hh | 24 +++--------
include/seastar/core/iostream.hh | 6 ++-
include/seastar/net/posix-stack.hh | 1 +
include/seastar/net/tcp.hh | 6 +++
src/core/reactor.cc | 4 ++
src/net/native-stack-impl.hh | 3 ++
src/net/posix-stack.cc | 8 ++++
src/net/tls.cc | 4 ++
src/rpc/rpc.cc | 6 ++-
tests/unit/loopback_socket.hh | 16 +++++--
tests/unit/rpc_test.cc | 61 +++++++++++++++++++++++++++
11 files changed, 115 insertions(+), 24 deletions(-)

--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Sep 12, 2022, 10:41:01 AM9/12/22
to seastar-dev@googlegroups.com, Pavel Emelyanov
If we fail to send for any reason we mark the connection as failed.
We also need to wake up the receiver side so that it sees that the
connection should be closed.

fixes: #1146

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
src/rpc/rpc.cc | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc
index 75bab545..b15635af 100644
--- a/src/rpc/rpc.cc
+++ b/src/rpc/rpc.cc
@@ -300,8 +300,10 @@ namespace rpc {

Pavel Emelyanov

<xemul@scylladb.com>
Sep 12, 2022, 10:41:02 AM9/12/22
to seastar-dev@googlegroups.com, Pavel Emelyanov
When output_stream::flush() is called with .batch_flushes ON, the flush()
itself resolves immediately with a ready future. The real flush happens
later, when a dedicated poller gets to it. If that real flush resolves
with an exception, the poller just keeps the exception on the stream and
does nothing more.

The described behavior can deadlock if the user operates in the

    while (!aborted()) {
        co_await ostream.send();
        co_await ostream.flush();
        co_await istream.recv();
        co_await handle();
    }

manner. Even if the real ostream flush fails some time later, the loop
is not aware of it because it is waiting on the other stream.

The proposed fix is to make the batch-flush poller let the stream handle
the background flush exception itself. By default (e.g. files that don't
use batch flushing) it's a never-called no-op. Streams created by sockets
need to terminate the connection the way they prefer.
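
The shape of this fix can be modeled outside Seastar as follows. This is a sketch only: toy_sink_impl, failing_socket_sink, poll_flush() and background_failure_closes_both_directions() are hypothetical names, a plain try/catch stands in for then_wrapped(), and the toy default terminate() does nothing, whereas the default in this patch reports an internal error.

```cpp
#include <exception>
#include <stdexcept>

// Toy counterpart of a sink implementation (hypothetical, not Seastar).
struct toy_sink_impl {
    virtual ~toy_sink_impl() = default;
    virtual void flush() = 0;
    // Emergency close hook. In this toy the default does nothing; the
    // patch's default reports an internal error instead.
    virtual void terminate(std::exception_ptr) noexcept {}
};

// A socket-like sink whose flush always fails and which reacts to
// terminate() by closing both directions.
struct failing_socket_sink final : toy_sink_impl {
    bool rx_shut = false;
    bool tx_shut = false;
    void flush() override { throw std::runtime_error("EPIPE"); }
    void terminate(std::exception_ptr) noexcept override {
        rx_shut = true;  // wakes up whoever waits for incoming data
        tx_shut = true;
    }
};

// What the batch-flush poller does in this model: on failure it hands the
// exception to the sink instead of storing it on the stream.
void poll_flush(toy_sink_impl& sink) noexcept {
    try {
        sink.flush();
    } catch (...) {
        sink.terminate(std::current_exception());
    }
}

// Runs one failing background flush and checks that the sink ended up
// shut down in both directions.
bool background_failure_closes_both_directions() {
    failing_socket_sink sink;
    poll_flush(sink);
    return sink.rx_shut && sink.tx_shut;
}
```

Compared with the v3 callback, nothing has to be passed in by the stream creator here: each sink type decides on its own what terminating the connection means.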

fixes: #1150

(indentation is left broken)

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/iostream-impl.hh | 16 ++--------------
include/seastar/core/iostream.hh | 6 +++++-
include/seastar/net/posix-stack.hh | 1 +
include/seastar/net/tcp.hh | 6 ++++++
src/core/reactor.cc | 4 ++++
src/net/native-stack-impl.hh | 3 +++
src/net/posix-stack.cc | 8 ++++++++
src/net/tls.cc | 4 ++++
tests/unit/loopback_socket.hh | 16 +++++++++++++---
9 files changed, 46 insertions(+), 18 deletions(-)

diff --git a/include/seastar/core/iostream-impl.hh b/include/seastar/core/iostream-impl.hh
index 1c2a9ef7..b9c4b26f 100644
--- a/include/seastar/core/iostream-impl.hh
+++ b/include/seastar/core/iostream-impl.hh
@@ -433,16 +433,11 @@ output_stream<CharType>::flush() noexcept {
if (!_batch_flushes) {
return do_flush();
} else {
- if (_ex) {
- // flush is a good time to deliver outstanding errors
- return make_exception_future<>(std::move(_ex));
- } else {
_flush = true;
if (!_in_batch) {
add_to_flush_poller(*this);
_in_batch = promise<>();
}
- }
}
return make_ready_future<>();
}
@@ -478,10 +473,8 @@ output_stream<CharType>::poll_flush() noexcept {

// FIXME: future is discarded
(void)do_flush().then_wrapped([this] (future<> f) {
- try {
- f.get();
- } catch (...) {
- _ex = std::current_exception();
+ if (f.failed()) {
+ _fd.on_batch_flush_error(f.get_exception());
}
// if flush() was called while flushing flush once more
poll_flush();
@@ -497,11 +490,6 @@ output_stream<CharType>::close() noexcept {
} else {
return make_ready_future();
}
- }).then([this] {
- // report final exception as close error
- if (_ex) {
- std::rethrow_exception(_ex);
- }
}).finally([this] {
return _fd.close();
});
diff --git a/include/seastar/core/iostream.hh b/include/seastar/core/iostream.hh
index 59de8545..bc06350b 100644
--- a/include/seastar/core/iostream.hh
+++ b/include/seastar/core/iostream.hh
@@ -113,6 +113,10 @@ class data_sink_impl {
}
virtual future<> close() = 0;

+ // Emergency close. When batch flush fails "in the background" it
+ // only has the data_sink at hand to forward the failure to
+ virtual void terminate(std::exception_ptr eptr) noexcept;
+
// The method should return the maximum buffer size that's acceptable by
// the sink. It's used when the output stream is constructed without any
// specific buffer size. In this case the stream accepts this value as its
@@ -169,6 +173,7 @@ class data_sink {
}
}

+ void on_batch_flush_error(std::exception_ptr eptr) noexcept { _dsi->terminate(std::move(eptr)); }
size_t buffer_size() const noexcept { return _dsi->buffer_size(); }
};

@@ -360,7 +365,6 @@ class output_stream final {
std::optional<promise<>> _in_batch;
bool _flush = false;
bool _flushing = false;
- std::exception_ptr _ex;
bi::slist_member_hook<> _in_poller;

private:
diff --git a/include/seastar/net/posix-stack.hh b/include/seastar/net/posix-stack.hh
index e76d7aff..b7b9072f 100644
--- a/include/seastar/net/posix-stack.hh
+++ b/include/seastar/net/posix-stack.hh
@@ -128,6 +128,7 @@ class posix_data_sink_impl : public data_sink_impl {
future<> put(packet p) override;
future<> put(temporary_buffer<char> buf) override;
future<> close() override;
+ void terminate(std::exception_ptr eptr) noexcept override;
};

class posix_ap_server_socket_impl : public server_socket_impl {
diff --git a/include/seastar/net/tcp.hh b/include/seastar/net/tcp.hh
index 7b45fc8b..f8d80e91 100644
--- a/include/seastar/net/tcp.hh
+++ b/include/seastar/net/tcp.hh
@@ -710,6 +710,7 @@ class tcp {
void shutdown_connect();
void close_read() noexcept;
void close_write() noexcept;
+ void terminate(std::exception_ptr) noexcept;
};
class listener {
tcp& _tcp;
@@ -2110,6 +2111,11 @@ void tcp<InetTraits>::connection::close_write() noexcept {
_tcb->close();
}

+template <typename InetTraits>
+void tcp<InetTraits>::connection::terminate(std::exception_ptr ex) noexcept {
+ _tcb->do_reset();
+}
+
template <typename InetTraits>
void tcp<InetTraits>::connection::shutdown_connect() {
if (_tcb->syn_needs_on()) {
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 3bf3a7f3..fff1b3d5 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -2452,6 +2452,10 @@ class reactor::signal_pollfn final : public reactor::pollfn {
}
};

+void data_sink_impl::terminate(std::exception_ptr eptr) noexcept {
+ on_internal_error_noexcept(seastar_logger, format("batch-flush aborts inapplicable data_sink with {}", eptr));
+}
+
class reactor::batch_flush_pollfn final : public simple_pollfn<true> {
reactor& _r;
public:
diff --git a/src/net/native-stack-impl.hh b/src/net/native-stack-impl.hh
index e5972958..022decc9 100644
--- a/src/net/native-stack-impl.hh
+++ b/src/net/native-stack-impl.hh
@@ -195,6 +195,9 @@ class native_connected_socket_impl<Protocol>::native_data_sink_impl final
_conn->close_write();
return make_ready_future<>();
}
+ virtual void terminate(std::exception_ptr ex) noexcept override {
+ _conn->terminate(std::move(ex));
+ }
};

template <typename Protocol>
diff --git a/src/net/posix-stack.cc b/src/net/posix-stack.cc
index 7a55b9fb..36820772 100644
--- a/src/net/posix-stack.cc
+++ b/src/net/posix-stack.cc
@@ -648,6 +648,14 @@ posix_data_sink_impl::close() {
return make_ready_future<>();
}

+void posix_data_sink_impl::terminate(std::exception_ptr) noexcept {
+ // This closes connection as if it was close()d, but leaves file
+ // descriptors around so that other fibers could still use it. The
+ // idea behind that is notifying any data_sources sharing this _fd
+ // wake up with "connection closed" and wrap up
+ _fd.shutdown(SHUT_RDWR);
+}
+
posix_network_stack::posix_network_stack(const program_options::option_group& opts, std::pmr::polymorphic_allocator<char>* allocator)
: _reuseport(engine().posix_reuseport_available()), _allocator(allocator) {
}
diff --git a/src/net/tls.cc b/src/net/tls.cc
index ea4fbe4e..15a90e69 100644
--- a/src/net/tls.cc
+++ b/src/net/tls.cc
@@ -1670,6 +1670,10 @@ class tls_connected_socket_impl::sink_impl: public data_sink_impl, public sessio
_session->close();
return make_ready_future<>();
}
+
+ virtual void terminate(std::exception_ptr) noexcept override {
+ _session->close();
+ }
};

class server_session : public net::server_socket_impl {
diff --git a/tests/unit/loopback_socket.hh b/tests/unit/loopback_socket.hh
index 1f433f45..01d8a865 100644
--- a/tests/unit/loopback_socket.hh
+++ b/tests/unit/loopback_socket.hh
@@ -96,9 +96,12 @@ class loopback_buffer {

class loopback_data_sink_impl : public data_sink_impl {
lw_shared_ptr<foreign_ptr<lw_shared_ptr<loopback_buffer>>> _buffer;
+ lw_shared_ptr<loopback_buffer> _rx_buffer;
public:
- explicit loopback_data_sink_impl(lw_shared_ptr<foreign_ptr<lw_shared_ptr<loopback_buffer>>> buffer)
- : _buffer(buffer) {
+ explicit loopback_data_sink_impl(lw_shared_ptr<foreign_ptr<lw_shared_ptr<loopback_buffer>>> buffer, lw_shared_ptr<loopback_buffer> rx_buffer)
+ : _buffer(buffer)
+ , _rx_buffer(rx_buffer)
+ {
}
future<> put(net::packet data) override {
return do_with(data.release(), [this] (std::vector<temporary_buffer<char>>& bufs) {
@@ -118,6 +121,13 @@ class loopback_data_sink_impl : public data_sink_impl {
});
});
}
+
+ void terminate(std::exception_ptr eptr) noexcept override {
+ _rx_buffer->shutdown();
+ (void)smp::submit_to(_buffer->get_owner_shard(), [tx = _buffer] {
+ (*tx)->shutdown();
+ });
+ }
};

class loopback_data_source_impl : public data_source_impl {
@@ -160,7 +170,7 @@ class loopback_connected_socket_impl : public net::connected_socket_impl {
return data_source(std::make_unique<loopback_data_source_impl>(_rx));
}
data_sink sink() override {
- return data_sink(std::make_unique<loopback_data_sink_impl>(_tx));
+ return data_sink(std::make_unique<loopback_data_sink_impl>(_tx, _rx));
}
void shutdown_input() override {
_rx->shutdown();
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Sep 12, 2022, 10:41:03 AM9/12/22
to seastar-dev@googlegroups.com, Pavel Emelyanov
Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
include/seastar/core/iostream-impl.hh | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/include/seastar/core/iostream-impl.hh b/include/seastar/core/iostream-impl.hh
index b9c4b26f..dca61ed2 100644
--- a/include/seastar/core/iostream-impl.hh
+++ b/include/seastar/core/iostream-impl.hh
@@ -433,11 +433,11 @@ output_stream<CharType>::flush() noexcept {
if (!_batch_flushes) {
return do_flush();
} else {
- _flush = true;
- if (!_in_batch) {
- add_to_flush_poller(*this);
- _in_batch = promise<>();
- }
+ _flush = true;
+ if (!_in_batch) {
+ add_to_flush_poller(*this);
+ _in_batch = promise<>();

Pavel Emelyanov

<xemul@scylladb.com>
Sep 12, 2022, 10:41:04 AM9/12/22
to seastar-dev@googlegroups.com, Pavel Emelyanov
This test runs a simple client->server loop of 4 synchronous calls and
tries to inject one-shot glitches at increasing limits. Once all stages
pass, the test passes. The current expectation is that the connection won't
hang.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---

Gleb Natapov

<gleb@scylladb.com>
Sep 13, 2022, 5:38:52 AM9/13/22
to Pavel Emelyanov, seastar-dev@googlegroups.com
LGTM

--
Gleb.

Avi Kivity

<avi@scylladb.com>
Sep 13, 2022, 8:01:59 AM9/13/22
to Pavel Emelyanov, seastar-dev@googlegroups.com
When would _ex be delivered to the sending side? I don't see it in this
patch.


Can something that only ever writes just keep on writing even though the
connection is bad?
Again, how is an error on close reported?


> }).finally([this] {
> return _fd.close();
> });
> diff --git a/include/seastar/core/iostream.hh b/include/seastar/core/iostream.hh
> index 59de8545..bc06350b 100644
> --- a/include/seastar/core/iostream.hh
> +++ b/include/seastar/core/iostream.hh
> @@ -113,6 +113,10 @@ class data_sink_impl {
> }
> virtual future<> close() = 0;
>
> + // Emergency close. When batch flush fails "in the background" it
> + // only has the data_sink at hand to forward the failure to
> + virtual void terminate(std::exception_ptr eptr) noexcept;
> +


This is adding something to the public API, so we have to be careful.
Batch flush is an implementation detail that we should try to keep from
leaking out even more than it has leaked already.


Thinking about it, I believe terminate() is merited.
data_source/data_sink represent a half-duplex channel that might be part
of a full-duplex channel, and this gives us a link between the two
sub-channels.


> // The method should return the maximum buffer size that's acceptable by
> // the sink. It's used when the output stream is constructed without any
> // specific buffer size. In this case the stream accepts this value as its
> @@ -169,6 +173,7 @@ class data_sink {
> }
> }
>
> + void on_batch_flush_error(std::exception_ptr eptr) noexcept { _dsi->terminate(std::move(eptr)); }


This should be private + friends, so no one is tempted to use it and so
we can remove it later.
This won't send RST (see respond_with_reset()).


Better to call it reset() since that's the TCP-level name.


> +
> template <typename InetTraits>
> void tcp<InetTraits>::connection::shutdown_connect() {
> if (_tcb->syn_needs_on()) {
> diff --git a/src/core/reactor.cc b/src/core/reactor.cc
> index 3bf3a7f3..fff1b3d5 100644
> --- a/src/core/reactor.cc
> +++ b/src/core/reactor.cc
> @@ -2452,6 +2452,10 @@ class reactor::signal_pollfn final : public reactor::pollfn {
> }
> };
>
> +void data_sink_impl::terminate(std::exception_ptr eptr) noexcept {
> + on_internal_error_noexcept(seastar_logger, format("batch-flush aborts inapplicable data_sink with {}", eptr));


To make the error more useful, also log typeid(*this).name().
So, the exception is lost. We should report the exception, it's the
beginning of the debugging trail.


> +}
> +
> posix_network_stack::posix_network_stack(const program_options::option_group& opts, std::pmr::polymorphic_allocator<char>* allocator)
> : _reuseport(engine().posix_reuseport_available()), _allocator(allocator) {
> }
> diff --git a/src/net/tls.cc b/src/net/tls.cc
> index ea4fbe4e..15a90e69 100644
> --- a/src/net/tls.cc
> +++ b/src/net/tls.cc
> @@ -1670,6 +1670,10 @@ class tls_connected_socket_impl::sink_impl: public data_sink_impl, public sessio
> _session->close();
> return make_ready_future<>();
> }
> +
> + virtual void terminate(std::exception_ptr) noexcept override {
> + _session->close();
> + }


What happens if someone doesn't implement terminate() for a layered
socket? I guess you hit the on_internal_error(). Perhaps we should just
log it since it causes a fatal error for now-working code.

Pavel Emelyanov

<xemul@scylladb.com>
Sep 13, 2022, 8:21:12 AM9/13/22
to Avi Kivity, seastar-dev@googlegroups.com
Unfortunately it's not delivered anywhere :(

> Can something that only ever write just keep on writing even though the connection is bad?

It can, but up until when? It shouldn't happen infinitely. Or do you mean that
someone calls output_stream.write()+.flush(), this thing fires, but the caller
ignores it and continues writing/flushing?
Agree. I planned to internal::-ize the batch-flush in a separate set.

> Thinking about it, I believe terminate() is merited. data_source/data_sink represent a half-duplex channel that might be part of a full-duplex channel, and this gives us a link between the two sub-channels.
>
>
>>       // The method should return the maximum buffer size that's acceptable by
>>       // the sink. It's used when the output stream is constructed without any
>>       // specific buffer size. In this case the stream accepts this value as its
>> @@ -169,6 +173,7 @@ class data_sink {
>>           }
>>       }
>> +    void on_batch_flush_error(std::exception_ptr eptr) noexcept { _dsi->terminate(std::move(eptr)); }
>
>
> This should be private + friends, so no one is tempted to use it and so we can remove it later.

We cannot remove it later unless the batch-flush is also removed, can we?
I didn't suppose it would, it needs memory for a packet that's not here

> Better to call it reset() since that's the TCP-level name.

Will do

>> +
>>   template <typename InetTraits>
>>   void tcp<InetTraits>::connection::shutdown_connect() {
>>       if (_tcb->syn_needs_on()) {
>> diff --git a/src/core/reactor.cc b/src/core/reactor.cc
>> index 3bf3a7f3..fff1b3d5 100644
>> --- a/src/core/reactor.cc
>> +++ b/src/core/reactor.cc
>> @@ -2452,6 +2452,10 @@ class reactor::signal_pollfn final : public reactor::pollfn {
>>       }
>>   };
>> +void data_sink_impl::terminate(std::exception_ptr eptr) noexcept {
>> +    on_internal_error_noexcept(seastar_logger, format("batch-flush aborts inapplicable data_sink with {}", eptr));
>
>
> Do make the error more useful, also log typeid(*this).name().

OK
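
A minimal sketch of the suggestion above, assuming hypothetical stand-in classes (sink_base and layered_sink are illustrative names, not Seastar types). It shows that typeid(*this) evaluated in a base-class method of a polymorphic object reports the most-derived type, so the default handler can say which sink failed to override terminate(). The string returned by name() is implementation-defined and is a mangled name on GCC and Clang.

```cpp
#include <exception>
#include <stdexcept>
#include <string>
#include <typeinfo>

// Hypothetical stand-in for a sink base class; not the Seastar data_sink_impl.
struct sink_base {
    virtual ~sink_base() = default;

    // Builds the diagnostic a default terminate() could hand to a logger.
    // typeid(*this) is applied to a polymorphic object, so it names the
    // most-derived class rather than sink_base.
    std::string unhandled_terminate_message(std::exception_ptr ex) const {
        std::string what = "no exception";
        try {
            if (ex) {
                std::rethrow_exception(ex);
            }
        } catch (const std::exception& e) {
            what = e.what();
        } catch (...) {
            what = "non-standard exception";
        }
        return std::string("batch-flush aborts inapplicable data_sink ")
            + typeid(*this).name() + " with " + what;
    }
};

// Hypothetical layered sink that did not provide its own terminate().
struct layered_sink : sink_base {};
```

Calling unhandled_terminate_message() through a sink_base reference to a layered_sink yields a message that contains the layered_sink type name and the text of the original exception.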
I'm OK with it, but how does it differ from output_stream.write() resolving into
an exception? The result would be the same (a closed connection), but with silence in the log.

>> +}
>> +
>>   posix_network_stack::posix_network_stack(const program_options::option_group& opts, std::pmr::polymorphic_allocator<char>* allocator)
>>           : _reuseport(engine().posix_reuseport_available()), _allocator(allocator) {
>>   }
>> diff --git a/src/net/tls.cc b/src/net/tls.cc
>> index ea4fbe4e..15a90e69 100644
>> --- a/src/net/tls.cc
>> +++ b/src/net/tls.cc
>> @@ -1670,6 +1670,10 @@ class tls_connected_socket_impl::sink_impl: public data_sink_impl, public sessio
>>           _session->close();
>>           return make_ready_future<>();
>>       }
>> +
>> +    virtual void terminate(std::exception_ptr) noexcept override {
>> +        _session->close();
>> +    }
>
>
> What happens if someone doesn't implement terminate() for a layered socket?

Nothing bad, it's just regular close() that's also called on regular tls close() (it's
seen at the beginning of this hunk)

Avi Kivity

<avi@scylladb.com>
Sep 13, 2022, 8:57:52 AM9/13/22
to Pavel Emelyanov, seastar-dev@googlegroups.com
The caller (of output_stream::write() and output_stream::flush()) won't
see it, so they'll keep on writing. Let's say we're transferring a 1TB
file. We'll run a 1TB loop that goes nowhere.
I shared a plan to just get rid of it some time ago. But it's risky
since it involves calling temporary_buffer::share(), which allocates.


But I see we have an allocation anyway:


template <typename CharType>
future<> output_stream<CharType>::do_flush() noexcept {
    if (_end) {
        _buf.trim(_end);
        _end = 0;
        return _fd.put(std::move(_buf)).then([this] {


This std::move() will cause the next write() to allocate. And in fact it
causes bloat: say we write 20-byte messages, each of them will cost an
8k buffer.


            return _fd.flush();
        });
    } else if (_zc_bufs) {
        return _fd.put(std::move(_zc_bufs)).then([this] {
            return _fd.flush();
        });
    } else {
        return make_ready_future<>();
    }
}


>
>> Thinking about it, I believe terminate() is merited.
>> data_source/data_sink represent a half-duplex channel that might be
>> part of a full-duplex channel, and this gives us a link between the
>> two sub-channels.
>>
>>
>>>       // The method should return the maximum buffer size that's
>>> acceptable by
>>>       // the sink. It's used when the output stream is constructed
>>> without any
>>>       // specific buffer size. In this case the stream accepts this
>>> value as its
>>> @@ -169,6 +173,7 @@ class data_sink {
>>>           }
>>>       }
>>> +    void on_batch_flush_error(std::exception_ptr eptr) noexcept {
>>> _dsi->terminate(std::move(eptr)); }
>>
>>
>> This should be private + friends, so no one is tempted to use it and
>> so we can remove it later.
>
> We cannot remove it later unless the batch-flush is also removed, can we?


Yes, but we do have a multi-year plan to remove batch_flush.
It can cause a livelock, no? Although TCP should be prepared for losing
an RST, it's friendlier to send it.
The caller *knows* the operation failed. It can choose to log it or not,
but at least it knows it ended badly.


Counterpoint: with regular TCP, does close() wait for all data to be
sent? I think it just drops an fd ref.


>>> +}
>>> +
>>>   posix_network_stack::posix_network_stack(const
>>> program_options::option_group& opts,
>>> std::pmr::polymorphic_allocator<char>* allocator)
>>>           : _reuseport(engine().posix_reuseport_available()),
>>> _allocator(allocator) {
>>>   }
>>> diff --git a/src/net/tls.cc b/src/net/tls.cc
>>> index ea4fbe4e..15a90e69 100644
>>> --- a/src/net/tls.cc
>>> +++ b/src/net/tls.cc
>>> @@ -1670,6 +1670,10 @@ class tls_connected_socket_impl::sink_impl:
>>> public data_sink_impl, public sessio
>>>           _session->close();
>>>           return make_ready_future<>();
>>>       }
>>> +
>>> +    virtual void terminate(std::exception_ptr) noexcept override {
>>> +        _session->close();
>>> +    }
>>
>>
>> What happens if someone doesn't implement terminate() for a layered
>> socket?
>
> Nothing bad, it's just regular close() that's also called on regular
> tls close() (it's
> seen at the beginning of this hunk)


Doesn't it fall to the base class on_internal_error?

Pavel Emelyanov

<xemul@scylladb.com>
Sep 13, 2022, 9:21:53 AM9/13/22
to Avi Kivity, seastar-dev@googlegroups.com
Provided the buffer size is 1TB -- yes, you're right. Otherwise the stream would have
to _fd.put() at some point and it will (well -- should, as long as this patch makes it
right) resolve with exception.
It shouldn't. Once the stream is written to or read from the operation would fail
and the socket will be finally closed. In fact, the whole idea of this new
.terminate() call is to be like close(), but keeping the "file descriptor" alive.
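
The "like close(), but keeping the file descriptor alive" idea can be illustrated with plain POSIX calls. This is only a sketch: it uses an AF_UNIX socketpair instead of a TCP connection, and probe_shutdown_keeps_fd() and shutdown_probe are hypothetical names introduced here. It is not the Seastar code path, which goes through its own _fd wrapper.

```cpp
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>

// Outcome of the experiment below.
struct shutdown_probe {
    bool setup_ok;        // socketpair() and shutdown() both succeeded
    ssize_t read_result;  // what read() returned after shutdown()
    bool fd_still_open;   // whether the descriptor is still valid afterwards
};

// Shuts down one end of a connected socket pair in both directions and then
// checks what a reader sharing that descriptor would observe.
shutdown_probe probe_shutdown_keeps_fd() {
    shutdown_probe r{false, -1, false};
    int fds[2];
    if (::socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) {
        return r;
    }
    if (::shutdown(fds[0], SHUT_RDWR) == 0) {
        r.setup_ok = true;
        char c;
        // The read does not block: the shut-down socket reports end-of-stream.
        r.read_result = ::read(fds[0], &c, 1);
        // The descriptor itself was not released by shutdown().
        r.fd_still_open = ::fcntl(fds[0], F_GETFD) != -1;
    }
    ::close(fds[0]);
    ::close(fds[1]);
    return r;
}
```

The reader sees end-of-stream while the descriptor remains usable until a later close(), which is the property other fibers sharing the descriptor would rely on.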
I assume you mean Linux TCP and the case when the struct file is not shared between
tables. In this case tcp_close() will queue the FIN packet "normally" and will wait for
it to be sent for real.

>>>> +}
>>>> +
>>>>   posix_network_stack::posix_network_stack(const program_options::option_group& opts, std::pmr::polymorphic_allocator<char>* allocator)
>>>>           : _reuseport(engine().posix_reuseport_available()), _allocator(allocator) {
>>>>   }
>>>> diff --git a/src/net/tls.cc b/src/net/tls.cc
>>>> index ea4fbe4e..15a90e69 100644
>>>> --- a/src/net/tls.cc
>>>> +++ b/src/net/tls.cc
>>>> @@ -1670,6 +1670,10 @@ class tls_connected_socket_impl::sink_impl: public data_sink_impl, public sessio
>>>>           _session->close();
>>>>           return make_ready_future<>();
>>>>       }
>>>> +
>>>> +    virtual void terminate(std::exception_ptr) noexcept override {
>>>> +        _session->close();
>>>> +    }
>>>
>>>
>>> What happens if someone doesn't implement terminate() for a layered socket?
>>
>> Nothing bad, it's just regular close() that's also called on regular tls close() (it's
>> seen at the beginning of this hunk)
>
>
> Doesn't it fall to the base class on_internal_error?

No. Well, it probably should not have been _session->close(), but rather a
_session->terminate() implemented for this patch, which in turn would need to propagate
terminate() down the stack. As it stands, it's a regular close().

Avi Kivity

<avi@scylladb.com>
Sep 13, 2022, 10:20:23 AM9/13/22
to Pavel Emelyanov, seastar-dev@googlegroups.com
Well, a bunch of write(small_buffer) + flush() in a loop. I admit this
isn't a reasonable scenario, but should we let the exception just be
ignored like that? We should try to hide the effect of batch_flush on
errors, since it isn't expected.
Like shutdown() but marking the stream as bad.
But will close(2) wait for tcp_close()? I don't see it.


filp_close() calls file_operations::flush(), but sockets don't define it.


fput() (which corresponds to the last close()) returns void.


But assuming you are right, we should mimic this behavior, since that's
what people will implicitly expect.

Pavel Emelyanov

<xemul@scylladb.com>
Sep 13, 2022, 10:33:39 AM9/13/22
to Avi Kivity, seastar-dev@googlegroups.com

>>>>>> @@ -433,16 +433,11 @@ output_stream<CharType>::flush() noexcept {
>>>>>>       if (!_batch_flushes) {
>>>>>>           return do_flush();
>>>>>>       } else {
>>>>>> -        if (_ex) {
>>>>>> -            // flush is a good time to deliver outstanding errors
>>>>>> -            return make_exception_future<>(std::move(_ex));
>>>>>
>>>>>
>>>>> When would _ex be delivered to the sending side? I don't see it in this patch.
>>>>
>>>> Unfortunately it's not delivered anywhere :(
>>>>
>>>>> Can something that only ever write just keep on writing even though the connection is bad?
>>>>
>>>> It can, but up until when? It shouldn't happen infinitely. Or do you mean that
>>>> someone calls output_stream.write()+.flush(), this thing fires, but the caller
>>>> ignores it and continues writing/flushing?
>>>
>>>
>>> The caller (of output_stream::write() and output_stream::flush()) won't see it, so they'll keep on writing. Let's say we're transferring a 1TB file. We'll run a 1TB loop that goes nowhere.
>>
>> Provided the buffer size is 1TB -- yes, you're right. Otherwise the stream would have
>> to _fd.put() at some point and it will (well -- should, as long as this patch makes it
>> right) resolve with exception.
>
>
> Well, a bunch of write(small_buffer) + flush() in a loop. I admit this isn't a reasonable scenario, but should we let the exception just be ignored like that? We should try to hide the effect of batch_flush on errors, since it isn't expected.

Yes, you're right. There's a chance that it's unseen up the stack for too long.
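
To make the write(small_buffer) + flush() scenario concrete, here is a toy model in plain C++. It is a sketch under loud assumptions: toy_batched_stream, poll() and iterations_before_error() are hypothetical names, nothing here is Seastar's output_stream, and the synchronous poll() merely stands in for the background batch-flush poller. The model keeps a background flush failure as a sticky error and rethrows it from the next write() or flush(), so the loop stops instead of writing into a dead connection.

```cpp
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical toy model of a stream with batched flushes.
class toy_batched_stream {
    std::function<void(const std::string&)> _sink;  // throws on failure
    std::string _buf;
    bool _flush_requested = false;
    std::exception_ptr _ex;  // sticky error recorded by a background flush

    void rethrow_if_failed() const {
        if (_ex) {
            std::rethrow_exception(_ex);
        }
    }
public:
    explicit toy_batched_stream(std::function<void(const std::string&)> sink)
        : _sink(std::move(sink)) {}

    void write(const std::string& data) {
        rethrow_if_failed();
        _buf += data;
    }

    // Only requests a flush; the real work happens later in poll().
    void flush() {
        rethrow_if_failed();
        _flush_requested = true;
    }

    // Stand-in for the background poller: nobody waits for its result, so a
    // failure can only be recorded, not returned.
    void poll() noexcept {
        if (!_flush_requested || _ex) {
            return;
        }
        _flush_requested = false;
        try {
            _sink(_buf);
            _buf.clear();
        } catch (...) {
            _ex = std::current_exception();
        }
    }
};

// Runs a write+flush loop and returns how many iterations completed before
// the stream reported an error, or max_iterations if it never did.
int iterations_before_error(toy_batched_stream& s, int max_iterations) {
    for (int i = 0; i < max_iterations; ++i) {
        try {
            s.write("x");
            s.flush();
        } catch (const std::exception&) {
            return i;
        }
        s.poll();
    }
    return max_iterations;
}
```

With a sink that always throws, the loop in this model is cut short on the iteration right after the failed background flush. If the rethrow_if_failed() checks were removed, the same loop would run to max_iterations without the caller ever learning about the failure.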



[snip]

>>> Counterpoint: with regular TCP, does close() wait for all data to be sent? I think it just drops an fd ref.
>>
>> I assume you mean Linux TCP and the case when the struct file is not shared between
>> table. In this case tcp_close() will queue FIN packet "normally" and will wait for
>> it to be sent for real.
>
>
> But will close(2) wait for tcp_close()? I don't see it.

It goes via "last_put" -> file_operation::release -> inet_release -> tcp_close

> filp_close() calls file_operations::flush(), but sockets don't define it.
>
>
> fput() (which corresponds to the last close()) returns void.

Well, yes. You mean that if it fails, then the caller doesn't see it? (Not that
it doesn't return a future<> and thus happens in the background fiber :P )
I believe that LKML people lost hope that any application ever checks the
error code from close() and just assume it's de-facto void.

Gleb Natapov

<gleb@scylladb.com>
Sep 14, 2022, 3:08:50 AM9/14/22
to Pavel Emelyanov, Avi Kivity, seastar-dev@googlegroups.com
On Tue, Sep 13, 2022 at 04:21:49PM +0300, 'Pavel Emelyanov' via seastar-dev wrote:
> > > >
> > > > So, the exception is lost. We should report the exception, it's the beginning of the debugging trail.
> > >
> > > I'm OK with it, but how does it differ from output_stream.write() resolving into
> > > exception -- the result would be the same (closed connection) but silence in log.
> > >
> >
> > The caller *knows* the operation failed. It can choose to log it or not, but at least it knows it ended badly.
> >
> >
> > Counterpoint: with regular TCP, does close() wait for all data to be sent? I think it just drops an fd ref.
>
> I assume you mean Linux TCP and the case when the struct file is not shared between
> table. In this case tcp_close() will queue FIN packet "normally" and will wait for
> it to be sent for real.
>
What is the point of waiting for it to be sent but not acked?

--
Gleb.

Pavel Emelyanov

<xemul@scylladb.com>
Sep 14, 2022, 4:32:22 AM9/14/22
to Gleb Natapov, Avi Kivity, seastar-dev@googlegroups.com
It waits for it to be acked too.

> --
> Gleb.

Gleb Natapov

<gleb@scylladb.com>
Sep 14, 2022, 4:40:44 AM9/14/22
to Pavel Emelyanov, Avi Kivity, seastar-dev@googlegroups.com
So close() may hang for a while?

--
Gleb.

Pavel Emelyanov

<xemul@scylladb.com>
Sep 14, 2022, 5:16:04 AM9/14/22
to Gleb Natapov, Avi Kivity, seastar-dev@googlegroups.com
This is how I understand the tcp_close()->sk_stream_wait_close() calls in the kernel.

IIRC, there was a strong recommendation for http-like connections (the client opens
a connection, sends a request, waits for the response, and then all is done) that it
must be the client that closes the connection after it reads all the data; the server
should wait for read EOF and close its end afterwards. This recommendation came exactly
from the fact that close() hangs on unsent data, and closing the socket client-side (in
this case) is the guarantee that the server doesn't have any. For cases where this doesn't
apply there's SO_LINGER, which can limit the close() hang time-span.

> --
> Gleb.

Gleb Natapov

<gleb@scylladb.com>
Sep 14, 2022, 5:30:48 AM9/14/22
to Pavel Emelyanov, Avi Kivity, seastar-dev@googlegroups.com
IIRC that recommendation comes from the details of the TCP state
machine. If done otherwise, a lot of sockets in some of the many WAIT states
will accumulate on the server and it will run out of ports to listen on.
If the recommendation is followed then those sockets will accumulate on
the client side, which is much less of a problem. I do not think it is
related to whether close() sleeps or not. As far as SO_LINGER goes, the
man page suggests that close() will wait _only_ if the option is enabled:

SO_LINGER
Sets or gets the SO_LINGER option. The argument is a
linger structure.

    struct linger {
        int l_onoff;    /* linger active */
        int l_linger;   /* how many seconds to linger for */
    };

When enabled, a close(2) or shutdown(2) will not return
until all queued messages for the socket have been
successfully sent or the linger timeout has been reached.
Otherwise, the call returns immediately and the closing is
done in the background. When the socket is closed as part
of exit(2), it always lingers in the background.
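
For reference, a small sketch of setting and reading back the option described in the excerpt, using the standard setsockopt()/getsockopt() calls. The helper names enable_linger() and read_linger() are made up for this illustration.

```cpp
#include <sys/socket.h>
#include <unistd.h>

// Enables SO_LINGER on fd with the given timeout in seconds.
// Returns true if setsockopt() succeeded.
bool enable_linger(int fd, int seconds) {
    struct linger lg{};
    lg.l_onoff = 1;         // linger active
    lg.l_linger = seconds;  // how long close() may wait for queued data
    return ::setsockopt(fd, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) == 0;
}

// Reads the current SO_LINGER setting into out.
// Returns true if getsockopt() succeeded.
bool read_linger(int fd, struct linger& out) {
    socklen_t len = sizeof(out);
    return ::getsockopt(fd, SOL_SOCKET, SO_LINGER, &out, &len) == 0;
}
```

With l_onoff left at its default of 0, close() returns immediately and the closing is done in the background, as the excerpt says. On Linux TCP, enabling the option with a zero timeout is commonly used to make close() abort the connection with an RST instead of a FIN.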

--
Gleb.

Benny Halevy

<bhalevy@scylladb.com>
Oct 3, 2022, 4:47:21 AM10/3/22
to maintainers, seastar-dev@googlegroups.com, Gleb Natapov, Pavel Emelyanov
merge ping

Benny Halevy

<bhalevy@scylladb.com>
Oct 19, 2022, 6:20:48 AM10/19/22
to Pavel Emelyanov, seastar-dev@googlegroups.com, maintainers
ping

Pavel, What's the status of the series?

Pavel Emelyanov

<xemul@scylladb.com>
Oct 19, 2022, 6:26:40 AM10/19/22
to Benny Halevy, seastar-dev@googlegroups.com, maintainers
On 19.10.2022 13:20, Benny Halevy wrote:
> ping
>
> Pavel, What's the status of the series?

It needs more care around patch #2