A Websocket odyssey

Hippolyte Barraud

<hippolyte.barraud@gmail.com>
Apr 9, 2017, 7:52:09 PM
to seastar-dev

I discovered Seastar a few weeks ago, found it really cool, and decided to start a pet/weekend project with it. I thought that implementing WebSockets in the httpd module would be a good starting point and a useful thing to have in Seastar.

My naive implementation works pretty well, but I'm seeing counterintuitive performance results.

Notable additions can be found in :
I'm building seastar and the httpd app using 

./configure.py --enable-gcc6-concepts --disable-xen

To benchmark my implementation I'm using the throughput benchmark from the uWebSocket project.

All results come from a Fedora 25 VM with 6 vcores allocated :
[hbarraud@fedora-linux seastar]$ gcc --version
gcc (GCC) 6.3.1 20161221 (Red Hat 6.3.1-1)
Copyright (C) 2016 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

[hbarraud@fedora-linux seastar]$ uname -a
Linux fedora-linux 4.8.6-300.fc25.x86_64 #1 SMP Tue Nov 1 12:36:38 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux

#Test 1 : Echo of 1 client sending frames of 20 bytes as fast as possible. 

##Seastar :

Server launch command :
./build/release/apps/httpd/httpd

Client launch command :
./throughput 1 20 1 10000

Results :
[...]
Echo performance: 9.51331 echoes/ms

##uWebSocket :

Server launch command :
./uWS_epoll

Client launch command :
./throughput 1 20 1 3000

Results :
[...]
Echo performance: 6.72122 echoes/ms

In this very simple test, the seastar server has the edge.

#Test 2 : Echo of 500 clients sending frames of 20 bytes as fast as possible.

##Seastar :

Server launch command :
./build/release/apps/httpd/httpd

Client launch command :
./throughput 500 20 1 port

Results :
[...]
Echo performance: 54.0657 echoes/ms

##uWebSocket :

Server launch command :
./uWS_epoll

Client launch command :
./throughput 500 20 1 port

Results :
[...]
Echo performance: 112.717 echoes/ms

When more clients are involved, my server performs badly (sometimes really badly).
What's even weirder is that limiting the number of cores the server uses (using -c) actually improves the results, to the point where it almost matches the uWebSocket server.

 

All CPUs are loaded at about 70% during the -c6 test.


I tried to understand what's going on: I searched for unnecessary memory copies and implementation mistakes, but found nothing that explains these results. I don't share any memory between threads, and they don't communicate in any way.

I'm by no means a C++ guru, so any advice and criticism are welcome!

Dor Laor

<dor@scylladb.com>
Apr 10, 2017, 2:05:31 AM
to Hippolyte Barraud, seastar-dev
Cheers for coding around Seastar.
Without diving into any details, I can give some general rule-of-thumb advice:
1. Understand the bottleneck.
    Not seeing 100% cpu per core is a big no-no for Seastar.
    You want all the cores to be loaded; if that's not the case, either
    there is a bottleneck somewhere else or things aren't fully async.
    Start with a single core, add cores and see when the cpu isn't 100% loaded per core.
2. Use Prometheus metrics
    There are tons of good metrics that reveal what's going on
3. Perf top and other perf tools
    They'll help you to figure out where the cpu time goes
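
The procedure in point 1 (start with one core, add cores, watch for the point where per-core load drops) can be sketched as a small standalone helper. This is plain C++, not Seastar code. The `run_sample` struct, the `first_underloaded` function, and the idea of a fixed utilization threshold are all assumptions made for illustration. The utilization figures would have to come from whatever tool is used to observe CPU load.

```cpp
#include <cassert>
#include <vector>

// One benchmark run (hypothetical record, not part of Seastar).
struct run_sample {
    unsigned cores;          // value passed to -c for this run
    double avg_utilization;  // average per-core CPU utilization, 0.0 to 1.0
};

// Returns the core count of the first run (in the order given) whose average
// per-core utilization is below `threshold`, or 0 if every run stays at or
// above it. Runs are assumed to be listed by increasing core count.
unsigned first_underloaded(const std::vector<run_sample>& runs, double threshold) {
    assert(threshold > 0.0 && threshold <= 1.0);
    for (const auto& r : runs) {
        if (r.avg_utilization < threshold) {
            return r.cores;
        }
    }
    return 0;
}
```

Fed with the two data points reported in this thread (100% at -c1, about 70% at -c6) and an arbitrarily chosen threshold of 0.95, `first_underloaded` returns 6. Measuring the intermediate core counts would narrow down where the load starts to drop.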


--
You received this message because you are subscribed to the Google Groups "seastar-dev" group.
To unsubscribe from this group and stop receiving emails from it, send an email to seastar-dev+unsubscribe@googlegroups.com.
To post to this group, send email to seast...@googlegroups.com.
Visit this group at https://groups.google.com/group/seastar-dev.
To view this discussion on the web visit https://groups.google.com/d/msgid/seastar-dev/02e48d93-febd-4ab4-8978-ac6c55af7811%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Avi Kivity

<avi@scylladb.com>
Apr 11, 2017, 8:25:24 AM
to Hippolyte Barraud, seastar-dev

I think WebSockets would make a great addition to seastar, as they fit the asynchronous model well.  I hope you'll consider contributing the code.


I did not look at the code yet (I prefer to see patches posted to the mailing list); but are you loading the server with enough connections?  You'll want a significant number so that they are distributed more or less evenly.
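
To illustrate why the connection count matters, here is a minimal standalone sketch in plain C++ (not Seastar code). It assumes, purely for illustration, that accepted connections are handed to cores in round-robin order. The real placement depends on how the network stack balances connections, and the function names `connections_per_core` and `idle_cores` are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Counts how many connections each core ends up with if connection i is
// assigned to core (i % cores). Round-robin placement is an assumption of
// this sketch, not a statement about Seastar internals.
std::vector<std::size_t> connections_per_core(std::size_t connections, std::size_t cores) {
    assert(cores > 0);
    std::vector<std::size_t> load(cores, 0);
    for (std::size_t i = 0; i < connections; ++i) {
        ++load[i % cores];
    }
    return load;
}

// Number of cores that received no connection at all.
std::size_t idle_cores(const std::vector<std::size_t>& load) {
    std::size_t idle = 0;
    for (std::size_t n : load) {
        if (n == 0) {
            ++idle;
        }
    }
    return idle;
}
```

Under this assumption, a single connection on 6 cores leaves 5 cores idle, while 500 connections on 6 cores give every core 83 or 84 connections. An even split by count does not guarantee an even split of work if some connections are much busier than others.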


Hippolyte Barraud

<hippolyte.barraud@gmail.com>
Apr 13, 2017, 5:29:18 PM
to seastar-dev
I profiled my websocket implementation with callgrind, but wasn't able to find anything promising. It's weird to me because, when running with -c1, the CPU runs at a solid 100%, but when I add cores, the load decreases.
With 500 concurrent connections sending messages, a 24-core instance has the same throughput as a single core, which is a shame really.
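
The scaling observation above can be put into a number. This is a minimal sketch in plain C++, assuming the usual definitions of speedup, T(n) / T(1), and scaling efficiency, speedup / n. The function names are hypothetical and nothing here comes from the patch.

```cpp
#include <cassert>

// Speedup of an n-core run relative to the single-core run: T(n) / T(1).
// Throughputs can be in any unit (for example echoes/ms) as long as both
// use the same one.
double speedup(double throughput_one_core, double throughput_n_cores) {
    assert(throughput_one_core > 0.0);
    return throughput_n_cores / throughput_one_core;
}

// Scaling efficiency: speedup divided by the core count.
// 1.0 means perfect linear scaling; 1.0 / cores means no gain at all.
double scaling_efficiency(double throughput_one_core, double throughput_n_cores, unsigned cores) {
    assert(cores > 0);
    return speedup(throughput_one_core, throughput_n_cores) / cores;
}
```

If 24 cores deliver the same throughput as one core, the speedup is 1 and the efficiency is 1/24, roughly 4%. Tracking this value while increasing -c step by step shows at which core count the scaling breaks down.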

Here is the patch :

diff --git a/apps/httpd/main.cc b/apps/httpd/main.cc
index f112a6e..999c4f6 100644
--- a/apps/httpd/main.cc
+++ b/apps/httpd/main.cc
@@ -21,10 +21,12 @@
 
 #include "http/httpd.hh"
 #include "http/handlers.hh"
+#include "http/websocket_handler.hh"
 #include "http/function_handlers.hh"
 #include "http/file_handler.hh"
 #include "apps/httpd/demo.json.hh"
 #include "http/api_docs.hh"
+#include "core/prometheus.hh"
 
 namespace bpo = boost::program_options;
 
@@ -47,10 +49,45 @@ void set_routes(routes& r) {
     function_handler* h2 = new function_handler([](std::unique_ptr<request> req) {
         return make_ready_future<json::json_return_type>("json-future");
     });
+
+    websocket_function_handler* ws1 = new websocket_function_handler([](const httpd::request& req, connected_websocket ws) {
+        auto input = ws.input();
+        auto output = ws.output();
+        return do_with(std::move(input), std::move(output), [] (websocket_input_stream &input,
+                                                                websocket_output_stream &output) {
+            return repeat([&input, &output] {
+                return input.read().then([&output](std::unique_ptr<httpd::websocket_message> buf){
+                    if (!buf)
+                        return make_ready_future<bool_class<stop_iteration_tag>>(stop_iteration::yes);
+                    return output.write(std::move(buf)).then([] {
+                        return stop_iteration::no;
+                    });
+                });
+            });
+        });
+    });
+
+    auto ws_managed_handler = new websocket_handler();
+
+    ws_managed_handler->on_connection_future([] (const httpd::request& req, websocket_output_stream* ws) {
+        temporary_buffer<char> test("Hello from seastar !", 20);
+        return ws->write(websocket_opcode::TEXT, std::move(test));
+    });
+
+    ws_managed_handler->on_message_future([] (const httpd::request& req, websocket_output_stream* ws, std::unique_ptr<httpd::websocket_message> message) {
+        return ws->write(std::move(message));
+    });
+
+    ws_managed_handler->on_disconnection([] (const httpd::request& req, websocket_output_stream* ws) {
+
+    });
+
     r.add(operation_type::GET, url("/"), h1);
     r.add(operation_type::GET, url("/jf"), h2);
-    r.add(operation_type::GET, url("/file").remainder("path"),
-            new directory_handler("/"));
+    r.add(operation_type::GET, url("/file").remainder("path"), new directory_handler("/"));
+    r.put("/managed", ws_managed_handler);
+    r.put("/", ws1);
+
     demo_json::hello_world.set(r, [] (const_req req) {
         demo_json::my_object obj;
         obj.var1 = req.param.at("var1");
@@ -65,18 +102,18 @@ void set_routes(routes& r) {
 int main(int ac, char** av) {
     app_template app;
     app.add_options()("port", bpo::value<uint16_t>()->default_value(10000),
-            "HTTP Server port");
+                      "HTTP Server port");
     return app.run_deprecated(ac, av, [&] {
-        auto&& config = app.configuration();
+        auto &&config = app.configuration();
         uint16_t port = config["port"].as<uint16_t>();
         auto server = new http_server_control();
         auto rb = make_shared<api_registry_builder>("apps/httpd/");
         server->start().then([server] {
             return server->set_routes(set_routes);
-        }).then([server, rb]{
-            return server->set_routes([rb](routes& r){rb->set_api_doc(r);});
-        }).then([server, rb]{
-            return server->set_routes([rb](routes& r) {rb->register_function(r, "demo", "hello world application");});
+        }).then([server, rb] {
+            return server->set_routes([rb](routes &r) { rb->set_api_doc(r); });
+        }).then([server, rb] {
+            return server->set_routes([rb](routes &r) { rb->register_function(r, "demo", "hello world application"); });
         }).then([server, port] {
             return server->listen(port);
         }).then([server, port] {
@@ -85,6 +122,6 @@ int main(int ac, char** av) {
                 return server->stop();
             });
         });
-
     });
 }
+
diff --git a/configure.py b/configure.py
index a9acc3c..a1cc277 100755
--- a/configure.py
+++ b/configure.py
@@ -328,7 +328,8 @@ http = ['http/transformers.cc',
         'http/reply.cc',
         'http/request_parser.rl',
         'http/api_docs.cc',
-        'http/websocket.cc'
+        'http/websocket.cc',
+        'http/websocket_fragment.cc'
         ]
 
 boost_test_lib = [
diff --git a/core/reactor.hh b/core/reactor.hh
index d48ca04..10bbf65 100644
--- a/core/reactor.hh
+++ b/core/reactor.hh
@@ -68,6 +68,7 @@
 #include "core/enum.hh"
 #include "core/memory.hh"
 #include <boost/range/irange.hpp>
+#include <http/websocket_fragment.hh>
 #include "timer.hh"
 #include "condition-variable.hh"
 #include "util/log.hh"
diff --git a/http/handlers.hh b/http/handlers.hh
index 9a9888a..54c0457 100644
--- a/http/handlers.hh
+++ b/http/handlers.hh
@@ -26,8 +26,10 @@
 #include "common.hh"
 #include "reply.hh"
 #include "core/future-util.hh"
+#include "websocket.hh"
 
 #include <unordered_map>
+#include <net/api.hh>
 
 namespace httpd {
 
@@ -68,6 +70,39 @@ class handler_base {
 
 };
 
+/**
+ * handlers holds the logic for serving an incoming request.
+ * All handlers inherit from the base handler_websocket_base and
+ * implement the handle method.
+ */
+class handler_websocket_base {
+public:
+    /**
+     * All handlers should implement this method.
+     *  It fill the reply according to the request.
+     * @param path the url path used in this call
+     * @param params optional parameter object
+     * @param req the original request
+     * @param rep the reply
+     */
+    virtual future<> handle(const sstring& path, connected_websocket ws) = 0;
+
+    virtual ~handler_websocket_base() = default;
+
+    /**
+     * Add a mandatory parameter
+     * @param param a parameter name
+     * @return a reference to the handler
+     */
+    handler_websocket_base& mandatory(const sstring& param) {
+        _mandatory_param.push_back(param);
+        return *this;
+    }
+
+    std::vector<sstring> _mandatory_param;
+
+};
+
 }
 
 #endif /* HANDLERS_HH_ */
diff --git a/http/httpd.hh b/http/httpd.hh
index 4ded1d7..036b89c 100644
--- a/http/httpd.hh
+++ b/http/httpd.hh
@@ -44,12 +44,15 @@
 #include <boost/intrusive/list.hpp>
 #include "reply.hh"
 #include "http/routes.hh"
+#include "http/websocket.hh"
 
 #include <cryptopp/sha.h>
 #include <cryptopp/filters.h>
 #include <cryptopp/hex.h>
 #include <cryptopp/base64.h>
+
 #include "core/sleep.hh"
+#include <chrono>
 
 namespace httpd {
 
@@ -149,13 +152,14 @@ namespace httpd {
             http_request_parser _parser;
             std::unique_ptr<request> _req;
             std::unique_ptr<reply> _resp;
+            socket_address _addr;
             // null element marks eof
             queue<std::unique_ptr<reply>> _replies { 10 };bool _done = false;
         public:
             connection(http_server& server, connected_socket&& fd,
                        socket_address addr)
                     : _server(server), _fd(std::move(fd)), _read_buf(_fd.input()), _write_buf(
-                    _fd.output()) {
+                    _fd.output()), _addr(addr) {
                 ++_server._total_connections;
                 ++_server._current_connections;
                 _server._connections.push_back(*this);
@@ -365,7 +369,7 @@ namespace httpd {
                     } else if (it->second.find("Upgrade") != std::string::npos) {
                         auto upgrade = req->_headers.find("Upgrade");
                         if (upgrade != req->_headers.end() && upgrade->second == "websocket")
-                            return upgrade_websocket(std::move(req)); //websocket upgrade
+                            return upgrade_websocket(std::move(req)).then([] { return true; }); //websocket upgrade
                     }
                 }
                 bool should_close;
@@ -386,24 +390,23 @@ namespace httpd {
                 sstring url = set_query_param(*req.get());
                 sstring version = req->_version;
 
-                return _server._routes.handle(url, std::move(req), std::move(resp)).
-                        // Caller guarantees enough room
-                        then([this, should_close, version = std::move(version)](std::unique_ptr<reply> rep) {
+                return _server._routes.handle(url, std::move(req), std::move(resp)).then([this, should_close, version = std::move(version)](std::unique_ptr<reply> rep) {
                     rep->set_version(version).done();
                     this->_replies.push(std::move(rep));
                     return make_ready_future<bool>(should_close);
                 });
             }
 
-            future<bool> upgrade_websocket(std::unique_ptr<request> req) {
+            future<> upgrade_websocket(std::unique_ptr<request> req) {
                 constexpr char websocket_uuid[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
                 constexpr size_t websocket_uuid_len = 36;
 
+                sstring url = set_query_param(*req.get());
                 auto resp = std::make_unique<reply>();
                 resp->set_version(req->_version);
 
                 auto it = req->_headers.find("Sec-WebSocket-Key");
-                if (it != req->_headers.end()) {
+                if (it != req->_headers.end() && _server._routes.get_ws_handler(url, *req.get())) {
                     //Success
                     resp->_headers["Upgrade"] = "websocket";
                     resp->_headers["Connection"] = "Upgrade";
@@ -429,22 +432,15 @@ namespace httpd {
                     resp->set_status(reply::status_type::switching_protocols).done();
 
                     _replies.push(std::move(resp));
-                    return repeat([this] {
-                        return _read_buf.read().then([this](auto buf){
-                            std::cout << "read from connection : ";
-                            std::cout.write(buf.begin(), buf.size());
-                            std::cout << std::endl;
-                            for (std::size_t i = 0; i < buf.size(); ++i)
-                                std::cout << std::bitset<8>(buf.begin()[i]) << std::endl;
-                            if (!buf)
-                                return make_ready_future<bool_class<stop_iteration_tag> >(stop_iteration::yes);
-                            return _write_buf.write(std::move(buf)).then([this] { return _write_buf.flush(); }).then([] {
-                                return stop_iteration::no;
-                            });
-                        });
-                    }).then([] {
-                        std::cout << "closing connection" << std::endl;
-                        return true;
+                    return do_until([this] {
+                        return _replies.empty();
+                    }, [this] {
+                        //If we don't wait, the HTTP response might not be flushed before user-code start sending messages
+                        //resulting in malformed handshake response.
+                        return sleep(std::chrono::milliseconds(30)); //FIXME this is awful
+                    }).then([this, req = std::move(req), url] {
+                        auto ws = connected_websocket(&_fd, _addr, *req.get());
+                        return _server._routes.handle_ws(url, std::move(ws));
                     });
                 }
                 else {
@@ -452,8 +448,7 @@ namespace httpd {
                     resp->set_status(reply::status_type::bad_request);
                 }
                 resp->done();
-                this->_replies.push(std::move(resp));
-                return make_ready_future<bool>(true);
+                return this->_replies.push_eventually(std::move(resp));
             }
 
             future<> write_body() {
@@ -507,9 +502,8 @@ namespace httpd {
         static sstring generate_server_name();
     public:
         http_server_control() : _server_dist(new distributed<http_server>) {
-        }
-
 
+        }
         future<> start(const sstring& name = generate_server_name()) {
             return _server_dist->start(name);
         }
diff --git a/http/routes.cc b/http/routes.cc
index 18d973b..54938b5 100644
--- a/http/routes.cc
+++ b/http/routes.cc
@@ -106,6 +106,17 @@ future<std::unique_ptr<reply> > routes::handle(const sstring& path, std::unique_
     return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
 }
 
+future<> routes::handle_ws(const sstring &path, connected_websocket ws) {
+    handler_websocket_base* handler = get_ws_handler(normalize_url(path), ws._request);
+    if (handler != nullptr) {
+        for (auto& i : handler->_mandatory_param) {
+            verify_param(ws._request, i);
+        }
+        return handler->handle(path, std::move(ws));
+    }
+    return make_ready_future();
+}
+
 sstring routes::normalize_url(const sstring& url) {
     if (url.length() < 2 || url.at(url.length() - 1) != '/') {
         return url;
@@ -131,6 +142,22 @@ handler_base* routes::get_handler(operation_type type, const sstring& url,
     return nullptr;
 }
 
+handler_websocket_base *routes::get_ws_handler(const sstring &url, const httpd::request& req) {
+    handler_websocket_base* handler = (_map_ws.find(url) == _map_ws.end()) ? nullptr : _map_ws[url];
+    if (handler != nullptr) {
+        try {
+            for (auto& i : handler->_mandatory_param) {
+                verify_param(req, i);
+            }
+        }
+        catch (const missing_param_exception &e) {
+            return nullptr;
+        }
+        return handler;
+    }
+    return nullptr;
+}
+
 routes& routes::add(operation_type type, const url& url,
         handler_base* handler) {
     match_rule* rule = new match_rule(handler);
diff --git a/http/routes.hh b/http/routes.hh
index 187b7e9..ccc6284 100644
--- a/http/routes.hh
+++ b/http/routes.hh
@@ -31,6 +31,7 @@
 #include <unordered_map>
 #include <vector>
 #include "core/future-util.hh"
+#include "websocket.hh"
 
 namespace httpd {
 
@@ -95,6 +96,19 @@ class routes {
     }
 
     /**
+     * adding a handler as an exact match
+     * @param url the url to match (note that url should start with /)
+     * @param handler the desire handler
+     * @return it self
+     */
+    routes& put(const sstring& url, handler_websocket_base* handler) {
+        //FIXME if a handler is already exists, it need to be
+        // deleted to prevent memory leak
+        _map_ws[url] = handler;
+        return *this;
+    }
+
+    /**
      * add a rule to be used.
      * rules are search only if an exact match was not found.
      * rules are search by the order they were added.
@@ -130,6 +144,28 @@ class routes {
      */
     future<std::unique_ptr<reply> > handle(const sstring& path, std::unique_ptr<request> req, std::unique_ptr<reply> rep);
 
+
+    /**
+     * the main entry point.
+     * the general handler calls this method with the request
+     * the method takes the headers from the request and find the
+     * right handler.
+     * It then call the handler with the parameters (if they exists) found in the url
+     * @param path the url path found
+     * @param req the http request
+     * @param rep the http reply
+     */
+    future<> handle_ws(const sstring& path, connected_websocket ws);
+
+    /**
+     * Search and return a handler by the operation type and url
+     * @param type the http operation type
+     * @param url the request url
+     * @param params a parameter object that will be filled during the match
+     * @return a handler based on the type/url match
+     */
+    handler_websocket_base* get_ws_handler(const sstring& url, const httpd::request& req);
+
     /**
      * Search and return an exact match
      * @param url the request url
@@ -163,6 +199,10 @@ class routes {
 
     std::unordered_map<sstring, handler_base*> _map[NUM_OPERATION];
     std::vector<match_rule*> _rules[NUM_OPERATION];
+
+    //Websocket
+    std::unordered_map<sstring, handler_websocket_base*> _map_ws;
+
 public:
     using exception_handler_fun = std::function<std::unique_ptr<reply>(std::exception_ptr eptr)>;
     using exception_handler_id = size_t;
diff --git a/http/websocket.cc b/http/websocket.cc
index 346f560..8b8a23a 100644
--- a/http/websocket.cc
+++ b/http/websocket.cc
@@ -4,68 +4,82 @@
 
 #include "websocket.hh"
 
-server_websocket::server_websocket(socket_address sa, listen_options opts) : _sa(sa), _opts(opts) {}
-
-void server_websocket::listen() {
-    _server_socket = engine().listen(_sa, _opts);
+httpd::connected_websocket::connected_websocket(connected_socket *socket, socket_address &remote_adress,
+                                                request &request) noexcept : _socket(std::move(socket)),
+                                                                             remote_adress(remote_adress),
+                                                                             _request(request) {
 }
 
-future<connected_websocket> server_websocket::accept() {
-    return _server_socket.accept().then([] (connected_socket sock, socket_address addr) {
-        return connected_websocket(std::move(sock), addr);
-    });
+httpd::connected_websocket::connected_websocket(httpd::connected_websocket &&cs) noexcept : _socket(std::move(cs._socket)),
+                                                                                            remote_adress(cs.remote_adress),
+                                                                                            _request(std::move(cs._request)) {
 }
 
-connected_websocket::connected_websocket(connected_socket &&socket,
-                                         socket_address &remote_adress) noexcept : _socket(std::move(socket)), remote_adress(remote_adress) {}
-
-connected_websocket::connected_websocket(connected_websocket &&cs) noexcept : _socket(std::move(cs._socket)), remote_adress(cs.remote_adress) {
-
-}
-
-connected_websocket &connected_websocket::operator=(connected_websocket &&cs) noexcept {
+httpd::connected_websocket &httpd::connected_websocket::operator=(httpd::connected_websocket &&cs) noexcept {
     _socket = std::move(cs._socket);
     remote_adress = std::move(cs.remote_adress);
     return *this;
 }
 
-future<websocket_fragment> websocket_input_stream::readFragment() {
-    return _stream.read().then([] (temporary_buffer<char> buf) {
-        websocket_fragment fragment(std::move(buf));
-        return fragment;
-    });
-}
-
-future<temporary_buffer<char>> websocket_input_stream::read() {
-    _buf.clear();
-    return repeat([this] {
+future<> httpd::websocket_input_stream::read_fragment() {
+    auto parse_fragment = [this] {
+        if (_buf.size() - _index > 2)
+            _fragment = std::move(std::make_unique<inbound_websocket_fragment>(_buf, &_index));
+    };
 
-        return readFragment().then([this] (auto fragment) {
-            if (fragment.data){
-                _buf.append(fragment.data.get(), fragment.data.size());
-                if (fragment.fin())
-                    return stop_iteration::yes;
-                return stop_iteration::no;
-            }
-            return stop_iteration::yes;
+    _fragment = nullptr;
+    if (!_buf || _index >= _buf.size())
+        return _stream.read().then([this, parse_fragment](temporary_buffer<char> buf) {
+            _buf = std::move(buf);
+            _index = 0;
+            parse_fragment();
         });
+    parse_fragment();
+    return make_ready_future();
+}
 
-/*        return _stream.read().then([this] (temporary_buffer<char> buf) {
-            if (buf) {
-                _buf.append(buf.get(), buf.size());
-                if (buf.get()[0] == 'h')
-                    return stop_iteration::yes;
-                return stop_iteration::no;
-            } else {
+future<std::unique_ptr<httpd::websocket_message>> httpd::websocket_input_stream::read() {
+    _lastmassage = nullptr;
+    return repeat([this] { // gather all fragments and concatenate full message
+        return read_fragment().then([this] {
+            if (!_fragment || _fragment->_is_empty)
+                return stop_iteration::yes;
+            else if (_fragment->fin()) {
+                if (!_lastmassage)
+                    _lastmassage = std::move(std::make_unique<websocket_message>(std::move(_fragment)));
+                else
+                    _lastmassage->append(std::move(_fragment));
                 return stop_iteration::yes;
             }
+            else if (_fragment->opcode() == CONTINUATION)
+                _lastmassage->append(std::move(_fragment));
+            return stop_iteration::no;
         });
-*/
+    }).then([this] {
+        return std::move(_lastmassage);
+    });
+}
 
+/*
+ * When the write is called and if (!_buf || _index >= _buf.size()) == false, it would make sense
+ * to buff it and flush everything at once before the next read().
+ */
+future<> httpd::websocket_output_stream::write(std::unique_ptr<httpd::websocket_message> message) {
+    message->done();
+    return do_with(std::move(message), [this] (std::unique_ptr<httpd::websocket_message> &frag) {
+        temporary_buffer<char> head((char *)&frag->_header, frag->_header_size);
+        return this->_stream.write(std::move(head)).then([this, &frag] {
+            return do_for_each(frag->_fragments, [this] (temporary_buffer<char> &buff) {
+                return this->_stream.write(std::move(buff));
+            });
+        });
     }).then([this] {
-        if (_buf.empty())
-            return make_ready_future<temporary_buffer<char>>();
-        std::cout<<"size is " << _buf.size() << std::endl;
-        return make_ready_future<temporary_buffer<char>>(std::move(temporary_buffer<char>(_buf.c_str(), _buf.size())));
+        return this->_stream.flush();
+    }).handle_exception([this] (std::exception_ptr e) {
+        return _stream.close();
     });
+}
+
+future<> httpd::websocket_output_stream::write(websocket_opcode kind, temporary_buffer<char> buf) {
+    return write(std::move(std::make_unique<websocket_message>(kind, std::move(buf))));
 }
\ No newline at end of file
diff --git a/http/websocket.hh b/http/websocket.hh
index 65e134e..ba0ea75 100644
--- a/http/websocket.hh
+++ b/http/websocket.hh
@@ -6,97 +6,27 @@
 #define SEASTARPLAYGROUND_WEBSOCKET_HPP
 
 #include <core/reactor.hh>
-#include "core/scattered_message.hh"
 #include <net/socket_defs.hh>
+#include "websocket_fragment.hh"
+#include "request.hh"
 
-class websocket_fragment {
-    temporary_buffer<char> _raw;
-    uint16_t _header;
-
-    bool _fin;
-    unsigned char _opcode;
-    uint64_t _lenght;
-    bool _rsv23;
-    bool _rsv1;
-    bool _masked;
-
-    uint32_t _maskkey;
-
-public:
-
-    temporary_buffer<char> data;
-
-    websocket_fragment(temporary_buffer<char> &&raw) : _raw(std::move(raw)), _header(0) {
-        uint64_t i = sizeof(uint16_t);
-        std::memcpy(&_header, _raw.begin(), sizeof(uint16_t));
-        _fin = _header & 128;
-        _opcode = _header & 15;
-        _rsv23 = _header & 48;
-        _rsv1 = _header & 64;
-        _masked = _header & 32768;
-        _lenght = (_header >> 8) & 127;
-        if (_lenght == 126)
-        {
-            _lenght = *((uint16_t *) _raw.share(i, sizeof(uint16_t)).get());
-            i += sizeof(uint16_t);
-        }
-        else if (_lenght == 127)
-        {
-            _lenght = *((uint64_t *) _raw.share(i, sizeof(uint64_t)).get());
-            i += sizeof(uint64_t);
-        }
-
-        if (_masked)
-        {
-            _maskkey = *((uint16_t *) _raw.share(i, sizeof(uint32_t)).get());
-            i += sizeof(uint32_t);
-        }
-
-        data = _raw.share(i, _lenght);
-    }
-
-public:
-    bool fin() { return _fin; }
-    unsigned char opcode() { return _opcode; }
-    uint64_t &length() { return _lenght; }
-    bool rsv23() { return _rsv23; }
-    bool rsv1() { return _rsv1; }
-    bool masked() { return _masked; }
-
-    bool valid() {
-        return !((rsv1() /*&& !setCompressed(user)*/) || rsv23() || (opcode() > 2 && opcode() < 8) ||
-                 opcode() > 10 || (opcode() > 2 && (!fin() || length() > 125)));
-    }
-};
+namespace httpd {
 
 class websocket_output_stream final {
     output_stream<char> _stream;
     temporary_buffer<char> _buf;
     size_t _size = 0;
-    size_t _begin = 0;
-    size_t _end = 0;
-    bool _trim_to_size = false;
-    bool _batch_flushes = false;
-    std::experimental::optional<promise<>> _in_batch;
-    bool _flush = false;
-    bool _flushing = false;
-    std::exception_ptr _ex;
 public:
     websocket_output_stream() = default;
+
     websocket_output_stream(output_stream<char> stream) : _stream(std::move(stream)) {}
-    websocket_output_stream(websocket_output_stream&&) = default;
-    websocket_output_stream& operator=(websocket_output_stream&&) = default;
 
-    //future<> write(const char* buf, size_t n);
-    //future<> write(const char* buf);
+    websocket_output_stream(websocket_output_stream &&) = default;
 
-    //future<> write(const basic_sstring<StringChar, SizeType, MaxSize>& s);
-    //future<> write(const std::basic_string<char>& s);
+    websocket_output_stream &operator=(websocket_output_stream &&) = default;
 
-    //future<> write(net::packet p);
-    //future<> write(scattered_message<char> msg);
-    //future<> write(temporary_buffer<char_type>);
-    //future<> flush();
+    future<> write(std::unique_ptr<httpd::websocket_message> message);
+    future<> write(websocket_opcode kind, temporary_buffer<char>);
     future<> close() { return _stream.close(); };
 private:
     friend class reactor;
@@ -104,61 +34,51 @@ class websocket_output_stream final {
 
 class websocket_input_stream final {
     input_stream<char> _stream;
-    std::string _buf = "";
-    bool _eof = false;
+    std::unique_ptr<inbound_websocket_fragment> _fragment;
+    std::unique_ptr<websocket_message> _lastmassage;
+    temporary_buffer<char> _buf;
+    uint32_t _index = 0;
+
 private:
-    using tmp_buf = temporary_buffer<char>;
-    size_t available() const { return _buf.size(); }
-protected:
-    void reset() { _buf = {}; }
+    future<> parse_fragment();
 public:
     websocket_input_stream() = default;
-    explicit websocket_input_stream(input_stream<char> stream) : _stream(std::move(stream)), _buf("") {}
-    websocket_input_stream(websocket_input_stream&&) = default;
-    websocket_input_stream& operator=(websocket_input_stream&&) = default;
 
-    bool eof() { return _eof; }
+    explicit websocket_input_stream(input_stream<char> stream) : _stream(std::move(stream)) {}
 
-    future<websocket_fragment> readFragment();
+    websocket_input_stream(websocket_input_stream &&) = default;
 
-    future<temporary_buffer<char>> read();
-    future<> close() { return _stream.close(); }
+    websocket_input_stream &operator=(websocket_input_stream &&) = default;
 
-    /// Ignores n next bytes from the stream.
-    //future<> skip(uint64_t n);
+    future<std::unique_ptr<httpd::websocket_message>> read();
+
+    future<> read_fragment();
+
+    future<> close() { return _stream.close(); }
 };
 
 class connected_websocket {
 private:
-    connected_socket _socket;
+    connected_socket *_socket;
+
 public:
     socket_address remote_adress;
-    connected_websocket(connected_socket &&_socket, socket_address &remote_adress) noexcept;
+    request _request;
 
-    connected_websocket(connected_websocket&& cs) noexcept;
-    connected_websocket& operator=(connected_websocket&& cs) noexcept;
+    connected_websocket(connected_socket *socket, socket_address &remote_adress, request &request) noexcept;
+
+    connected_websocket(connected_websocket &&cs) noexcept;
+
+    connected_websocket &operator=(connected_websocket &&cs) noexcept;
 
     websocket_input_stream input() {
-        return websocket_input_stream(std::move(_socket.input()));
+        return websocket_input_stream(std::move(_socket->input()));
     }
 
     websocket_output_stream output() {
-        return websocket_output_stream(std::move(_socket.output()));
+        return websocket_output_stream(std::move(_socket->output()));
     }
 };
-
-class server_websocket {
-
-private:
-    socket_address _sa;
-    listen_options _opts;
-    server_socket _server_socket;
-
-public:
-    server_websocket(socket_address sa, listen_options opts = {});
-
-    void listen();
-    future<connected_websocket> accept();
-};
+}
 
 #endif //SEASTARPLAYGROUND_WEBSOCKET_HPP
\ No newline at end of file
diff --git a/http/websocket_fragment.cc b/http/websocket_fragment.cc
new file mode 100644
index 0000000..7291969
--- /dev/null
+++ b/http/websocket_fragment.cc
@@ -0,0 +1,119 @@
+//
+// Created by hbarraud on 4/2/17.
+//
+
+#include "websocket_fragment.hh"
+
+/*    0                   1                   2                   3
+0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-------+-+-------------+-------------------------------+
+|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
+|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
+|N|V|V|V|       |S|             |   (if payload len==126/127)   |
+| |1|2|3|       |K|             |                               |
++-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
+|     Extended payload length continued, if payload len == 127  |
++ - - - - - - - - - - - - - - - +-------------------------------+
+|                               |Masking-key, if MASK set to 1  |
++-------------------------------+-------------------------------+
+| Masking-key (continued)       |          Payload Data         |
++-------------------------------- - - - - - - - - - - - - - - - +
+:                     Payload Data continued ...                :
++ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
+|                     Payload Data continued ...                |
++---------------------------------------------------------------+
+*/
+httpd::inbound_websocket_fragment::inbound_websocket_fragment(temporary_buffer<char> &raw, uint32_t *i) : websocket_fragment_base() {
+    auto buf = raw.get_write();
+
+    //First header byte
+    _fin = buf[*i] & 128;
+    _rsv1 = buf[*i] & 64;
+    _opcode = static_cast<websocket_opcode>(buf[*i] & 15);
+    *i += sizeof(uint8_t);
+
+    //Second header byte
+    _masked = buf[*i] & 128;
+    _lenght = buf[*i] & 127;
+    *i += sizeof(uint8_t);
+
+    if (_lenght == 126 && raw.size() >= *i + sizeof(uint16_t)) {
+        _lenght = ntohs(*reinterpret_cast<uint16_t*>(buf + *i));
+        *i += sizeof(uint16_t);
+    }
+    else if (_lenght == 127 && raw.size() >= *i + sizeof(uint64_t)) {
+        _lenght = be64toh(*reinterpret_cast<uint64_t*>(buf + *i)); //64-bit length: ntohl() only swaps 32 bits (be64toh is in <endian.h>)
+        *i += sizeof(uint64_t);
+    }
+
+    if (_masked && raw.size() >= *i + _lenght + sizeof(uint32_t)) {
+        //message is masked
+        uint64_t k = *i;
+        *i += sizeof(uint32_t);
+        message = std::move(raw.share(*i, _lenght));
+        unmask(buf + *i, buf + *i, buf + k, _lenght);
+        _is_empty = false;
+        *i += _lenght;
+    } else if (raw.size() >= *i + _lenght) {
+        message = std::move(raw.share(*i, _lenght));
+        _is_empty = false;
+        *i += _lenght;
+    }
+}
+
+void httpd::websocket_message::done() {
+    const auto header = opcode | 0x80; //FIN bit set: the message goes out as a single frame
+
+    uint64_t len = 0;
+    for (auto &fragment : _fragments)
+        len += fragment.size();
+
+    if (len <= 125) { //Size fits 7bits
+        _header[0] = header;
+        _header[1] = static_cast<unsigned char>(len);
+        _header_size = 2;
+    } //Size is extended to 16bits
+    else if (len <= std::numeric_limits<uint16_t>::max()) {
+        _header[0] = header;
+        _header[1] = static_cast<unsigned char>(126);
+        auto s = htons(len);
+        std::memcpy(_header + 2, &s, 2);
+        _header_size = 4;
+    }
+    else { //Size extended to 64bits
+        _header[0] = header;
+        _header[1] = static_cast<unsigned char>(127);
+        uint64_t l = htobe64(len); //htonl() is 32-bit only; the 8-byte memcpy below needs a 64-bit value (<endian.h>)
+        std::memcpy(_header + 2, &l, 8);
+        _header_size = 10;
+    }
+}
+
+temporary_buffer<char> & httpd::websocket_message::concat() {
+    if (!_concatenated.empty())
+        return _concatenated;
+
+    uint64_t length = 0;
+    for (auto& n : _fragments)
+        length += n.size();
+    temporary_buffer<char> ret(length);
+    length = 0;
+    for (auto& n : _fragments) {
+        std::memcpy(ret.share(length, n.size()).get_write(), n.get(), n.size());
+        length += n.size(); //advance the write offset past this fragment
+    }
+    return _concatenated = std::move(ret);
+}
+
+
+inline void httpd::inbound_websocket_fragment::unmask(char *dst, const char *src, const char *mask, uint64_t length) {
+    for (uint64_t j = 0; j < length; ++j) {
+        dst[j] = src[j] ^ mask[j % 4];
+    }
+/*    for (unsigned int n = (length >> 2) + 1; n; n--) {
+        *(dst++) = *(src++) ^ mask[0];
+        *(dst++) = *(src++) ^ mask[1];
+        *(dst++) = *(src++) ^ mask[2];
+        *(dst++) = *(src++) ^ mask[3];
+    }*/
+}
diff --git a/http/websocket_fragment.hh b/http/websocket_fragment.hh
new file mode 100644
index 0000000..12ca000
--- /dev/null
+++ b/http/websocket_fragment.hh
@@ -0,0 +1,150 @@
+//
+// Created by hbarraud on 4/2/17.
+//
+
+#ifndef SEASTAR_WEBSOCKET_FRAGMENT_HH
+#define SEASTAR_WEBSOCKET_FRAGMENT_HH
+
+#include <core/reactor.hh>
+
+namespace httpd {
+
+    enum websocket_opcode : uint8_t {
+        CONTINUATION = 0x0,
+        TEXT = 0x1,
+        BINARY = 0x2,
+        CLOSE = 0x8,
+        PING = 0x9,
+        PONG = 0xA,
+        RESERVED = 0xB
+    };
+
+    class websocket_fragment_base {
+    protected:
+
+        bool _fin = false;
+        websocket_opcode _opcode = RESERVED;
+        uint64_t _lenght = 0;
+        bool _rsv2 = false;
+        bool _rsv3 = false;
+        bool _rsv1 = false;
+        bool _masked = false;
+        temporary_buffer<char> _maskkey;
+
+    public:
+        bool _is_empty = false;
+        temporary_buffer<char> message;
+        bool _is_valid = false;
+
+        websocket_fragment_base() : _is_empty(true), _is_valid(false) {}
+
+        websocket_fragment_base(websocket_fragment_base &&fragment) noexcept : _fin(fragment.fin()),
+                                                                               _opcode(fragment.opcode()),
+                                                                               _lenght(fragment.length()),
+                                                                               _rsv2(fragment.rsv2()),
+                                                                               _rsv3(fragment.rsv3()),
+                                                                               _rsv1(fragment.rsv1()),
+                                                                               _masked(fragment.masked()),
+                                                                               _maskkey(std::move(fragment._maskkey)),
+                                                                               _is_empty(fragment._is_empty),
+                                                                               message(std::move(fragment.message)),
+                                                                               _is_valid(fragment._is_valid) {
+        }
+
+        bool fin() { return _fin; }
+
+        websocket_opcode opcode() { return _opcode; }
+
+        uint64_t &length() { return _lenght; }
+
+        bool rsv2() { return _rsv2; }
+
+        bool rsv3() { return _rsv3; }
+
+        bool rsv1() { return _rsv1; }
+
+        bool masked() { return _masked; }
+
+        bool valid() {
+            return !((rsv1() || rsv2() || rsv3() || (opcode() > 2 && opcode() < 8) ||
+                     opcode() > 10 || (opcode() > 2 && (!fin() || length() > 125))));
+        }
+
+        operator bool() { return !_is_empty && valid(); }
+    };
+
+    class inbound_websocket_fragment : public websocket_fragment_base {
+
+    public:
+
+        inbound_websocket_fragment(const inbound_websocket_fragment &) = delete;
+
+        inbound_websocket_fragment(inbound_websocket_fragment &&other) noexcept : websocket_fragment_base(std::move(other)) {
+        }
+
+        inbound_websocket_fragment(temporary_buffer<char> &raw, uint32_t *index);
+
+        inbound_websocket_fragment() : websocket_fragment_base() {}
+
+    private:
+        inline void unmask(char *dst, const char *src, const char *mask, uint64_t length);
+    };
+
+    class websocket_message {
+    public:
+        websocket_opcode opcode = RESERVED;
+        char _header[10];
+        size_t _header_size = 0;
+        std::vector<temporary_buffer<char>> _fragments;
+        temporary_buffer<char> _concatenated;
+
+        websocket_message() noexcept : _is_empty(true) { }
+        websocket_message(const websocket_message &) = delete;
+        websocket_message(websocket_message &&other) noexcept : opcode(other.opcode),
+                                                                _fragments(std::move(other._fragments)),
+                                                                _concatenated(std::move(other._concatenated)),
+                                                                _is_empty(other._is_empty)  {
+        }
+
+        void operator=(const websocket_message&) = delete;
+        websocket_message & operator= (websocket_message &&other) {
+            if (this != &other) {
+                opcode = other.opcode;
+                _fragments = std::move(other._fragments);
+                _concatenated = std::move(other._concatenated);
+                _is_empty = other._is_empty;
+            }
+            return *this;
+        }
+
+        websocket_message(std::unique_ptr<websocket_fragment_base> fragment) noexcept : _is_empty(false) {
+            _fragments.push_back(std::move(fragment->message));
+            opcode = fragment->opcode();
+        }
+
+        websocket_message(websocket_opcode kind, temporary_buffer<char> message) noexcept : _is_empty(false) {
+            _fragments.push_back(std::move(message));
+            opcode = kind;
+        }
+
+        websocket_message(websocket_opcode kind, sstring message) noexcept : _is_empty(false) {
+            _fragments.push_back(std::move(message).release());
+            opcode = kind;
+        }
+
+        void append(std::unique_ptr<websocket_fragment_base> fragment) {
+            _fragments.push_back(std::move(fragment->message));
+        }
+
+        void done();
+
+        temporary_buffer<char> & concat();
+
+        bool empty() { return _is_empty || opcode == CLOSE; }
+
+    private:
+        bool _is_empty;
+    };
+}
+
+#endif //SEASTAR_WEBSOCKET_FRAGMENT_HH
diff --git a/http/websocket_handler.hh b/http/websocket_handler.hh
new file mode 100644
index 0000000..91b8d5f
--- /dev/null
+++ b/http/websocket_handler.hh
@@ -0,0 +1,145 @@
+//
+// Created by hbarraud on 4/1/17.
+//
+
+#ifndef SEASTAR_WEBSOCKET_HANDLER_HH
+#define SEASTAR_WEBSOCKET_HANDLER_HH
+
+#include "handlers.hh"
+
+namespace httpd {
+
+typedef std::function<future<>(const httpd::request& req, connected_websocket ws)> future_ws_handler_function;
+
+class websocket_function_handler : public httpd::handler_websocket_base {
+
+public:
+    websocket_function_handler(const future_ws_handler_function & f_handle)
+            : _f_handle(f_handle) {
+    }
+
+    future<> handle(const sstring &path, connected_websocket ws) override {
+        return _f_handle(ws._request, std::move(ws));
+    }
+
+protected:
+    future_ws_handler_function _f_handle;
+};
+
+typedef std::function<future<>(const httpd::request&, websocket_output_stream* ws)> future_ws_on_dis_connected;
+typedef std::function<future<>(const httpd::request&, websocket_output_stream* ws, std::unique_ptr<httpd::websocket_message> message)> future_ws_on_message;
+
+typedef std::function<void(const httpd::request&, websocket_output_stream* ws)> void_ws_on_dis_connected;
+typedef std::function<void(const httpd::request&, websocket_output_stream* ws, std::unique_ptr<httpd::websocket_message> message)> void_ws_on_message;
+
+class websocket_handler : public httpd::handler_websocket_base {
+
+public:
+    websocket_handler() : _on_connection([] (const httpd::request&, websocket_output_stream* ws) { return make_ready_future(); }),
+                          _on_message([] (const httpd::request&, websocket_output_stream* ws, std::unique_ptr<httpd::websocket_message> message) { return make_ready_future(); }),
+                          _on_disconnection([] (const httpd::request&, websocket_output_stream* ws) { return make_ready_future(); }),
+                          _on_pong([] (const httpd::request&, websocket_output_stream* ws, std::unique_ptr<httpd::websocket_message> message) { return make_ready_future(); }),
+                          _on_ping([] (const httpd::request&, websocket_output_stream* ws, std::unique_ptr<httpd::websocket_message> message) {
+                              message->opcode = websocket_opcode::PONG;
+                              return ws->write(std::move(message));
+                          })  {}
+
+    future<> handle(const sstring &path, connected_websocket ws) override {
+        auto input = ws.input();
+        auto output = ws.output();
+        return do_with(std::move(ws), std::move(input), std::move(output), [this] (connected_websocket &ws,
+                                                                                   websocket_input_stream &input,
+                                                                                   websocket_output_stream &output) {
+            return _on_connection(ws._request, &output).then([this, &ws, &input, &output] {
+                return repeat([this, &input, &output, &ws] {
+                    return input.read().then([this, &output, &ws](std::unique_ptr<httpd::websocket_message> buf) {
+                        if (!buf)
+                            return make_ready_future<bool_class<stop_iteration_tag>>(stop_iteration::yes);
+                        return on_message_internal(ws._request, &output, std::move(buf)).then([] (bool close) {
+                            return bool_class<stop_iteration_tag>(close);
+                        });
+                    });
+                });
+            }).then([this, &ws, &output] { return _on_disconnection(ws._request, &output); });
+        });
+    }
+
+    void on_message(const void_ws_on_message & handler) {
+        _on_message = [handler](const httpd::request& req, websocket_output_stream *ws,
+                                std::unique_ptr<httpd::websocket_message> message) {
+            handler(req, ws, std::move(message));
+            return make_ready_future();
+        };
+    }
+    void on_ping(const void_ws_on_message & handler) {
+        _on_ping = [handler](const httpd::request& req, websocket_output_stream *ws,
+                             std::unique_ptr<httpd::websocket_message> message) {
+            handler(req, ws, std::move(message));
+            return make_ready_future();
+        };
+    }
+    void on_pong(const void_ws_on_message & handler) {
+        _on_pong = [handler](const httpd::request& req, websocket_output_stream *ws,
+                             std::unique_ptr<httpd::websocket_message> message) {
+            handler(req, ws, std::move(message));
+            return make_ready_future();
+        };
+    }
+    void on_connection(const void_ws_on_dis_connected & handler) {
+        _on_connection = [handler](const httpd::request& req, websocket_output_stream *ws) {
+            handler(req, ws);
+            return make_ready_future();
+        };
+    }
+    void on_disconnection(const void_ws_on_dis_connected & handler) {
+        _on_disconnection = [handler](const httpd::request& req, websocket_output_stream *ws) {
+            handler(req, ws);
+            return make_ready_future();
+        };
+    }
+
+    void on_message_future(const future_ws_on_message & handler) { _on_message = handler; }
+    void on_ping_future(const future_ws_on_message & handler) { _on_ping = handler; }
+    void on_pong_future(const future_ws_on_message & handler) { _on_pong = handler; }
+    void on_connection_future(const future_ws_on_dis_connected & handler) { _on_connection = handler; }
+    void on_disconnection_future(const future_ws_on_dis_connected & handler) { _on_disconnection = handler; }
+
+private:
+
+    future<bool> on_message_internal(const httpd::request& req, websocket_output_stream* ws, std::unique_ptr<httpd::websocket_message> message) {
+        switch (message->opcode){
+            case TEXT:
+                //FIXME Check that buffer is valid UTF-8
+            case BINARY:
+                return _on_message(req, ws, std::move(message)).then([] {
+                    return false;
+                });
+            case PING:
+                return _on_ping(req, ws, std::move(message)).then([] {
+                    return false;
+                });
+            case PONG:
+                return _on_pong(req, ws, std::move(message)).then([] {
+                    return false;
+                });
+            case CLOSE:
+            case RESERVED:
+            default:
+                return make_ready_future().then([] {
+                    return true;
+                });
+        }
+    }
+
+protected:
+    future_ws_on_dis_connected _on_connection;
+    future_ws_on_message _on_message;
+    future_ws_on_dis_connected _on_disconnection;
+    future_ws_on_message _on_pong;
+    future_ws_on_message _on_ping;
+};
+
+}
+
+
+#endif //SEASTAR_WEBSOCKET_HANDLER_HH
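
The frame layout from the diagram in websocket_fragment.cc, and the length encoding that done() produces, can be exercised outside Seastar. The following is a minimal sketch, not the patch's code: frame_header, encode_header and decode_header are hypothetical names, and the header is assembled byte by byte so that no byte-order helper is needed. It assumes the RFC 6455 rule that a length byte of 126 selects a 16-bit length and 127 a 64-bit length, both in network byte order.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical standalone type; it does not exist in the patch.
struct frame_header {
    bool fin = false;
    uint8_t opcode = 0;
    bool masked = false;
    uint64_t payload_len = 0;
    uint8_t mask[4] = {0, 0, 0, 0};
    size_t header_size = 0;  // bytes occupied by the header itself
};

// Serialize an unmasked (server-to-client) header using the shortest
// length encoding that can hold `len`.
inline std::vector<uint8_t> encode_header(uint8_t opcode, uint64_t len, bool fin = true) {
    assert(opcode <= 0x0F);  // the opcode field is 4 bits wide
    std::vector<uint8_t> out;
    out.push_back(static_cast<uint8_t>((fin ? 0x80 : 0x00) | opcode));
    if (len <= 125) {
        out.push_back(static_cast<uint8_t>(len));
    } else if (len <= 0xFFFF) {
        out.push_back(126);
        out.push_back(static_cast<uint8_t>(len >> 8));
        out.push_back(static_cast<uint8_t>(len));
    } else {
        out.push_back(127);
        for (int shift = 56; shift >= 0; shift -= 8) {
            out.push_back(static_cast<uint8_t>(len >> shift));
        }
    }
    return out;
}

// Parse a header from `buf`. Returns false when the buffer does not yet
// contain the complete header; `out` is only written on success.
inline bool decode_header(const uint8_t* buf, size_t size, frame_header& out) {
    if (size < 2) {
        return false;
    }
    frame_header h;
    h.fin = (buf[0] & 0x80) != 0;
    h.opcode = buf[0] & 0x0F;
    h.masked = (buf[1] & 0x80) != 0;
    uint64_t len = buf[1] & 0x7F;
    size_t pos = 2;
    const size_t ext = (len == 126) ? 2 : (len == 127) ? 8 : 0;
    if (size < pos + ext + (h.masked ? 4 : 0)) {
        return false;
    }
    if (ext != 0) {
        len = 0;
        for (size_t k = 0; k < ext; ++k) {
            len = (len << 8) | buf[pos + k];  // big-endian accumulate
        }
        pos += ext;
    }
    if (h.masked) {
        for (size_t k = 0; k < 4; ++k) {
            h.mask[k] = buf[pos + k];
        }
        pos += 4;
    }
    h.payload_len = len;
    h.header_size = pos;
    out = h;
    return true;
}
```

Payloads of up to 125 bytes fit in the 7-bit field, 126 through 65535 use the 16-bit form, and anything larger uses the 64-bit form. Returning false on a short buffer lets a caller wait for more bytes instead of reading past the end.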

Avi Kivity

<avi@scylladb.com>
Apr 15, 2017, 6:58:25 AM4/15/17
to Hippolyte Barraud, seastar-dev

Perhaps the bottleneck is elsewhere.


Recommendations:

 - run the server under "strace -fF -o /tmp/trace" and check that all reactors are reading and writing data

 - run `top`, press "1", and look at all cores' system and user time.  If there is any anomaly (one core using way more system time), you have some kernel bottleneck and we can investigate it further

 - run `perf top` and look for any noise from the kernel

 - try the perftune.py script

 - do not use virtual machines

--
You received this message because you are subscribed to the Google Groups "seastar-dev" group.
To unsubscribe from this group and stop receiving emails from it, send an email to seastar-dev...@googlegroups.com.
To post to this group, send email to seast...@googlegroups.com.
Visit this group at https://groups.google.com/group/seastar-dev.

Hippolyte Barraud

<hippolyte.barraud@gmail.com>
May 1, 2017, 10:11:35 AM5/1/17
to seastar-dev, hippolyte.barraud@gmail.com
Sorry for the delay coming back to you.

So, to get a clean environment and track down the bottleneck, I instantiated two VMs on Google Cloud. I know you suggested not using VMs, but I don't have hardware lying around right now.
I collected the data from a clean CentOS install; the WebSocket client ran on a separate VM.

I benchmarked by connecting 500 WebSocket clients, each sending 20-byte payloads, and measuring the time it takes my server to echo them back.
The results are similar to those I got on my MacBook: performance scales non-linearly, with a penalty when using more than 2 or 3 cores.

- I used perf record and generated some SVG flame graphs using FlameGraph. I'm not sure how to interpret them, though. They're attached in case anyone is more knowledgeable.
- As for the user/system time ratio, I get 70% system time with -c1 and between 35% and 40% when running on all 16 cores, consistently across cores.
- perf top reports "iowrite16" as having the most overhead, at 12%, when running with -c1.
- When running with -c16, perf top reports smp::poll_queues as the most hungry, at ~15%. __vdso_clock_gettime comes second at ~7%.
- Looking at the strace output, I can confirm that all 16 PIDs are calling sendmsg and read. The writes are also evenly distributed across PIDs.

I find the presence of __vdso_clock_gettime strange because my code doesn't call gettimeofday, apart from the 101 Switching Protocols HTTP response header, which is timestamped when accepting a new WebSocket connection.

In case you want to check the raw data from perf top and top, here it is: https://gist.github.com/HippoBaro/c63086c672b49b504b9363e23b04dd6b

As for perftune.py, I did use it, but measured no real benefit before/after.
...
perf-kernel.c1.svg
perf-kernel.c16.svg

Avi Kivity

<avi@scylladb.com>
May 3, 2017, 8:27:42 AM5/3/17
to Hippolyte Barraud, seastar-dev



On 05/01/2017 05:11 PM, Hippolyte Barraud wrote:
Sorry for the delay coming back to you.

So, to get a clean environment and track down the bottleneck, I instantiated two VMs on Google Cloud. I know you suggested not using VMs, but I don't have hardware lying around right now.
I collected the data from a clean CentOS install; the WebSocket client ran on a separate VM.

I benchmarked by connecting 500 WebSocket clients, each sending 20-byte payloads, and measuring the time it takes my server to echo them back.
The results are similar to those I got on my MacBook: performance scales non-linearly, with a penalty when using more than 2 or 3 cores.

- I used perf record and generated some SVG flame graphs using FlameGraph. I'm not sure how to interpret them, though. They're attached in case anyone is more knowledgeable.

They are not very useful; did you run the profile while the system was under full load? Start the system, apply load, profile, end the profile, and only then remove the load, to make sure you don't profile it while it's partially idle.


- As for the user/system time ratio, I get 70% system time with -c1 and between 35% and 40% when running on all 16 cores, consistently across cores.

This is an indication that it's mostly polling.


- perf top reports "iowrite16" as having the most overhead, at 12%, when running with -c1.

It's accessing the virtual network; this is really bad in VMs without device assignment.  There's not a lot we can do here.


- When running with -c16, perf top reports smp::poll_queues as the most hungry, at ~15%. __vdso_clock_gettime comes second at ~7%.

Again, it's polling like crazy because it has nothing else to do.


- Looking at the strace output, I can confirm that all 16 PIDs are calling sendmsg and read. The writes are also evenly distributed across PIDs.

I find the presence of __vdso_clock_gettime strange because my code doesn't call gettimeofday, apart from the 101 Switching Protocols HTTP response header, which is timestamped when accepting a new WebSocket connection.


Seastar itself calls clock_gettime() in its idle loop like crazy (via std::chrono::steady_clock::now()).


Things you can try:
 - check if multiqueue is enabled (ethtool -l eth0)
 - switch to latest Fedora if not, and check again
 - switch to native stack + dpdk
 - use AWS, which offers "enhanced networking" on some instances; these are multi-queue.


asabbaab@gmail.com

<asabbaab@gmail.com>
May 5, 2017, 3:03:44 AM5/5/17
to seastar-dev, hippolyte.barraud@gmail.com

Hi,

@Hippolyte: if I understand correctly, your WebSocket server implementation is an extension of the existing Seastar httpd implementation.
Could you show us how fast the base httpd is in your (native) environment? I would like to see results from the seawreck httpd benchmark.
Please look here: https://github.com/scylladb/seastar/wiki/HTTPD-benchmark.
It would be great to see the results presented as in your first email: req/s for different numbers of clients and different numbers of cores.
Then we could compare them with the numbers you presented for the WebSocket server.

Regards,
Dawid

...

asabbaab@gmail.com

<asabbaab@gmail.com>
May 8, 2017, 8:05:48 AM5/8/17
to seastar-dev, hippolyte.barraud@gmail.com, asabbaab@gmail.com
Hi,

BTW, I browsed the sources quickly, and one thing makes me wonder:
the sleep(30ms) inside the connection::upgrade_websocket method.

                    }, [this] {

                        //If we don't wait, the HTTP response might not be flushed before user code starts sending messages,
                        //resulting in a malformed handshake response.
                        return sleep(std::chrono::milliseconds(30)); //FIXME this is awful
                    }).then

I'm not sure whether this sleep is relevant to the poor performance results.
It may be on the "fast path" during benchmarking or only on the "negotiation path". Either way, it looks quite suspicious :)

Regards,
D


...

Avi Kivity

<avi@scylladb.com>
May 8, 2017, 10:01:04 AM5/8/17
to asabbaab@gmail.com, seastar-dev, hippolyte.barraud@gmail.com



On 05/08/2017 03:05 PM, asab...@gmail.com wrote:
Hi,

BTW, I browsed the sources quickly, and one thing makes me wonder:
the sleep(30ms) inside the connection::upgrade_websocket method.

                    }, [this] {
                        //If we don't wait, the HTTP response might not be flushed before user code starts sending messages,
                        //resulting in a malformed handshake response.
                        return sleep(std::chrono::milliseconds(30)); //FIXME this is awful
                    }).then

I'm not sure whether this sleep is relevant to the poor performance results.
It may be on the "fast path" during benchmarking or only on the "negotiation path". Either way, it looks quite suspicious :)


Good catch, should use an explicit flush.
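
Why a flush removes the need for the sleep can be shown with a toy model. This is a sketch under assumptions, not Seastar code: wire and buffered_writer are hypothetical stand-ins for a socket and a buffered output stream, and the model assumes the handshake and the WebSocket frames go through two separate buffered writers over the same connection, which is one plausible reading of the patch.

```cpp
#include <cassert>
#include <string>

// Stand-in for a socket: records bytes in the order they reach the wire.
struct wire {
    std::string bytes;
};

// Stand-in for a buffered output stream (not Seastar's output_stream).
class buffered_writer {
    wire& _wire;
    std::string _pending;
public:
    explicit buffered_writer(wire& w) : _wire(w) {}
    void write(const std::string& data) { _pending += data; }
    void flush() {
        _wire.bytes += _pending;
        _pending.clear();
    }
    bool drained() const { return _pending.empty(); }
};

// Model of the upgrade path: the HTTP writer emits the handshake response,
// then a second writer on the same wire emits the first WebSocket frame.
inline std::string upgrade_then_send(bool flush_handshake) {
    wire w;
    buffered_writer http_out(w);
    buffered_writer ws_out(w);

    http_out.write("HTTP/1.1 101 Switching Protocols\r\n\r\n");
    if (flush_handshake) {
        http_out.flush();  // explicit flush before user code may write
    }
    ws_out.write("<frame>");
    ws_out.flush();
    http_out.flush();      // late flush; a no-op if already flushed

    assert(http_out.drained() && ws_out.drained());
    return w.bytes;
}
```

With flush_handshake set, the handshake always precedes the frame on the wire; without it, the frame can overtake the handshake, which is the malformed response the code comment describes. A timed sleep only makes that reordering less likely.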


virtan@virtan.com

<virtan@virtan.com>
Nov 13, 2017, 2:43:07 PM11/13/17
to seastar-dev
Hi.

Here is a SIMD version of un_mask in websocket_message.hh using AVX2 instructions (it needs <immintrin.h> and a build with AVX2 enabled, e.g. -mavx2):

#include <immintrin.h>

inline void un_mask(char* dst, const char* src, const char* mask, uint64_t length) {
    if(length >= 32) {
        __m256i w, w_mask;
        w_mask = _mm256_set1_epi32(*(int*) mask);
        do {
            w = _mm256_loadu_si256((__m256i*) src);
            w = _mm256_xor_si256(w, w_mask);
            _mm256_storeu_si256((__m256i*) dst, w);
            src += 32;
            dst += 32;
            length -= 32;
        } while(length >= 32);
    }
    for(uint64_t j = 0; j < length; ++j) {
        dst[j] = src[j] ^ mask[j % 4];
    }
}

It performs about the same as the byte-by-byte loop at 31 bytes and is roughly 9x faster at 32k bytes:

31 byte: original: 1688453159 chars/sec
         modified: 1689373297 chars/sec
32k byte: original: 1924211844 chars/sec
          modified: 17969049890 chars/sec
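
For builds where AVX2 is not available, a portable middle ground is to XOR eight bytes at a time through std::memcpy. This is a different technique from the AVX2 version above, offered only as a sketch: unmask_bytewise and unmask_wordwise are hypothetical names, and no performance figure is claimed for it.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Reference implementation: one byte at a time.
inline void unmask_bytewise(char* dst, const char* src, const char* mask, uint64_t length) {
    for (uint64_t j = 0; j < length; ++j) {
        dst[j] = src[j] ^ mask[j % 4];
    }
}

// Portable variant: XOR eight bytes per iteration, then finish byte-wise.
// Works in place (dst == src) because each word goes through a local copy.
inline void unmask_wordwise(char* dst, const char* src, const char* mask, uint64_t length) {
    assert(mask != nullptr);

    // Repeat the 4-byte key twice to build an 8-byte key in memory order,
    // so the result does not depend on host endianness.
    char key_bytes[8];
    std::memcpy(key_bytes, mask, 4);
    std::memcpy(key_bytes + 4, mask, 4);
    uint64_t key;
    std::memcpy(&key, key_bytes, sizeof(key));

    uint64_t done = 0;
    for (; done + 8 <= length; done += 8) {
        uint64_t word;
        std::memcpy(&word, src + done, sizeof(word));  // avoids unaligned or aliasing reads
        word ^= key;
        std::memcpy(dst + done, &word, sizeof(word));
    }
    // `done` is a multiple of 8, hence of 4, so the mask phase restarts at 0.
    unmask_bytewise(dst + done, src + done, mask, length - done);
}
```

Because masking is a plain XOR, applying either function twice with the same key restores the original bytes, which makes the two variants easy to check against each other.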

Best regards.

PS: thank you for your project and ws implementation!
...

haiping fu

<haipingf@gmail.com>
Apr 17, 2018, 11:02:09 PM4/17/18
to seastar-dev
Hey Hippolyte, nice work! I'm interested in the WebSocket implementation on the Seastar framework.

What's the latest status of this WebSocket implementation? Did you figure out the root cause of the performance degradation in multi-core environments, or has this been fixed in your latest repo, https://github.com/HippoBaro/seastar?

On Sunday, April 9, 2017 at 4:52:09 PM UTC-7, Hippolyte Barraud wrote:

Hippolyte Barraud

<hippolyte.barraud@gmail.com>
Apr 17, 2018, 11:21:54 PM4/17/18
to seastar-dev
Hello,

Nice to see that there is still interest in this. This implementation was very much a weekend project, a first hands-on experience of getting something done in Seastar. Unfortunately, I don't have the time this implementation would need to go upstream.
It does work, however, and I would be glad if someone were interested in giving Seastar proper WebSocket support.

There are many things to improve: proper support for streaming, a better parser, per-message deflate, etc.

As other people on the mailing list explained, it seemed not to scale properly because the client (on localhost in my tests) was not pushing enough load, so the server spent most of its time polling for events.
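
The effect can be pictured with a toy model. This is only an illustration under stated assumptions, not Seastar's reactor: poll_stats and simulate_polling are hypothetical names, and the model assumes a busy-polling loop that checks for work on every pass while the client delivers one request every arrival_period passes.

```cpp
#include <cassert>
#include <cstdint>

struct poll_stats {
    uint64_t busy_polls = 0;  // passes that found a request to serve
    uint64_t idle_polls = 0;  // passes that found nothing to do

    double idle_fraction() const {
        const uint64_t total = busy_polls + idle_polls;
        return total == 0 ? 0.0 : static_cast<double>(idle_polls) / static_cast<double>(total);
    }
};

// Simulate `iterations` passes of a busy-polling loop in which a request
// becomes available once every `arrival_period` passes.
inline poll_stats simulate_polling(uint64_t iterations, uint64_t arrival_period) {
    assert(arrival_period > 0);
    poll_stats stats;
    for (uint64_t tick = 0; tick < iterations; ++tick) {
        const bool request_ready = (tick % arrival_period) == 0;
        if (request_ready) {
            ++stats.busy_polls;
        } else {
            ++stats.idle_polls;
        }
    }
    return stats;
}
```

When requests arrive on only one pass in ten, nine passes in ten are idle polls, so a profiler attributes most samples to the polling code rather than to request handling. Adding cores without adding client load only increases the idle share.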