I profiled my websocket implementation via callgrind, but wasn't able to find anything promising. It's weird me because, when running with -c1, the cpu is running at 100% solid. But when adding cores, load is decreasing.
With 500 concurrent connections sending messages, a 24 core instance has the same throughput as when running on a single core. Which is a shame really.
diff --git a/apps/httpd/main.cc b/apps/httpd/main.cc
index f112a6e..999c4f6 100644
--- a/apps/httpd/main.cc
+++ b/apps/httpd/main.cc
@@ -21,10 +21,12 @@
#include "http/httpd.hh"
#include "http/handlers.hh"
+#include "http/websocket_handler.hh"
#include "http/function_handlers.hh"
#include "http/file_handler.hh"
#include "apps/httpd/demo.json.hh"
#include "http/api_docs.hh"
+#include "core/prometheus.hh"
namespace bpo = boost::program_options;
@@ -47,10 +49,45 @@ void set_routes(routes& r) {
function_handler* h2 = new function_handler([](std::unique_ptr<request> req) {
return make_ready_future<json::json_return_type>("json-future");
});
+
+ websocket_function_handler* ws1 = new websocket_function_handler([](const httpd::request& req, connected_websocket ws) {
+ auto input = ws.input();
+ auto output = ws.output();
+ return do_with(std::move(input), std::move(output), [] (websocket_input_stream &input,
+ websocket_output_stream &output) {
+ return repeat([&input, &output] {
+ return input.read().then([&output](std::unique_ptr<httpd::websocket_message> buf){
+ if (!buf)
+ return make_ready_future<bool_class<stop_iteration_tag>>(stop_iteration::yes);
+ return output.write(std::move(buf)).then([] {
+ return stop_iteration::no;
+ });
+ });
+ });
+ });
+ });
+
+ auto ws_managed_handler = new websocket_handler();
+
+ ws_managed_handler->on_connection_future([] (const httpd::request& req, websocket_output_stream* ws) {
+ temporary_buffer<char> test("Hello from seastar !", 20);
+ return ws->write(websocket_opcode::TEXT, std::move(test));
+ });
+
+ ws_managed_handler->on_message_future([] (const httpd::request& req, websocket_output_stream* ws, std::unique_ptr<httpd::websocket_message> message) {
+ return ws->write(std::move(message));
+ });
+
+ ws_managed_handler->on_disconnection([] (const httpd::request& req, websocket_output_stream* ws) {
+
+ });
+
r.add(operation_type::GET, url("/"), h1);
r.add(operation_type::GET, url("/jf"), h2);
- r.add(operation_type::GET, url("/file").remainder("path"),
- new directory_handler("/"));
+ r.add(operation_type::GET, url("/file").remainder("path"), new directory_handler("/"));
+ r.put("/managed", ws_managed_handler);
+ r.put("/", ws1);
+
demo_json::hello_world.set(r, [] (const_req req) {
demo_json::my_object obj;
@@ -65,18 +102,18 @@ void set_routes(routes& r) {
int main(int ac, char** av) {
app_template app;
app.add_options()("port", bpo::value<uint16_t>()->default_value(10000),
- "HTTP Server port");
+ "HTTP Server port");
return app.run_deprecated(ac, av, [&] {
- auto&& config = app.configuration();
+ auto &&config = app.configuration();
uint16_t port = config["port"].as<uint16_t>();
auto server = new http_server_control();
auto rb = make_shared<api_registry_builder>("apps/httpd/");
server->start().then([server] {
return server->set_routes(set_routes);
- }).then([server, rb]{
- return server->set_routes([rb](routes& r){rb->set_api_doc(r);});
- }).then([server, rb]{
- return server->set_routes([rb](routes& r) {rb->register_function(r, "demo", "hello world application");});
+ }).then([server, rb] {
+ return server->set_routes([rb](routes &r) { rb->set_api_doc(r); });
+ }).then([server, rb] {
+ return server->set_routes([rb](routes &r) { rb->register_function(r, "demo", "hello world application"); });
}).then([server, port] {
return server->listen(port);
}).then([server, port] {
@@ -85,6 +122,6 @@ int main(int ac, char** av) {
return server->stop();
});
});
-
});
}
+
diff --git a/configure.py b/configure.py
index a9acc3c..a1cc277 100755
--- a/configure.py
+++ b/configure.py
@@ -328,7 +328,8 @@ http = ['http/transformers.cc',
'http/reply.cc',
'http/request_parser.rl',
'http/api_docs.cc',
- 'http/websocket.cc'
+ 'http/websocket.cc',
+ 'http/websocket_fragment.cc'
]
boost_test_lib = [
diff --git a/core/reactor.hh b/core/reactor.hh
index d48ca04..10bbf65 100644
--- a/core/reactor.hh
+++ b/core/reactor.hh
@@ -68,6 +68,7 @@
#include "core/enum.hh"
#include "core/memory.hh"
#include <boost/range/irange.hpp>
+#include <http/websocket_fragment.hh>
#include "timer.hh"
#include "condition-variable.hh"
#include "util/log.hh"
diff --git a/http/handlers.hh b/http/handlers.hh
index 9a9888a..54c0457 100644
--- a/http/handlers.hh
+++ b/http/handlers.hh
@@ -26,8 +26,10 @@
#include "common.hh"
#include "reply.hh"
#include "core/future-util.hh"
+#include "websocket.hh"
#include <unordered_map>
+#include <net/api.hh>
namespace httpd {
@@ -68,6 +70,39 @@ class handler_base {
};
+/**
+ * handlers holds the logic for serving an incoming request.
+ * All handlers inherit from the base handler_websocket_base and
+ * implement the handle method.
+ */
+class handler_websocket_base {
+public:
+ /**
+ * All handlers should implement this method.
+ * It fill the reply according to the request.
+ * @param path the url path used in this call
+ * @param params optional parameter object
+ * @param req the original request
+ * @param rep the reply
+ */
+ virtual future<> handle(const sstring& path, connected_websocket ws) = 0;
+
+ virtual ~handler_websocket_base() = default;
+
+ /**
+ * Add a mandatory parameter
+ * @param param a parameter name
+ * @return a reference to the handler
+ */
+ handler_websocket_base& mandatory(const sstring& param) {
+ _mandatory_param.push_back(param);
+ return *this;
+ }
+
+ std::vector<sstring> _mandatory_param;
+
+};
+
}
#endif /* HANDLERS_HH_ */
diff --git a/http/httpd.hh b/http/httpd.hh
index 4ded1d7..036b89c 100644
--- a/http/httpd.hh
+++ b/http/httpd.hh
@@ -44,12 +44,15 @@
#include <boost/intrusive/list.hpp>
#include "reply.hh"
#include "http/routes.hh"
+#include "http/websocket.hh"
#include <cryptopp/sha.h>
#include <cryptopp/filters.h>
#include <cryptopp/hex.h>
#include <cryptopp/base64.h>
+
#include "core/sleep.hh"
+#include <chrono>
namespace httpd {
@@ -149,13 +152,14 @@ namespace httpd {
http_request_parser _parser;
std::unique_ptr<request> _req;
std::unique_ptr<reply> _resp;
+ socket_address _addr;
// null element marks eof
queue<std::unique_ptr<reply>> _replies { 10 };bool _done = false;
public:
connection(http_server& server, connected_socket&& fd,
socket_address addr)
: _server(server), _fd(std::move(fd)), _read_buf(_fd.input()), _write_buf(
- _fd.output()) {
+ _fd.output()), _addr(addr) {
++_server._total_connections;
++_server._current_connections;
_server._connections.push_back(*this);
@@ -365,7 +369,7 @@ namespace httpd {
} else if (it->second.find("Upgrade") != std::string::npos) {
auto upgrade = req->_headers.find("Upgrade");
if (upgrade != req->_headers.end() && upgrade->second == "websocket")
- return upgrade_websocket(std::move(req)); //websocket upgrade
+ return upgrade_websocket(std::move(req)).then([] { return true; }); //websocket upgrade
}
}
bool should_close;
@@ -386,24 +390,23 @@ namespace httpd {
sstring url = set_query_param(*req.get());
sstring version = req->_version;
- return _server._routes.handle(url, std::move(req), std::move(resp)).
- // Caller guarantees enough room
- then([this, should_close, version = std::move(version)](std::unique_ptr<reply> rep) {
+ return _server._routes.handle(url, std::move(req), std::move(resp)).then([this, should_close, version = std::move(version)](std::unique_ptr<reply> rep) {
rep->set_version(version).done();
this->_replies.push(std::move(rep));
return make_ready_future<bool>(should_close);
});
}
- future<bool> upgrade_websocket(std::unique_ptr<request> req) {
+ future<> upgrade_websocket(std::unique_ptr<request> req) {
constexpr char websocket_uuid[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
constexpr size_t websocket_uuid_len = 36;
+ sstring url = set_query_param(*req.get());
auto resp = std::make_unique<reply>();
resp->set_version(req->_version);
auto it = req->_headers.find("Sec-WebSocket-Key");
- if (it != req->_headers.end()) {
+ if (it != req->_headers.end() && _server._routes.get_ws_handler(url, *req.get())) {
//Success
resp->_headers["Upgrade"] = "websocket";
resp->_headers["Connection"] = "Upgrade";
@@ -429,22 +432,15 @@ namespace httpd {
resp->set_status(reply::status_type::switching_protocols).done();
_replies.push(std::move(resp));
- return repeat([this] {
- return _read_buf.read().then([this](auto buf){
- std::cout << "read from connection : ";
- std::cout.write(buf.begin(), buf.size());
- std::cout << std::endl;
- for (std::size_t i = 0; i < buf.size(); ++i)
- std::cout << std::bitset<8>(buf.begin()[i]) << std::endl;
- if (!buf)
- return make_ready_future<bool_class<stop_iteration_tag> >(stop_iteration::yes);
- return _write_buf.write(std::move(buf)).then([this] { return _write_buf.flush(); }).then([] {
- return stop_iteration::no;
- });
- });
- }).then([] {
- std::cout << "closing connection" << std::endl;
- return true;
+ return do_until([this] {
+ return _replies.empty();
+ }, [this] {
+ //If we don't wait, the HTTP response might not be flushed before user-code start sending messages
+ //resulting in malformed handcheck response.
+ return sleep(std::chrono::milliseconds(30)); //FIXME this is awful
+ }).then([this, req = std::move(req), url] {
+ auto ws = connected_websocket(&_fd, _addr, *req.get());
+ return _server._routes.handle_ws(url, std::move(ws));
});
}
else {
@@ -452,8 +448,7 @@ namespace httpd {
resp->set_status(reply::status_type::bad_request);
}
resp->done();
- this->_replies.push(std::move(resp));
- return make_ready_future<bool>(true);
+ return this->_replies.push_eventually(std::move(resp));
}
future<> write_body() {
@@ -507,9 +502,8 @@ namespace httpd {
static sstring generate_server_name();
public:
http_server_control() : _server_dist(new distributed<http_server>) {
- }
-
+ }
future<> start(const sstring& name = generate_server_name()) {
return _server_dist->start(name);
}
diff --git a/http/routes.cc b/http/routes.cc
index 18d973b..54938b5 100644
--- a/http/routes.cc
+++ b/http/routes.cc
@@ -106,6 +106,17 @@ future<std::unique_ptr<reply> > routes::handle(const sstring& path, std::unique_
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
}
+future<> routes::handle_ws(const sstring &path, connected_websocket ws) {
+ handler_websocket_base* handler = get_ws_handler(normalize_url(path), ws._request);
+ if (handler != nullptr) {
+ for (auto& i : handler->_mandatory_param) {
+ verify_param(ws._request, i);
+ }
+ return handler->handle(path, std::move(ws));
+ }
+ return make_ready_future();
+}
+
sstring routes::normalize_url(const sstring& url) {
if (url.length() < 2 ||
url.at(url.length() - 1) != '/') {
return url;
@@ -131,6 +142,22 @@ handler_base* routes::get_handler(operation_type type, const sstring& url,
return nullptr;
}
+handler_websocket_base *routes::get_ws_handler(const sstring &url, const httpd::request& req) {
+ handler_websocket_base* handler = (_map_ws.find(url) == _map_ws.end()) ? nullptr : _map_ws[url];
+ if (handler != nullptr) {
+ try {
+ for (auto& i : handler->_mandatory_param) {
+ verify_param(req, i);
+ }
+ }
+ catch (const missing_param_exception &e) {
+ return nullptr;
+ }
+ return handler;
+ }
+ return nullptr;
+}
+
routes& routes::add(operation_type type, const url& url,
handler_base* handler) {
match_rule* rule = new match_rule(handler);
diff --git a/http/routes.hh b/http/routes.hh
index 187b7e9..ccc6284 100644
--- a/http/routes.hh
+++ b/http/routes.hh
@@ -31,6 +31,7 @@
#include <unordered_map>
#include <vector>
#include "core/future-util.hh"
+#include "websocket.hh"
namespace httpd {
@@ -95,6 +96,19 @@ class routes {
}
/**
+ * adding a handler as an exact match
+ * @param url the url to match (note that url should start with /)
+ * @param handler the desire handler
+ * @return it self
+ */
+ routes& put(const sstring& url, handler_websocket_base* handler) {
+ //FIXME if a handler is already exists, it need to be
+ // deleted to prevent memory leak
+ _map_ws[url] = handler;
+ return *this;
+ }
+
+ /**
* add a rule to be used.
* rules are search only if an exact match was not found.
* rules are search by the order they were added.
@@ -130,6 +144,28 @@ class routes {
*/
future<std::unique_ptr<reply> > handle(const sstring& path, std::unique_ptr<request> req, std::unique_ptr<reply> rep);
+
+ /**
+ * the main entry point.
+ * the general handler calls this method with the request
+ * the method takes the headers from the request and find the
+ * right handler.
+ * It then call the handler with the parameters (if they exists) found in the url
+ * @param path the url path found
+ * @param req the http request
+ * @param rep the http reply
+ */
+ future<> handle_ws(const sstring& path, connected_websocket ws);
+
+ /**
+ * Search and return a handler by the operation type and url
+ * @param type the http operation type
+ * @param url the request url
+ * @param params a parameter object that will be filled during the match
+ * @return a handler based on the type/url match
+ */
+ handler_websocket_base* get_ws_handler(const sstring& url, const httpd::request& req);
+
/**
* Search and return an exact match
* @param url the request url
@@ -163,6 +199,10 @@ class routes {
std::unordered_map<sstring, handler_base*> _map[NUM_OPERATION];
std::vector<match_rule*> _rules[NUM_OPERATION];
+
+ //Websocket
+ std::unordered_map<sstring, handler_websocket_base*> _map_ws;
+
public:
using exception_handler_fun = std::function<std::unique_ptr<reply>(std::exception_ptr eptr)>;
using exception_handler_id = size_t;
diff --git a/http/websocket.cc b/http/websocket.cc
index 346f560..8b8a23a 100644
--- a/http/websocket.cc
+++ b/http/websocket.cc
@@ -4,68 +4,82 @@
#include "websocket.hh"
-server_websocket::server_websocket(socket_address sa, listen_options opts) : _sa(sa), _opts(opts) {}
-
-void server_websocket::listen() {
- _server_socket = engine().listen(_sa, _opts);
+httpd::connected_websocket::connected_websocket(connected_socket *socket, socket_address &remote_adress,
+ request &request) noexcept : _socket(std::move(socket)),
+ remote_adress(remote_adress),
+ _request(request) {
}
-future<connected_websocket> server_websocket::accept() {
- return _server_socket.accept().then([] (connected_socket sock, socket_address addr) {
- return connected_websocket(std::move(sock), addr);
- });
+httpd::connected_websocket::connected_websocket(httpd::connected_websocket &&cs) noexcept : _socket(std::move(cs._socket)),
+ remote_adress(cs.remote_adress),
+ _request(std::move(cs._request)) {
}
-connected_websocket::connected_websocket(connected_socket &&socket,
- socket_address &remote_adress) noexcept : _socket(std::move(socket)), remote_adress(remote_adress) {}
-
-connected_websocket::connected_websocket(connected_websocket &&cs) noexcept : _socket(std::move(cs._socket)), remote_adress(cs.remote_adress) {
-
-}
-
-connected_websocket &connected_websocket::operator=(connected_websocket &&cs) noexcept {
+httpd::connected_websocket &httpd::connected_websocket::operator=(httpd::connected_websocket &&cs) noexcept {
_socket = std::move(cs._socket);
remote_adress = std::move(cs.remote_adress);
return *this;
}
-future<websocket_fragment> websocket_input_stream::readFragment() {
- return _stream.read().then([] (temporary_buffer<char> buf) {
- websocket_fragment fragment(std::move(buf));
- return fragment;
- });
-}
-
-future<temporary_buffer<char>> websocket_input_stream::read() {
- _buf.clear();
- return repeat([this] {
+future<> httpd::websocket_input_stream::read_fragment() {
+ auto parse_fragment = [this] {
+ if (_buf.size() - _index > 2)
+ _fragment = std::move(std::make_unique<inbound_websocket_fragment>(_buf, &_index));
+ };
- return readFragment().then([this] (auto fragment) {
- if (fragment.data){
- _buf.append(fragment.data.get(), fragment.data.size());
- if (fragment.fin())
- return stop_iteration::yes;
- return stop_iteration::no;
- }
- return stop_iteration::yes;
+ _fragment = nullptr;
+ if (!_buf || _index >= _buf.size())
+ return _stream.read().then([this, parse_fragment](temporary_buffer<char> buf) {
+ _buf = std::move(buf);
+ _index = 0;
+ parse_fragment();
});
+ parse_fragment();
+ return make_ready_future();
+}
-/* return _stream.read().then([this] (temporary_buffer<char> buf) {
- if (buf) {
- _buf.append(buf.get(), buf.size());
- if (buf.get()[0] == 'h')
- return stop_iteration::yes;
- return stop_iteration::no;
- } else {
+future<std::unique_ptr<httpd::websocket_message>> httpd::websocket_input_stream::read() {
+ _lastmassage = nullptr;
+ return repeat([this] { // gather all fragments and concatenate full message
+ return read_fragment().then([this] {
+ if (!_fragment || _fragment->_is_empty)
+ return stop_iteration::yes;
+ else if (_fragment->fin()) {
+ if (!_lastmassage)
+ _lastmassage = std::move(std::make_unique<websocket_message>(std::move(_fragment)));
+ else
+ _lastmassage->append(std::move(_fragment));
return stop_iteration::yes;
}
+ else if (_fragment->opcode() == CONTINUATION)
+ _lastmassage->append(std::move(_fragment));
+ return stop_iteration::no;
});
-*/
+ }).then([this] {
+ return std::move(_lastmassage);
+ });
+}
+/*
+ * When the write is called and if (!_buf || _index >= _buf.size()) == false, it would make sense
+ * to buff it and flush everything at once before the next read().
+ */
+future<> httpd::websocket_output_stream::write(std::unique_ptr<httpd::websocket_message> message) {
+ message->done();
+ return do_with(std::move(message), [this] (std::unique_ptr<httpd::websocket_message> &frag) {
+ temporary_buffer<char> head((char *)&frag->_header, frag->_header_size);
+ return this->_stream.write(std::move(head)).then([this, &frag] {
+ return do_for_each(frag->_fragments, [this] (temporary_buffer<char> &buff) {
+ return this->_stream.write(std::move(buff));
+ });
+ });
}).then([this] {
- if (_buf.empty())
- return make_ready_future<temporary_buffer<char>>();
- std::cout<<"size is " << _buf.size() << std::endl;
- return make_ready_future<temporary_buffer<char>>(std::move(temporary_buffer<char>(_buf.c_str(), _buf.size())));
+ return this->_stream.flush();
+ }).handle_exception([this] (std::exception_ptr e) {
+ return _stream.close();
});
+}
+
+future<> httpd::websocket_output_stream::write(websocket_opcode kind, temporary_buffer<char> buf) {
+ return write(std::move(std::make_unique<websocket_message>(kind, std::move(buf))));
}
\ No newline at end of file
diff --git a/http/websocket.hh b/http/websocket.hh
index 65e134e..ba0ea75 100644
--- a/http/websocket.hh
+++ b/http/websocket.hh
@@ -6,97 +6,27 @@
#define SEASTARPLAYGROUND_WEBSOCKET_HPP
#include <core/reactor.hh>
-#include "core/scattered_message.hh"
#include <net/socket_defs.hh>
+#include "websocket_fragment.hh"
+#include "request.hh"
-class websocket_fragment {
- temporary_buffer<char> _raw;
- uint16_t _header;
-
- bool _fin;
- unsigned char _opcode;
- uint64_t _lenght;
- bool _rsv23;
- bool _rsv1;
- bool _masked;
-
- uint32_t _maskkey;
-
-public:
-
- temporary_buffer<char> data;
-
- websocket_fragment(temporary_buffer<char> &&raw) : _raw(std::move(raw)), _header(0) {
- uint64_t i = sizeof(uint16_t);
- std::memcpy(&_header, _raw.begin(), sizeof(uint16_t));
- _fin = _header & 128;
- _opcode = _header & 15;
- _rsv23 = _header & 48;
- _rsv1 = _header & 64;
- _masked = _header & 32768;
- _lenght = (_header >> 8) & 127;
- if (_lenght == 126)
- {
- _lenght = *((uint16_t *) _raw.share(i, sizeof(uint16_t)).get());
- i += sizeof(uint16_t);
- }
- else if (_lenght == 127)
- {
- _lenght = *((uint64_t *) _raw.share(i, sizeof(uint64_t)).get());
- i += sizeof(uint64_t);
- }
-
- if (_masked)
- {
- _maskkey = *((uint16_t *) _raw.share(i, sizeof(uint32_t)).get());
- i += sizeof(uint32_t);
- }
-
- data = _raw.share(i, _lenght);
- }
-
-public:
- bool fin() { return _fin; }
- unsigned char opcode() { return _opcode; }
- uint64_t &length() { return _lenght; }
- bool rsv23() { return _rsv23; }
- bool rsv1() { return _rsv1; }
- bool masked() { return _masked; }
-
- bool valid() {
- return !((rsv1() /*&& !setCompressed(user)*/) || rsv23() || (opcode() > 2 && opcode() < 8) ||
- opcode() > 10 || (opcode() > 2 && (!fin() || length() > 125)));
- }
-};
+namespace httpd {
class websocket_output_stream final {
output_stream<char> _stream;
temporary_buffer<char> _buf;
size_t _size = 0;
- size_t _begin = 0;
- size_t _end = 0;
- bool _trim_to_size = false;
- bool _batch_flushes = false;
- std::experimental::optional<promise<>> _in_batch;
- bool _flush = false;
- bool _flushing = false;
- std::exception_ptr _ex;
public:
websocket_output_stream() = default;
+
websocket_output_stream(output_stream<char> stream) : _stream(std::move(stream)) {}
- websocket_output_stream(websocket_output_stream&&) = default;
- websocket_output_stream& operator=(websocket_output_stream&&) = default;
- //future<> write(const char* buf, size_t n);
- //future<> write(const char* buf);
+ websocket_output_stream(websocket_output_stream &&) = default;
- //future<> write(const basic_sstring<StringChar, SizeType, MaxSize>& s);
- //future<> write(const std::basic_string<char>& s);
+ websocket_output_stream &operator=(websocket_output_stream &&) = default;
- //future<> write(net::packet p);
- //future<> write(scattered_message<char> msg);
- //future<> write(temporary_buffer<char_type>);
- //future<> flush();
+ future<> write(std::unique_ptr<httpd::websocket_message> message);
+ future<> write(websocket_opcode kind, temporary_buffer<char>);
future<> close() { return _stream.close(); };
private:
friend class reactor;
@@ -104,61 +34,51 @@ class websocket_output_stream final {
class websocket_input_stream final {
input_stream<char> _stream;
- std::string _buf = "";
- bool _eof = false;
+ std::unique_ptr<inbound_websocket_fragment> _fragment;
+ std::unique_ptr<websocket_message> _lastmassage;
+ temporary_buffer<char> _buf;
+ uint32_t _index = 0;
+
private:
- using tmp_buf = temporary_buffer<char>;
- size_t available() const { return _buf.size(); }
-protected:
- void reset() { _buf = {}; }
+ future<> parse_fragment();
public:
websocket_input_stream() = default;
- explicit websocket_input_stream(input_stream<char> stream) : _stream(std::move(stream)), _buf("") {}
- websocket_input_stream(websocket_input_stream&&) = default;
- websocket_input_stream& operator=(websocket_input_stream&&) = default;
- bool eof() { return _eof; }
+ explicit websocket_input_stream(input_stream<char> stream) : _stream(std::move(stream)) {}
- future<websocket_fragment> readFragment();
+ websocket_input_stream(websocket_input_stream &&) = default;
- future<temporary_buffer<char>> read();
- future<> close() { return _stream.close(); }
+ websocket_input_stream &operator=(websocket_input_stream &&) = default;
- /// Ignores n next bytes from the stream.
- //future<> skip(uint64_t n);
+ future<std::unique_ptr<httpd::websocket_message>> read();
+
+ future<> read_fragment();
+
+ future<> close() { return _stream.close(); }
};
class connected_websocket {
private:
- connected_socket _socket;
+ connected_socket *_socket;
+
public:
socket_address remote_adress;
- connected_websocket(connected_socket &&_socket, socket_address &remote_adress) noexcept;
+ request _request;
- connected_websocket(connected_websocket&& cs) noexcept;
- connected_websocket& operator=(connected_websocket&& cs) noexcept;
+ connected_websocket(connected_socket *socket, socket_address &remote_adress, request &request) noexcept;
+
+ connected_websocket(connected_websocket &&cs) noexcept;
+
+ connected_websocket &operator=(connected_websocket &&cs) noexcept;
websocket_input_stream input() {
- return websocket_input_stream(std::move(_socket.input()));
+ return websocket_input_stream(std::move(_socket->input()));
}
websocket_output_stream output() {
- return websocket_output_stream(std::move(_socket.output()));
+ return websocket_output_stream(std::move(_socket->output()));
}
};
-
-class server_websocket {
-
-private:
- socket_address _sa;
- listen_options _opts;
- server_socket _server_socket;
-
-public:
- server_websocket(socket_address sa, listen_options opts = {});
-
- void listen();
- future<connected_websocket> accept();
-};
+}
#endif //SEASTARPLAYGROUND_WEBSOCKET_HPP
\ No newline at end of file
diff --git a/http/websocket_fragment.cc b/http/websocket_fragment.cc
new file mode 100644
index 0000000..7291969
--- /dev/null
+++ b/http/websocket_fragment.cc
@@ -0,0 +1,119 @@
+//
+// Created by hbarraud on 4/2/17.
+//
+
+#include "websocket_fragment.hh"
+
+/* 0 1 2 3
+0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-------+-+-------------+-------------------------------+
+|F|R|R|R| opcode|M| Payload len | Extended payload length |
+|I|S|S|S| (4) |A| (7) | (16/64) |
+|N|V|V|V| |S| | (if payload len==126/127) |
+| |1|2|3| |K| | |
++-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
+| Extended payload length continued, if payload len == 127 |
++ - - - - - - - - - - - - - - - +-------------------------------+
+| |Masking-key, if MASK set to 1 |
++-------------------------------+-------------------------------+
+| Masking-key (continued) | Payload Data |
++-------------------------------- - - - - - - - - - - - - - - - +
+: Payload Data continued ... :
++ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
+| Payload Data continued ... |
++---------------------------------------------------------------+
+*/
+httpd::inbound_websocket_fragment::inbound_websocket_fragment(temporary_buffer<char> &raw, uint32_t *i) : websocket_fragment_base() {
+ auto buf = raw.get_write();
+
+ //First header byte
+ _fin = buf[*i] & 128;
+ _rsv1 = buf[*i] & 64;
+ _opcode = static_cast<websocket_opcode>(buf[*i] & 15);
+ *i += sizeof(uint8_t);
+
+ //Second header byte
+ _masked = buf[*i] & 128;
+ _lenght = buf[*i] & 127;
+ *i += sizeof(uint8_t);
+
+ if (_lenght == 126 && raw.size() >= *i + sizeof(uint16_t)) {
+ _lenght = ntohs(*reinterpret_cast<uint16_t*>(buf + *i));
+ *i += sizeof(uint16_t);
+ }
+ else if (_lenght == 127 && raw.size() >= *i + sizeof(uint64_t)) {
+ _lenght = ntohl(*reinterpret_cast<uint64_t*>(buf + *i));
+ *i += sizeof(uint64_t);
+ }
+
+ if (_masked && raw.size() >= *i + _lenght + sizeof(uint32_t)) {
+ //message is masked
+ uint64_t k = *i;
+ *i += sizeof(uint32_t);
+ message = std::move(raw.share(*i, _lenght));
+ unmask(buf + *i, buf + *i, buf + k, _lenght);
+ _is_empty = false;
+ *i += _lenght;
+ } else if (raw.size() >= *i + _lenght) {
+ message = std::move(raw.share(*i, _lenght));
+ _is_empty = false;
+ *i += _lenght;
+ }
+}
+
+void httpd::websocket_message::done() {
+ const auto header = opcode ^ 0x80;
+
+ uint64_t len = 0;
+ for (auto &fragment : _fragments)
+ len += fragment.size();
+
+ if (len < 125) { //Size fits 7bits
+ _header[0] = header;
+ _header[1] = static_cast<unsigned char>(len);
+ _header_size = 2;
+ } //Size in extended to 16bits
+ else if (len < std::numeric_limits<uint16_t>::max()) {
+ _header[0] = header;
+ _header[1] = static_cast<unsigned char>(126);
+ auto s = htons(len);
+ std::memcpy(_header + 2, &s, 2);
+ _header_size = 4;
+ }
+ else { //Size extended to 64bits
+ _header[0] = header;
+ _header[1] = static_cast<unsigned char>(127);
+ auto l = htonl(len);
+ std::memcpy(_header + 2, &l, 8);
+ _header_size = 10;
+ }
+}
+
+temporary_buffer<char> & httpd::websocket_message::concat() {
+ if (!_concatenated.empty())
+ return _concatenated;
+
+ uint64_t length = 0;
+ for (auto& n : _fragments)
+ length += n.size();
+ temporary_buffer<char> ret(length);
+ length = 0;
+ for (auto& n : _fragments) {
+ std::memcpy(ret.share(length, n.size()).get_write(), n.get(), n.size());
+ length = n.size();
+ }
+ return _concatenated = std::move(ret);
+}
+
+
+inline void httpd::inbound_websocket_fragment::unmask(char *dst, const char *src, const char *mask, uint64_t length) {
+ for (uint64_t j = 0; j < _lenght; ++j) {
+ dst[j] = src[j] ^ mask[j % 4];
+ }
+/* for (unsigned int n = (length >> 2) + 1; n; n--) {
+ *(dst++) = *(src++) ^ mask[0];
+ *(dst++) = *(src++) ^ mask[1];
+ *(dst++) = *(src++) ^ mask[2];
+ *(dst++) = *(src++) ^ mask[3];
+ }*/
+}
diff --git a/http/websocket_fragment.hh b/http/websocket_fragment.hh
new file mode 100644
index 0000000..12ca000
--- /dev/null
+++ b/http/websocket_fragment.hh
@@ -0,0 +1,150 @@
+//
+// Created by hbarraud on 4/2/17.
+//
+
+#ifndef SEASTAR_WEBSOCKET_FRAGMENT_HH
+#define SEASTAR_WEBSOCKET_FRAGMENT_HH
+
+#include <core/reactor.hh>
+
+namespace httpd {
+
+ enum websocket_opcode : uint8_t {
+ CONTINUATION = 0x0,
+ TEXT = 0x1,
+ BINARY = 0x2,
+ CLOSE = 0x8,
+ PING = 0x9,
+ PONG = 0xA,
+ RESERVED = 0xB
+ };
+
+ class websocket_fragment_base {
+ protected:
+
+ bool _fin = false;
+ websocket_opcode _opcode = RESERVED;
+ uint64_t _lenght = 0;
+ bool _rsv2 = false;
+ bool _rsv3 = false;
+ bool _rsv1 = false;
+ bool _masked = false;
+ temporary_buffer<char> _maskkey;
+
+ public:
+ bool _is_empty = false;
+ temporary_buffer<char> message;
+ bool _is_valid = false;
+
+ websocket_fragment_base() : _is_empty(true), _is_valid(false) {}
+
+ websocket_fragment_base(websocket_fragment_base &&fragment) noexcept : _fin(fragment.fin()),
+ _opcode(fragment.opcode()),
+ _lenght(fragment.length()),
+ _rsv2(fragment.rsv2()),
+ _rsv3(fragment.rsv3()),
+ _rsv1(fragment.rsv1()),
+ _masked(fragment.masked()),
+ _maskkey(std::move(fragment._maskkey)),
+ _is_empty(fragment._is_empty),
+ message(std::move(fragment.message)),
+ _is_valid(fragment._is_valid) {
+ }
+
+ bool fin() { return _fin; }
+
+ websocket_opcode opcode() { return _opcode; }
+
+ uint64_t &length() { return _lenght; }
+
+ bool rsv2() { return _rsv2; }
+
+ bool rsv3() { return _rsv3; }
+
+ bool rsv1() { return _rsv1; }
+
+ bool masked() { return _masked; }
+
+ bool valid() {
+ return !((rsv1() || rsv2() || rsv3() || (opcode() > 2 && opcode() < 8) ||
+ opcode() > 10 || (opcode() > 2 && (!fin() || length() > 125))));
+ }
+
+ operator bool() { return !_is_empty && valid(); }
+ };
+
+ class inbound_websocket_fragment : public websocket_fragment_base {
+
+ public:
+
+ inbound_websocket_fragment(const inbound_websocket_fragment &) = delete;
+
+ inbound_websocket_fragment(inbound_websocket_fragment &&other) noexcept : websocket_fragment_base(std::move(other)) {
+ }
+
+ inbound_websocket_fragment(temporary_buffer<char> &raw, uint32_t *index);
+
+ inbound_websocket_fragment() : websocket_fragment_base() {}
+
+ private:
+ inline void unmask(char *dst, const char *src, const char *mask, uint64_t length);
+ };
+
+ class websocket_message {
+ public:
+ websocket_opcode opcode = RESERVED;
+ char _header[10];
+ size_t _header_size = 0;
+ std::vector<temporary_buffer<char>> _fragments;
+ temporary_buffer<char> _concatenated;
+
+ websocket_message() noexcept : _is_empty(true) { }
+ websocket_message(const websocket_message &) = delete;
+ websocket_message(websocket_message &&other) noexcept : opcode(other.opcode),
+ _fragments(std::move(other._fragments)),
+ _concatenated(std::move(other._concatenated)),
+ _is_empty(other._is_empty) {
+ }
+
+ void operator=(const websocket_message&) = delete;
+ websocket_message & operator= (websocket_message &&other) {
+ if (this != &other) {
+ opcode = other.opcode;
+ _fragments = std::move(other._fragments);
+ _concatenated = std::move(other._concatenated);
+ _is_empty = other._is_empty;
+ }
+ return *this;
+ }
+
+ websocket_message(std::unique_ptr<websocket_fragment_base> fragment) noexcept : _is_empty(false) {
+ _fragments.push_back(std::move(fragment->message));
+ opcode = fragment->opcode();
+ }
+
+ websocket_message(websocket_opcode kind, temporary_buffer<char> message) noexcept : _is_empty(false) {
+ _fragments.push_back(std::move(message));
+ opcode = kind;
+ }
+
+ websocket_message(websocket_opcode kind, sstring message) noexcept : _is_empty(false) {
+ _fragments.push_back(std::move(message).release());
+ opcode = kind;
+ }
+
+ void append(std::unique_ptr<websocket_fragment_base> fragment) {
+ _fragments.push_back(std::move(fragment->message));
+ }
+
+ void done();
+
+ temporary_buffer<char> & concat();
+
+ bool empty() { return _is_empty || opcode == CLOSE; }
+
+ private:
+ bool _is_empty;
+ };
+}
+
+#endif //SEASTAR_WEBSOCKET_FRAGMENT_HH
diff --git a/http/websocket_handler.hh b/http/websocket_handler.hh
new file mode 100644
index 0000000..91b8d5f
--- /dev/null
+++ b/http/websocket_handler.hh
@@ -0,0 +1,145 @@
+//
+// Created by hbarraud on 4/1/17.
+//
+
+#ifndef SEASTAR_WEBSOCKET_HANDLER_HH
+#define SEASTAR_WEBSOCKET_HANDLER_HH
+
+#include "handlers.hh"
+
+namespace httpd {
+
+typedef std::function<future<>(const httpd::request& req, connected_websocket ws)> future_ws_handler_function;
+
+class websocket_function_handler : public httpd::handler_websocket_base {
+
+public:
+ websocket_function_handler(const future_ws_handler_function & f_handle)
+ : _f_handle(f_handle) {
+ }
+
+ future<> handle(const sstring &path, connected_websocket ws) override {
+ return _f_handle(ws._request, std::move(ws));
+ }
+
+protected:
+ future_ws_handler_function _f_handle;
+};
+
+typedef std::function<future<>(const httpd::request&, websocket_output_stream* ws)> future_ws_on_dis_connected;
+typedef std::function<future<>(const httpd::request&, websocket_output_stream* ws, std::unique_ptr<httpd::websocket_message> message)> future_ws_on_message;
+
+typedef std::function<void(const httpd::request&, websocket_output_stream* ws)> void_ws_on_dis_connected;
+typedef std::function<void(const httpd::request&, websocket_output_stream* ws, std::unique_ptr<httpd::websocket_message> message)> void_ws_on_message;
+
+class websocket_handler : public httpd::handler_websocket_base {
+
+public:
+ websocket_handler() : _on_connection([] (const httpd::request&, websocket_output_stream* ws) { return make_ready_future(); }),
+ _on_message([] (const httpd::request&, websocket_output_stream* ws, std::unique_ptr<httpd::websocket_message> message) { return make_ready_future(); }),
+ _on_disconnection([] (const httpd::request&, websocket_output_stream* ws) { return make_ready_future(); }),
+ _on_pong([] (const httpd::request&, websocket_output_stream* ws, std::unique_ptr<httpd::websocket_message> message) { return make_ready_future(); }),
+ _on_ping([] (const httpd::request&, websocket_output_stream* ws, std::unique_ptr<httpd::websocket_message> message) {
+ message->opcode = websocket_opcode::PONG;
+ return ws->write(std::move(message));
+ }) {}
+
+ future<> handle(const sstring &path, connected_websocket ws) override {
+ auto input = ws.input();
+ auto output = ws.output();
+ return do_with(std::move(ws), std::move(input), std::move(output), [this] (connected_websocket &ws,
+ websocket_input_stream &input,
+ websocket_output_stream &output) {
+ return _on_connection(ws._request, &output).then([this, &ws, &input, &output] {
+ return repeat([this, &input, &output, &ws] {
+ return input.read().then([this, &output, &ws](std::unique_ptr<httpd::websocket_message> buf) {
+ if (!buf)
+ return make_ready_future<bool_class<stop_iteration_tag>>(stop_iteration::yes);
+ return on_message_internal(ws._request, &output, std::move(buf)).then([] (bool close) {
+ return bool_class<stop_iteration_tag>(close);
+ });
+ });
+ });
+ });
+ }).then([this, &ws, &output] { return _on_disconnection(ws._request, &output); });
+ }
+
+ void on_message(const void_ws_on_message & handler) {
+ _on_message = [handler](const httpd::request& req, websocket_output_stream *ws,
+ std::unique_ptr<httpd::websocket_message> message) {
+ handler(req, ws, std::move(message));
+ return make_ready_future();
+ };
+ }
+ void on_ping(const void_ws_on_message & handler) {
+ _on_ping = [handler](const httpd::request& req, websocket_output_stream *ws,
+ std::unique_ptr<httpd::websocket_message> message) {
+ handler(req, ws, std::move(message));
+ return make_ready_future();
+ };
+ }
+ void on_pong(const void_ws_on_message & handler) {
+ _on_pong = [handler](const httpd::request& req, websocket_output_stream *ws,
+ std::unique_ptr<httpd::websocket_message> message) {
+ handler(req, ws, std::move(message));
+ return make_ready_future();
+ };
+ }
+ void on_connection(const void_ws_on_dis_connected & handler) {
+ _on_connection = [handler](const httpd::request& req, websocket_output_stream *ws) {
+ handler(req, ws);
+ return make_ready_future();
+ };
+ }
+ void on_disconnection(const void_ws_on_dis_connected & handler) {
+ _on_disconnection = [handler](const httpd::request& req, websocket_output_stream *ws) {
+ handler(req, ws);
+ return make_ready_future();
+ };
+ }
+
+ void on_message_future(const future_ws_on_message & handler) { _on_message = handler; }
+ void on_ping_future(const future_ws_on_message & handler) { _on_ping = handler; }
+ void on_pong_future(const future_ws_on_message & handler) { _on_pong = handler; }
+ void on_connection_future(const future_ws_on_dis_connected & handler) { _on_connection = handler; }
+ void on_disconnection_future(const future_ws_on_dis_connected & handler) { _on_disconnection = handler; }
+
+private:
+
+ future<bool> on_message_internal(const httpd::request& req, websocket_output_stream* ws, std::unique_ptr<httpd::websocket_message> message) {
+ switch (message->opcode){
+ case TEXT:
+ //FIXME Check that buffer is valid UTF-8
+ case BINARY:
+ return _on_message(req, ws, std::move(message)).then([] {
+ return false;
+ });
+ case PING:
+ return _on_ping(req, ws, std::move(message)).then([] {
+ return false;
+ });
+ case PONG:
+ return _on_pong(req, ws, std::move(message)).then([] {
+ return false;
+ });
+ case CLOSE:
+ case RESERVED:
+ default:
+ return make_ready_future().then([] {
+ return true;
+ });
+ }
+ }
+
+protected:
+ future_ws_on_dis_connected _on_connection;
+ future_ws_on_message _on_message;
+ future_ws_on_dis_connected _on_disconnection;
+ future_ws_on_message _on_pong;
+ future_ws_on_message _on_ping;
+};
+
+}
+
+
+#endif //SEASTAR_WEBSOCKET_HANDLER_HH