[COMMIT seastar master] websocket: revamp shutdown routines

1 view
Skip to first unread message

Commit Bot

<bot@cloudius-systems.com>
unread,
Jun 23, 2022, 8:25:51 AM6/23/22
to seastar-dev@googlegroups.com, Piotr Sarna
From: Piotr Sarna <sa...@scylladb.com>
Committer: Piotr Sarna <sa...@scylladb.com>
Branch: master

websocket: revamp shutdown routines

This commit makes the connections react to errors like incorrect
HTTP upgrade requests by shutting the connection down,
which is a reasonable thing to do.
Also, the gating mechanism copied from the http server does not
make the same amount of sense in case of websocket handlers,
because a websocket handler can be an indefinite stream that
can effectively block the gate forever until the client keeps
sending data. Thus, the gate is removed and the connections are
simply closed when the server stops.

---
diff --git a/include/seastar/websocket/server.hh b/include/seastar/websocket/server.hh
--- a/include/seastar/websocket/server.hh
+++ b/include/seastar/websocket/server.hh
@@ -308,9 +308,10 @@ protected:
*/
class server {
std::vector<server_socket> _listeners;
- gate _task_gate;
boost::intrusive::list<connection> _connections;
std::map<std::string, handler_t> _handlers;
+ future<> _accept_fut = make_ready_future<>();
+ bool _stopped = false;
public:
/*!
* \brief listen for a WebSocket connection on given address
diff --git a/src/websocket/server.cc b/src/websocket/server.cc
--- a/src/websocket/server.cc
+++ b/src/websocket/server.cc
@@ -72,24 +72,18 @@ void server::listen(socket_address addr) {
}

void server::do_accepts(int which) {
- // Waited on with the gate
- (void)try_with_gate(_task_gate, [this, which] {
- return keep_doing([this, which] {
- return try_with_gate(_task_gate, [this, which] {
- return do_accept_one(which);
- });
- }).handle_exception_type([](const gate_closed_exception& e) {});
- }).handle_exception_type([](const gate_closed_exception& e) {});
+ _accept_fut = do_until(
+ [this] { return _stopped; },
+ [this, which] { return do_accept_one(which); });
}

future<> server::do_accept_one(int which) {
return _listeners[which].accept().then([this] (accept_result ar) mutable {
auto conn = std::make_unique<connection>(*this, std::move(ar.connection));
- (void)try_with_gate(_task_gate, [conn = std::move(conn)]() mutable {
- return conn->process().handle_exception([conn = std::move(conn)] (std::exception_ptr ex) {
- wlogger.error("request error: {}", ex);
- });
- }).handle_exception_type([] (const gate_closed_exception& e) {});
+ // Tracked by _connections
+ (void)conn->process().finally([conn = std::move(conn)] {
+ wlogger.debug("Connection is finished");
+ });
}).handle_exception_type([] (const std::system_error &e) {
// We expect a ECONNABORTED when server::stop is called,
// no point in warning about that.
@@ -102,14 +96,15 @@ future<> server::do_accept_one(int which) {
}

future<> server::stop() {
- future<> tasks_done = _task_gate.close();
+ _stopped = true;
for (auto&& l : _listeners) {
l.abort_accept();
}
- for (auto&& c : _connections) {
- c.shutdown();
- }
- return tasks_done;
+ return _accept_fut.finally([this] {
+ return parallel_for_each(_connections, [] (connection& conn) {
+ return conn.close().handle_exception([] (auto ignored) {});
+ });
+ });
}

connection::~connection() {
@@ -121,21 +116,8 @@ void connection::on_new_connection() {
}

future<> connection::process() {
- return when_all(read_loop(), response_loop()).then(
- [this] (std::tuple<future<>, future<>> joined) {
- try {
- std::get<0>(joined).get();
- } catch (...) {
- wlogger.debug("Read exception encountered: {}", std::current_exception());
- }
-
- try {
- std::get<1>(joined).get();
- } catch (...) {
- wlogger.debug("Response exception encountered: {}", std::current_exception());
- }
- shutdown();
- return make_ready_future<>();
+ return when_all_succeed(read_loop(), response_loop()).discard_result().handle_exception([] (const std::exception_ptr& e) {
+ wlogger.debug("Processing failed: {}", e);
});
}

@@ -160,21 +142,20 @@ future<> connection::read_http_upgrade_request() {
std::unique_ptr<httpd::request> req = _http_parser.get_parsed_request();
if (_http_parser.failed()) {
return make_exception_future<>(websocket::exception("Incorrect upgrade request"));
- throw websocket::exception("Incorrect upgrade request");
}

sstring upgrade_header = req->get_header("Upgrade");
if (upgrade_header != "websocket") {
- return make_exception_future<>("Upgrade header missing");
+ return make_exception_future<>(websocket::exception("Upgrade header missing"));
}

sstring subprotocol = req->get_header("Sec-WebSocket-Protocol");
if (subprotocol.empty()) {
- return make_exception_future<>("Subprotocol header missing.");
+ return make_exception_future<>(websocket::exception("Subprotocol header missing."));
}

if (!_server.is_handler_registered(subprotocol)) {
- return make_exception_future<>("Subprotocol not supported.");
+ return make_exception_future<>(websocket::exception("Subprotocol not supported."));
}
this->_handler = this->_server._handlers[subprotocol];
this->_subprotocol = subprotocol;
@@ -326,25 +307,16 @@ future<> connection::read_one() {

future<> connection::read_loop() {
return read_http_upgrade_request().then([this] {
- return when_all(_handler(_input, _output),
+ return when_all_succeed(
+ _handler(_input, _output).handle_exception([this] (std::exception_ptr e) mutable {
+ return _read_buf.close().then([e = std::move(e)] () mutable {
+ return make_exception_future<>(std::move(e));
+ });
+ }),
do_until([this] {return _done;}, [this] {return read_one();})
- ).then([] (std::tuple<future<>, future<>> joined) {
- try {
- std::get<0>(joined).get();
- } catch (...) {
- wlogger.debug("Handler exception encountered: {}",
- std::current_exception());
- }
- try {
- std::get<1>(joined).get();
- } catch (...) {
- wlogger.debug("Read exception encountered: {}",
- std::current_exception());
- }
- return make_ready_future<>();
- }).finally([this] {
- return _read_buf.close();
- });
+ ).discard_result();
+ }).finally([this] {
+ return _read_buf.close();
});
}

@@ -355,9 +327,11 @@ future<> connection::close(bool send_close) {
} else {
return make_ready_future<>();
}
- }().then([this] {
+ }().finally([this] {
_done = true;
- return when_all(_input.close(), _output.close()).discard_result();
+ return when_all_succeed(_input.close(), _output.close()).discard_result().finally([this] {
+ shutdown();
+ });
});
}

Reply all
Reply to author
Forward
0 new messages