From: Pavel Emelyanov <
xe...@scylladb.com>
Committer: Pavel Emelyanov <
xe...@scylladb.com>
Branch: master
rpc: Introduce server::shutdown()
Split the rpc::server::stop() into two phases.
First, the shutdown() one that closes accepting socket and stops all the
active connections. Second, the stop() that waits for all of the above
to wrap up.
The shutdown() stage can be called first, if not stop() would call it on
its own.
Formatting is deliberately left broken for the ease of review.
Signed-off-by: Pavel Emelyanov <
xe...@scylladb.com>
---
diff --git a/include/seastar/rpc/rpc.hh b/include/seastar/rpc/rpc.hh
--- a/include/seastar/rpc/rpc.hh
+++ b/include/seastar/rpc/rpc.hh
@@ -608,6 +608,7 @@ private:
promise<> _ss_stopped;
gate _reply_gate;
server_options _options;
+ bool _shutdown = false;
uint64_t _next_client_id = 1;
public:
@@ -624,6 +625,17 @@ public:
* handlers will be sent.
*/
future<> stop();
+ /**
+ * Shuts down the server.
+ *
+ * Light version of the stop, that just makes sure the server is not visible
+ * by remote clients, i.e. -- no new rpcs are admitted and no replies on the
+ * previously running handlers will be sent. Currently running handlers may
+ * still run.
+ *
+ * Caller of shutdown() mush wait for it to resolve before calling stop.
+ */
+ future<> shutdown();
template<typename Func>
void foreach_connection(Func&& f) {
for (auto c : _conns) {
diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc
--- a/src/rpc/rpc.cc
+++ b/src/rpc/rpc.cc
@@ -1170,20 +1170,31 @@ future<> server::connection::send_unknown_verb_reply(std::optional<rpc_clock_typ
});
}
- future<> server::stop() {
+ future<> server::shutdown() {
+ if (_shutdown) {
+ return make_ready_future<>();
+ }
+
_ss.abort_accept();
_resources_available.broken();
if (_options.streaming_domain) {
_servers.erase(*_options.streaming_domain);
}
return _ss_stopped.get_future().then([this] {
- return when_all(
+ return
parallel_for_each(_conns | boost::adaptors::map_values, [] (shared_ptr<connection> conn) {
return conn->stop();
- }),
+ });
+ }).finally([this] {
+ _shutdown = true;
+ });
+ }
+
+ future<> server::stop() {
+ return when_all(
+ shutdown(),
_reply_gate.close()
).discard_result();
- });
}
void server::abort_connection(connection_id id) {