[COMMIT seastar master] Merge 'Split rpc::server stop into two parts' from Pavel Emelyanov

0 views
Skip to first unread message

Commit Bot

<bot@cloudius-systems.com>
unread,
May 29, 2023, 5:28:34 AM5/29/23
to seastar-dev@googlegroups.com, Avi Kivity
From: Avi Kivity <a...@scylladb.com>
Committer: Avi Kivity <a...@scylladb.com>
Branch: master

Merge 'Split rpc::server stop into two parts' from Pavel Emelyanov

This is to give users the ability to isolate the node from the network without waiting for the internal RPC activity to wrap up. When shutdown()-ed the rpc::server aborts all the connections and accepting socket and returns.

Closes #1671

* github.com:scylladb/seastar:
rpc: Fix formatting after previous patches
rpc: Introduce server::shutdown()
rpc: Wait for server socket to stop before killing conns
rpc: Document server::stop() method

---
diff --git a/include/seastar/rpc/rpc.hh b/include/seastar/rpc/rpc.hh
--- a/include/seastar/rpc/rpc.hh
+++ b/include/seastar/rpc/rpc.hh
@@ -608,6 +608,7 @@ private:
promise<> _ss_stopped;
gate _reply_gate;
server_options _options;
+ bool _shutdown = false;
uint64_t _next_client_id = 1;

public:
@@ -616,7 +617,25 @@ public:
server(protocol_base* proto, server_socket, resource_limits memory_limit = resource_limits(), server_options opts = server_options{});
server(protocol_base* proto, server_options opts, server_socket, resource_limits memory_limit = resource_limits());
void accept();
+ /**
+ * Stops the server.
+ *
+ * It makes sure that no new rpcs are admitted, no rpc handlers issued on this
+ * connection are running any longer and no replies on the previously running
+ * handlers will be sent.
+ */
future<> stop();
+ /**
+ * Shuts down the server.
+ *
+ * Light version of the stop, that just makes sure the server is not visible
+ * by remote clients, i.e. -- no new rpcs are admitted and no replies on the
+ * previously running handlers will be sent. Currently running handlers may
+ * still run.
+ *
+ * Caller of shutdown() mush wait for it to resolve before calling stop.
+ */
+ future<> shutdown();
template<typename Func>
void foreach_connection(Func&& f) {
for (auto c : _conns) {
diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc
--- a/src/rpc/rpc.cc
+++ b/src/rpc/rpc.cc
@@ -1170,16 +1170,28 @@ future<> server::connection::send_unknown_verb_reply(std::optional<rpc_clock_typ
});
}

- future<> server::stop() {
+ future<> server::shutdown() {
+ if (_shutdown) {
+ return make_ready_future<>();
+ }
+
_ss.abort_accept();
_resources_available.broken();
if (_options.streaming_domain) {
_servers.erase(*_options.streaming_domain);
}
- return when_all(_ss_stopped.get_future(),
- parallel_for_each(_conns | boost::adaptors::map_values, [] (shared_ptr<connection> conn) {
+ return _ss_stopped.get_future().then([this] {
+ return parallel_for_each(_conns | boost::adaptors::map_values, [] (shared_ptr<connection> conn) {
return conn->stop();
- }),
+ });
+ }).finally([this] {
+ _shutdown = true;
+ });
+ }
+
+ future<> server::stop() {
+ return when_all(
+ shutdown(),
_reply_gate.close()
).discard_result();
}
Reply all
Reply to author
Forward
0 new messages