[COMMIT seastar master] websocket: add support for CLOSE control frame

1 view
Skip to first unread message

Commit Bot

<bot@cloudius-systems.com>
unread,
Jun 23, 2022, 8:25:49 AM6/23/22
to seastar-dev@googlegroups.com, Andrzej Stalke
From: Andrzej Stalke <38095405...@users.noreply.github.com>
Committer: Piotr Sarna <sa...@scylladb.com>
Branch: master

websocket: add support for CLOSE control frame

The server is now capable of receiving and processing
the CLOSE frame specified by WebSocket protocol.

---
diff --git a/include/seastar/websocket/server.hh b/include/seastar/websocket/server.hh
--- a/include/seastar/websocket/server.hh
+++ b/include/seastar/websocket/server.hh
@@ -37,9 +37,6 @@ namespace seastar::experimental::websocket {
using handler_t = std::function<future<>(input_stream<char>&, output_stream<char>&)>;

class server;
-struct reply {
- //TODO: implement
-};

/*!
* \brief an error in handling a WebSocket connection
@@ -53,6 +50,19 @@ public:
}
};

+/*!
+ * \brief Possible type of a websocket frame.
+ */
+enum opcodes {
+ CONTINUATION = 0x0,
+ TEXT = 0x1,
+ BINARY = 0x2,
+ CLOSE = 0x8,
+ PING = 0x9,
+ PONG = 0xA,
+ INVALID = 0xFF,
+};
+
struct frame_header {
static constexpr uint8_t FIN = 7;
static constexpr uint8_t RSV1 = 6;
@@ -100,8 +110,6 @@ struct frame_header {
}
};

-
-
class websocket_parser {
enum class parsing_state : uint8_t {
flags_and_payload_data,
@@ -148,7 +156,8 @@ public:
future<consumption_result_t> operator()(temporary_buffer<char> data);
bool is_valid() { return _cstate == connection_state::valid; }
bool eof() { return _cstate == connection_state::closed; }
- buff_t result() { return std::move(_result); }
+ opcodes opcode() const;
+ buff_t result();
};

/*!
@@ -217,22 +226,33 @@ class connection : public boost::intrusive::list_base_hook<> {
}
};

+ future<> close(bool send_close);
+
+ /*!
+ * \brief This function processess received PING frame.
+ * https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.2
+ */
+ future<> handle_ping();
+ /*!
+ * \brief This function processess received PONG frame.
+ * https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.3
+ */
+ future<> handle_pong();
+
static const size_t PIPE_SIZE = 512;
server& _server;
connected_socket _fd;
input_stream<char> _read_buf;
output_stream<char> _write_buf;
http_request_parser _http_parser;
- std::unique_ptr<reply> _resp;
- queue<std::unique_ptr<reply>> _replies{10};
bool _done = false;

websocket_parser _websocket_parser;
queue <temporary_buffer<char>> _input_buffer;
input_stream<char> _input;
-
queue <temporary_buffer<char>> _output_buffer;
output_stream<char> _output;
+
sstring _subprotocol;
handler_t _handler;
public:
@@ -272,9 +292,9 @@ protected:
future<> response_loop();
void on_new_connection();
/*!
- * \brief Packs buff in websocket data frame and sends it to the client.
+ * \brief Packs buff in websocket frame and sends it to the client.
*/
- future<> send_data(temporary_buffer<char>&& buff);
+ future<> send_data(opcodes opcode, temporary_buffer<char>&& buff);

};

diff --git a/src/websocket/server.cc b/src/websocket/server.cc
--- a/src/websocket/server.cc
+++ b/src/websocket/server.cc
@@ -44,6 +44,23 @@ static sstring http_upgrade_reply_template =

static logger wlogger("websocket");

+opcodes websocket_parser::opcode() const {
+ if (_header) {
+ return opcodes(_header->opcode);
+ } else {
+ return opcodes::INVALID;
+ }
+}
+
+websocket_parser::buff_t websocket_parser::result() {
+ _payload_length = 0;
+ _masking_key = 0;
+ _state = parsing_state::flags_and_payload_data;
+ _cstate = connection_state::valid;
+ _header.reset(nullptr);
+ return std::move(_result);
+}
+
void server::listen(socket_address addr, listen_options lo) {
_listeners.push_back(seastar::listen(addr, lo));
do_accepts(_listeners.size() - 1);
@@ -262,25 +279,54 @@ future<websocket_parser::consumption_result_t> websocket_parser::operator()(
return websocket_parser::stop(std::move(data));
}

+future<> connection::handle_ping() {
+ // TODO
+ return make_ready_future<>();
+}
+
+future<> connection::handle_pong() {
+ // TODO
+ return make_ready_future<>();
+}
+
+
future<> connection::read_one() {
return _read_buf.consume(_websocket_parser).then([this] () mutable {
if (_websocket_parser.is_valid()) {
// FIXME: implement error handling
- return _input_buffer.push_eventually(_websocket_parser.result());
+ switch(_websocket_parser.opcode()) {
+ // We do not distinguish between these 3 types.
+ case opcodes::CONTINUATION:
+ case opcodes::TEXT:
+ case opcodes::BINARY:
+ return _input_buffer.push_eventually(_websocket_parser.result());
+ case opcodes::CLOSE:
+ wlogger.debug("Received close frame.");
+ /*
+ * datatracker.ietf.org/doc/html/rfc6455#section-5.5.1
+ */
+ return close(true);
+ case opcodes::PING:
+ return handle_ping();
+ wlogger.debug("Received ping frame.");
+ case opcodes::PONG:
+ wlogger.debug("Received pong frame.");
+ return handle_pong();
+ default:
+ // Invalid - do nothing.
+ ;
+ }
} else if (_websocket_parser.eof()) {
- _done = true;
- return when_all(_input.close(), _output.close()).discard_result();
+ return close(false);
}
wlogger.debug("Reading from socket has failed.");
- _done = true;
- return when_all(_input.close(), _output.close()).discard_result();
+ return close(true);
});
}

future<> connection::read_loop() {
return read_http_upgrade_request().then([this] {
- return when_all(
- _handler(_input, _output),
+ return when_all(_handler(_input, _output),
do_until([this] {return _done;}, [this] {return read_one();})
).then([] (std::tuple<future<>, future<>> joined) {
try {
@@ -295,17 +341,34 @@ future<> connection::read_loop() {
wlogger.debug("Read exception encountered: {}",
std::current_exception());
}
- // FIXME
- return _replies.push_eventually({});
+ return make_ready_future<>();
}).finally([this] {
return _read_buf.close();
});
});
}

-future<> connection::send_data(temporary_buffer<char>&& buff) {
+future<> connection::close(bool send_close) {
+ return [this, send_close]() {
+ if (send_close) {
+ return send_data(opcodes::CLOSE, temporary_buffer<char>(0));
+ } else {
+ return make_ready_future<>();
+ }
+ }().then([this] {
+ _done = true;
+ return when_all(_input.close(), _output.close()).discard_result();
+ });
+}
+
+future<> connection::send_data(opcodes opcode, temporary_buffer<char>&& buff) {
sstring data;
- data.append("\x81", 1);
+ {
+ char first_byte[] = "\x80";
+ first_byte[0] += opcode;
+ data.append(first_byte, 1);
+ }
+
if ((126 <= buff.size()) && (buff.size() <= std::numeric_limits<uint16_t>::max())) {
uint16_t length = buff.size();
length = htobe16(length);
@@ -334,8 +397,10 @@ future<> connection::response_loop() {
// FIXME: implement error handling
return _output_buffer.pop_eventually().then([this] (
temporary_buffer<char> buf) {
- return send_data(std::move(buf));
+ return send_data(opcodes::TEXT, std::move(buf));
});
+ }).finally([this]() {
+ return _write_buf.close();
});
}

Reply all
Reply to author
Forward
0 new messages