[PATCHv2] rpc stream: do not abort stream queue if stream connection was closed without error

Gleb Natapov

<gleb@scylladb.com>
Jan 26, 2025, 7:15:10 AM
to seastar-dev@googlegroups.com
queue::abort() drops all queued packets and reports an error to the
consumer. If a stream connection completes normally we want the consumer
to get all the data without errors, so abort the queue only in case of
an error. Otherwise the queue will wait to be consumed. Since closing
the stream involves sending a special EOS packet, the consumer should not
hang, because the queue will not be empty.

Fixes: #2612

---
v1->v2:
- an exception is not always raised when a connection is broken.
Sometimes _error is set instead. In both cases _stream_queue has
to be aborted.

diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc
index 88d2660c528..d7badc61aaf 100644
--- a/src/rpc/rpc.cc
+++ b/src/rpc/rpc.cc
@@ -1022,8 +1022,10 @@ namespace rpc {
                   }
               }
           }
+          if (is_stream() && (ep || _error)) {
+            _stream_queue.abort(std::make_exception_ptr(stream_closed()));
+          }
           _error = true;
-          _stream_queue.abort(std::make_exception_ptr(stream_closed()));
           return stop_send_loop(ep).then_wrapped([this] (future<> f) {
               f.ignore_ready_future();
               _outstanding.clear();
@@ -1244,8 +1246,10 @@ future<> server::connection::send_unknown_verb_reply(std::optional<rpc_clock_typ
                       format("server{} connection dropped", is_stream() ? " stream" : "").c_str(), ep);
           }
           _fd.shutdown_input();
+          if (is_stream() && (ep || _error)) {
+            _stream_queue.abort(std::make_exception_ptr(stream_closed()));
+          }
           _error = true;
-          _stream_queue.abort(std::make_exception_ptr(stream_closed()));
           return stop_send_loop(ep).then_wrapped([this] (future<> f) {
               f.ignore_ready_future();
               get_server()._conns.erase(get_connection_id());

--
Gleb.
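
The behaviour change described in the commit message can be illustrated with a small standalone model. This is only a sketch in plain C++, not Seastar code: `toy_queue`, `on_connection_done`, `drain` and `run_scenario` are hypothetical names invented for the illustration, and the toy queue only imitates the semantics stated above (abort drops queued packets and reports an error to the consumer). The one line taken from the patch is the condition `is_stream() && (ep || _error)`.

```cpp
#include <deque>
#include <exception>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the stream_closed exception named in the patch.
struct stream_closed : std::runtime_error {
    stream_closed() : std::runtime_error("stream closed") {}
};

// Toy queue: abort() discards pending items and makes every later pop() throw.
class toy_queue {
    std::deque<std::string> _items;
    std::exception_ptr _ex;
public:
    void push(std::string s) {
        if (!_ex) {
            _items.push_back(std::move(s));
        }
    }
    void abort(std::exception_ptr ex) {
        _items.clear();
        _ex = ex;
    }
    // Returns the next item, std::nullopt if the queue is empty, or throws if aborted.
    std::optional<std::string> pop() {
        if (_ex) {
            std::rethrow_exception(_ex);
        }
        if (_items.empty()) {
            return std::nullopt;
        }
        auto s = std::move(_items.front());
        _items.pop_front();
        return s;
    }
};

// Mirrors the patched condition: abort only when a stream connection ended with an error.
void on_connection_done(toy_queue& q, bool is_stream, std::exception_ptr ep, bool& error) {
    if (is_stream && (ep || error)) {
        q.abort(std::make_exception_ptr(stream_closed()));
    }
    error = true;
}

// Drains the queue and records "ERROR" if the consumer sees stream_closed.
std::vector<std::string> drain(toy_queue& q) {
    std::vector<std::string> out;
    try {
        while (auto item = q.pop()) {
            out.push_back(*item);
        }
    } catch (const stream_closed&) {
        out.push_back("ERROR");
    }
    return out;
}

// Queues two data packets plus EOS, ends the connection, then lets the consumer read.
std::vector<std::string> run_scenario(bool connection_failed) {
    toy_queue q;
    q.push("data1");
    q.push("data2");
    q.push("EOS");
    bool error = false;
    std::exception_ptr ep;
    if (connection_failed) {
        ep = std::make_exception_ptr(std::runtime_error("connection broken"));
    }
    on_connection_done(q, /*is_stream=*/true, ep, error);
    return drain(q);
}
```

In this model `run_scenario(false)` delivers both data packets and the EOS marker to the consumer, while `run_scenario(true)` delivers only the error, which matches the intent stated in the commit message. With the pre-patch unconditional abort, the clean-close case would lose the queued packets as well.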

Gleb Natapov

<gleb@scylladb.com>
Jan 28, 2025, 3:39:51 AM
to seastar-dev@googlegroups.com
Ping.
--
Gleb.

Avi Kivity

<avi@scylladb.com>
Jan 28, 2025, 6:29:47 AM
to Gleb Natapov, seastar-dev@googlegroups.com
On Sun, 2025-01-26 at 14:15 +0200, 'Gleb Natapov' via seastar-dev
wrote:


We use four-space indents in this repository, not two.

>            _error = true;
> -          _stream_queue.abort(std::make_exception_ptr(stream_closed()));
>            return stop_send_loop(ep).then_wrapped([this] (future<> f) {
>                f.ignore_ready_future();
>                _outstanding.clear();
> @@ -1244,8 +1246,10 @@ future<> server::connection::send_unknown_verb_reply(std::optional<rpc_clock_typ
>                        format("server{} connection dropped", is_stream() ? " stream" : "").c_str(), ep);
>            }
>            _fd.shutdown_input();
> +          if (is_stream() && (ep || _error)) {
> +            _stream_queue.abort(std::make_exception_ptr(stream_closed()));
> +          }


Here too.


What about a test?

Gleb Natapov

<gleb@scylladb.com>
Jan 28, 2025, 7:34:50 AM
to seastar-dev@googlegroups.com
queue::abort() drops all queued packets and reports an error to the
consumer. If a stream connection completes normally we want the consumer
to get all the data without errors, so abort the queue only in case of
an error. Otherwise the queue will wait to be consumed. Since closing
the stream involves sending a special EOS packet, the consumer should not
hang, because the queue will not be empty.

Fixes: #2612

---
v1->v2:
- an exception is not always raised when a connection is broken.
Sometimes _error is set instead. In both cases _stream_queue has
to be aborted.
v2->v3:
- fix indentation


diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc
index 88d2660c528..84336a08568 100644
--- a/src/rpc/rpc.cc
+++ b/src/rpc/rpc.cc
@@ -1022,8 +1022,10 @@ namespace rpc {
                   }
               }
           }
+          if (is_stream() && (ep || _error)) {
+              _stream_queue.abort(std::make_exception_ptr(stream_closed()));
+          }
           _error = true;
-          _stream_queue.abort(std::make_exception_ptr(stream_closed()));
           return stop_send_loop(ep).then_wrapped([this] (future<> f) {
               f.ignore_ready_future();
               _outstanding.clear();
@@ -1244,8 +1246,10 @@ future<> server::connection::send_unknown_verb_reply(std::optional<rpc_clock_typ
                       format("server{} connection dropped", is_stream() ? " stream" : "").c_str(), ep);
           }
           _fd.shutdown_input();
+          if (is_stream() && (ep || _error)) {
+              _stream_queue.abort(std::make_exception_ptr(stream_closed()));
+          }
           _error = true;
-          _stream_queue.abort(std::make_exception_ptr(stream_closed()));
           return stop_send_loop(ep).then_wrapped([this] (future<> f) {
               f.ignore_ready_future();
               get_server()._conns.erase(get_connection_id());
--
Gleb.

Avi Kivity

<avi@scylladb.com>
Jan 28, 2025, 1:00:01 PM
to Gleb Natapov, seastar-dev@googlegroups.com
I asked for a test.

On Tue, 2025-01-28 at 14:34 +0200, 'Gleb Natapov' via seastar-dev
wrote:

Gleb Natapov

<gleb@scylladb.com>
Jan 29, 2025, 3:01:29 AM
to Avi Kivity, seastar-dev@googlegroups.com
On Tue, Jan 28, 2025 at 07:59:54PM +0200, Avi Kivity wrote:
> I asked for a test.
>
I missed it. But I can only repeat what I wrote to Tomek:

It's not easy to write a reliable reproducer without error injection,
which, iirc, we do not have in Seastar. Without error injection the problem
only happens in debug mode, since continuation reordering is needed to
hit it, and even then it was very rare in the ScyllaDB reproducer that we
have.
--
Gleb.
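
For readers unfamiliar with the term, error injection usually means named hook points that a test can switch on to force a failure at an exact place, instead of waiting for a rare scheduling order. The following is a minimal sketch of that idea in plain C++. Nothing here is a Seastar API: `error_injector`, the point name `"rpc_stream_read"` and `connection_loop_aborts_queue` are all hypothetical names made up for the illustration.

```cpp
#include <exception>
#include <stdexcept>
#include <string>
#include <unordered_set>

// Hypothetical registry of named injection points (an assumption, not a Seastar facility).
class error_injector {
    std::unordered_set<std::string> _enabled;
public:
    void enable(const std::string& point) { _enabled.insert(point); }
    void disable(const std::string& point) { _enabled.erase(point); }
    // Throws if a test has enabled this point; otherwise does nothing.
    void maybe_throw(const std::string& point) const {
        if (_enabled.count(point)) {
            throw std::runtime_error("injected failure at " + point);
        }
    }
};

// Hypothetical connection step with an injection point where a socket read could fail.
// Returns true when an exception was captured, i.e. when the stream queue
// would have to be aborted under the condition used in the patch.
bool connection_loop_aborts_queue(const error_injector& inj) {
    std::exception_ptr ep;
    try {
        inj.maybe_throw("rpc_stream_read");
        // A real loop would read and queue stream packets here.
    } catch (...) {
        ep = std::current_exception();
    }
    return static_cast<bool>(ep);
}
```

A test built this way would enable `"rpc_stream_read"`, run the loop, and assert that the consumer sees an error, then disable it and assert that all queued data arrives. Whether such hooks fit Seastar's RPC code is a design question the thread leaves open.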

Gleb Natapov

<gleb@scylladb.com>
Jan 30, 2025, 4:02:42 AM
to Avi Kivity, seastar-dev@googlegroups.com
Ping. What do you think?
--
Gleb.

avi@scylladb.com

<avi@scylladb.com>
Feb 2, 2025, 5:24:15 AM
to Gleb Natapov, seastar-dev@googlegroups.com
I think I'm seeing it (or its cousin) very often now in ScyllaDB CI failures. I'll merge it but let's think of ways to test RPC well.