Some help testing websockets.


mike waites

Dec 22, 2013, 9:36:20 AM
to python-...@googlegroups.com
Hey Guys,

Just getting started with tornado.

I'm working on a little project using web sockets and I'm having a bit of trouble figuring out the best way to write unit tests around the WebSocketHandler code.  I've googled around a bit and found a few things.

Here's the WebSocketHandler code that I'm testing against.

import os

import tornado.ioloop
import tornado.web
import tornado.websocket

from tornado.options import define, options, parse_command_line

define("port", default=8888, help="run on the given port", type=int)

TEMPLATE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')


class IndexHandler(tornado.web.RequestHandler):

    @tornado.web.asynchronous
    def get(self):
        self.render(os.path.join(TEMPLATE_DIR, "index.html"))


clients = dict()

rooms = dict()


class WebSocketHandler(tornado.websocket.WebSocketHandler):

    def open(self, room_id=None, *args, **kwargs):
        self.stream.set_nodelay(True)
        self.room_id = room_id
        self.join()

    def join(self):
        access_token = self.get_argument('access_token', None)
        clients[access_token] = {self.room_id: self}
        room = rooms.setdefault(self.room_id, set())
        room.add(access_token)
        self.broadcast("{id} just joined the room".format(id=access_token))

    def broadcast(self, message):
        room = rooms[self.room_id]
        for client_id in room:
            connections = clients[client_id]
            connections[self.room_id].write_message(message)

    def on_message(self, message):
        self.broadcast(message)

    def on_close(self):
        if self in clients:
            self.clients.remove(self)

app = tornado.web.Application([
    (r'/', IndexHandler),
    (r'/rt/(?P<room_id>[a-zA-Z0-9\.:,_]+)/?', WebSocketHandler),
])

if __name__ == '__main__':
    parse_command_line()
    app.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()


First was this little lib, ws4py, which looks fairly well maintained.  I haven't been able to get it to run in a unit test yet, though.



And here is the unit test I put together to see if I could get it working.


from tornado import testing
from ws4py.client.tornadoclient import TornadoWebSocketClient


class AsyncWSClient(TornadoWebSocketClient):
    """
    Asynchronous web socket client based on ws4py's tornadoclient
    Sends a message and calls callback with received message
    """

    def opened(self):
        """called when web socket opened"""
        self._callback()

    def run(self, callback, message=None):
        """Connects and sends message when connected"""
        self._callback = callback
        self.connect()


class WebSocketServerTests(testing.AsyncHTTPTestCase):

    def get_app(self):
        return app

    def get_protocol(self):
        return 'ws'

    def test_room_registry(self):

        url = '/rt/{room_id}/?access_token={token}'.format(room_id=1, token=1)
        ws = AsyncWSClient(self.get_url(url))
        ws.run(self.stop)

        from myapp.realtime.server import rooms
        self.assertIn('1', rooms)

        self.wait()


This test fails and it never seems to connect or call the code from the WebSocketHandler.

The second lib I found, which doesn't seem to be maintained any more but which I have managed to get working, is this:



class WebSocketServerTests(testing.AsyncHTTPTestCase):

    def get_app(self):
        return app

    def get_protocol(self):
        return 'ws'

    def test_client_registry(self):

        url = '/rt/{room_id}/?access_token={token}'.format(room_id=1, token=1)

        runner = self

        class WSClient(websocket.WebSocket):

            def on_open(self, *args, **kwargs):
                from devscuss.realtime.server import rooms
                runner.assertIn('1', rooms)
                runner.io_loop.add_callback(runner.stop)

        self.io_loop.add_callback(partial(WSClient,
                                          self.get_url(url),
                                          self.io_loop))
        self.wait()


I'm not too keen on this though, and to be honest I don't really get what's going on.

So I'm hoping some of you can do two things for me.

A) Recommend the best way to approach writing tests for this stuff
B) If no other suggestions come to mind, please help me fix the first example, as I think it's much cleaner and better maintained.

Hope to hear back soon

A. Jesse Jiryu Davis

Dec 22, 2013, 12:37:11 PM
to python-...@googlegroups.com
Mike, I haven't used ws4py before, but I could test your code successfully with Tornado's own WebSocketClientConnection. I ran this with Tornado's current code on GitHub:

from mike_waites import rooms, app
from tornado import testing, websocket

class WebSocketServerTests(testing.AsyncHTTPTestCase):

    def get_app(self):
        return app

    def get_protocol(self):
        return 'ws'

    def test_room_registry_2(self):
        url = '/rt/{room_id}/?access_token={token}'.format(room_id=1, token=1)
        websocket.websocket_connect(
            self.get_url(url),
            io_loop=self.io_loop,
            callback=self.stop)

        self.wait()
        self.assertIn('1', rooms)

If you want to use ws4py you at least need to call self.wait() right after ws.run(self.stop) to give the client a chance to connect. I tried that myself and couldn't get it to work, though, so there may be other problems with your approach in your test using ws4py.
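
The ordering problem described above (asserting before the loop has had a chance to run) can be illustrated without any networking. This is only a sketch: it uses asyncio from the standard library as a stand-in for Tornado's IOLoop, and `fake_connect` and the module-level `rooms` dict are hypothetical names, not part of Mike's server or of ws4py.

```python
import asyncio

rooms = {}  # hypothetical stand-in for the server's room registry


async def fake_connect(room_id, token):
    """Pretend handshake: the registry is only updated once the loop runs."""
    await asyncio.sleep(0)  # hand control back to the event loop once
    rooms.setdefault(room_id, set()).add(token)


async def main():
    task = asyncio.ensure_future(fake_connect('1', '1'))
    seen_before = '1' in rooms  # checked before the loop ran the handshake
    await task                  # plays the role of self.wait() in the test
    seen_after = '1' in rooms
    return seen_before, seen_after


result = asyncio.run(main())
```

The first check misses the room and the second one finds it, which is the same reason the assertion has to come after self.wait() in the Tornado test.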


--
You received this message because you are subscribed to the Google Groups "Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornad...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

mike waites

Dec 23, 2013, 2:10:32 AM
to python-...@googlegroups.com
Jesse,

Thanks so much for getting back to me on this.  I wasn't aware this module existed. Is it documented anywhere? Perhaps I just missed it.  I certainly have no real need to use an external module for it.  I'll get my test switched over and see about moving forward using the websocket module from Tornado itself.

Thanks again




--
Regards
Mike

mike waites

Dec 23, 2013, 2:25:43 AM
to python-...@googlegroups.com
Also Jesse,

I'm wondering how I go about simulating multiple client connections in the same test, i.e. asserting that a new member joining an existing room does what I would expect.

--
Regards
Mike

A. Jesse Jiryu Davis

Dec 23, 2013, 12:31:52 PM
to python-...@googlegroups.com
WebSocketClientConnection is documented waaaay down here:


You don't need a new IOLoop to simulate an additional client. After all, IOLoop is designed for concurrent I/O operations:

import unittest
from mike_waites import app
from tornado import testing, websocket

class WebSocketServerTests(testing.AsyncHTTPTestCase):

    def get_app(self):
        return app

    def get_protocol(self):
        return 'ws'

    def test_room_registry(self):
        url_1 = '/rt/{room_id}/?access_token={token}'.format(room_id=1, token=1)
        websocket.websocket_connect(
            self.get_url(url_1),
            io_loop=self.io_loop,
            callback=self.stop)

        connection_1 = self.wait().result()
        connection_1.read_message(callback=self.stop)
        message = self.wait().result()
        self.assertEqual('1 just joined the room', message)

        url_2 = '/rt/{room_id}/?access_token={token}'.format(room_id=1, token=2)
        websocket.websocket_connect(
            self.get_url(url_2),
            io_loop=self.io_loop,
            callback=self.stop)

        connection_2 = self.wait().result()
        connection_1.read_message(callback=self.stop)
        message = self.wait().result()
        self.assertEqual('2 just joined the room', message)
 
Note the funky "self.wait().result()" syntax. It's because websocket_connect() and read_message() take a callback and pass the callback to IOLoop.add_future. Once the operation completes, the callback, self.stop(), is executed with the resolved Future. The stop() method returns that Future, and you get the real return value from the Future by calling result() on it. Whew.
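
A toy model may make the stop()/wait()/result() chain easier to follow. This is a sketch under loud assumptions: `FakeTestCase` is a hypothetical miniature, not Tornado's AsyncTestCase (whose wait() actually runs the IOLoop until stop() is called), and the standard library's `concurrent.futures.Future` stands in for Tornado's Future.

```python
from concurrent.futures import Future


class FakeTestCase:
    """Hypothetical miniature of the stop()/wait() handshake."""

    def __init__(self):
        self._stop_arg = None

    def stop(self, arg=None):
        # Used as the callback: remembers whatever it was called with.
        self._stop_arg = arg

    def wait(self):
        # The real wait() runs the loop until stop() fires; here stop()
        # has already been called, so just hand back its argument.
        arg, self._stop_arg = self._stop_arg, None
        return arg


future = Future()
future.set_result('1 just joined the room')  # the operation completes

case = FakeTestCase()
case.stop(future)               # the callback receives the resolved Future
message = case.wait().result()  # wait() returns the Future, result() unwraps it
```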

You can simplify this code, and use Futures the way Ben intended, by converting your test function to a coroutine:

    # gen_test comes from tornado.testing: "from tornado.testing import gen_test"
    @gen_test
    def test_room_registry(self):
        url_1 = '/rt/{room_id}/?access_token={token}'.format(room_id=1, token=1)
        connection_1 = yield websocket.websocket_connect(
            self.get_url(url_1),
            io_loop=self.io_loop)

        message = yield connection_1.read_message()
        self.assertEqual('1 just joined the room', message)

        url_2 = '/rt/{room_id}/?access_token={token}'.format(room_id=1, token=2)
        connection_2 = yield websocket.websocket_connect(
            self.get_url(url_2),
            io_loop=self.io_loop)

        message = yield connection_1.read_message()
        self.assertEqual('2 just joined the room', message)
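
The expectation in this test (the first client sees both join messages) can also be checked without a network by driving the join/broadcast logic with fake connection objects. A minimal sketch, assuming the logic is lifted out of the handler into plain functions; `FakeConnection` and the free functions `join` and `broadcast` are hypothetical and only mirror the handler code from the first post.

```python
clients = {}  # token -> {room_id: connection}, as in the handler
rooms = {}    # room_id -> set of tokens


class FakeConnection:
    """Hypothetical stand-in for a WebSocketHandler: records outgoing messages."""

    def __init__(self):
        self.received = []

    def write_message(self, message):
        self.received.append(message)


def broadcast(room_id, message):
    for token in rooms[room_id]:
        clients[token][room_id].write_message(message)


def join(room_id, token, connection):
    clients[token] = {room_id: connection}
    rooms.setdefault(room_id, set()).add(token)
    broadcast(room_id, "{id} just joined the room".format(id=token))


first, second = FakeConnection(), FakeConnection()
join('1', '1', first)
join('1', '2', second)
```

A test like this does not replace the end-to-end test above; it only covers the registry bookkeeping, which is quick to exercise in isolation.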

mike waites

Dec 23, 2013, 6:08:20 PM
to python-...@googlegroups.com

Thanks so much again for this, it's really, really appreciated.  I'm gonna give this a whirl tomorrow.

After your suggestions earlier I noticed that in any test case the on_close method of the WebSocketHandler was never called.  After updating to the latest code on the master branch I tried a few things, such as taking the result() of the future object returned from websocket_connect and then calling close() on it, but nothing seemed to trigger the on_close handler of the socket handler class I'm testing against.

Do you have any ideas what the cause of that might be?

Thanks so much again.

A. Jesse Jiryu Davis

Dec 23, 2013, 11:55:57 PM
to python-...@googlegroups.com
No problem, Mike. I haven't used websockets much so I'm learning as we go.

Continuing with the test_room_registry coroutine from above, add these lines at the end:

        connection_1.close()
        yield gen.Task(self.io_loop.add_timeout, timedelta(seconds=1))

You'll need to "from tornado import gen" and "from datetime import timedelta."

The close() call closes the client side of the socket, obviously. After that, the loop needs to keep running for a bit until the server notices the change. A simple way to do that is the add_timeout code above.

This timeout is sort of crummy, so if I were really designing this, I'd add to your server the ability to register a callback to be executed whenever room-membership changes. Then in your test:

        self._app.add_room_change_callback((yield gen.Callback('change')))
        connection_1.close()
        yield gen.Wait('change')
        # ...assert that '1' is not in the room anymore....

See the Callback / Wait docs here. Note that gen_test has a default timeout of 5 seconds, so it'll (correctly) kill your test and mark it failed if the "yield gen.Wait" statement doesn't return promptly. 
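
The "register a callback, close, then wait for the notification with a timeout" idea can be sketched with the standard library alone. This is an analogy, not Tornado code: an asyncio Future plays the role of gen.Callback/gen.Wait, `asyncio.wait_for` plays the role of gen_test's timeout, and the `Room` class with its `change_callbacks` set is a hypothetical stand-in for the server side.

```python
import asyncio


class Room:
    """Hypothetical room that notifies listeners when membership changes."""

    def __init__(self):
        self.members = set()
        self.change_callbacks = set()

    def _notify(self):
        for callback in list(self.change_callbacks):
            callback()

    def join(self, token):
        self.members.add(token)
        self._notify()

    def leave(self, token):
        self.members.discard(token)
        self._notify()


async def main():
    loop = asyncio.get_running_loop()
    room = Room()
    room.join('1')

    changed = loop.create_future()

    def on_change():
        if not changed.done():
            changed.set_result(None)

    room.change_callbacks.add(on_change)
    # Simulates the server noticing the closed connection a little later.
    loop.call_later(0.01, room.leave, '1')
    # Fails with a timeout error instead of hanging if no change arrives.
    await asyncio.wait_for(changed, timeout=5)
    room.change_callbacks.discard(on_change)
    return '1' in room.members


still_member = asyncio.run(main())
```

The test resumes as soon as the change is reported, so there is no fixed one-second pause, and a missing notification shows up as a timeout failure.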

mike waites

Dec 24, 2013, 7:55:50 AM
to python-...@googlegroups.com
Hey Jesse,

Thanks again.  Changing the test to a coroutine feels really elegant and that's all working great!

Regarding the on_close method issue: it makes perfect sense that the loop would need some time as the client disconnects.  The first option you provided there works perfectly and the on_close method of the server gets called. Obviously adding a one-second delay to any test that relies on asserting anything that happens in that on_close method is, as you say, crummy.

The second option has totally pushed the boundaries of my understanding though.  Am I correct in thinking that self._app is the application object passed back from

def get_app(self):
    return app

Which is the application object where the websocket handler is set up?  It's totally unclear to me how I would register a callback against that object, and also what the callback would actually be doing. Could I trouble you for an example?

Thanks again and happy holidays! :)

A. Jesse Jiryu Davis

Dec 24, 2013, 1:10:46 PM
to python-...@googlegroups.com
I was just kind of sketching out ideas for a more complex implementation. I'd make a custom Application subclass:

class MikeApplication(tornado.web.Application):
    def __init__(self, *args, **kwargs):
        super(MikeApplication, self).__init__(*args, **kwargs)
        self.room_change_callbacks = set()

    def on_room_changed(self):
        loop = tornado.ioloop.IOLoop.current()
        for cb in self.room_change_callbacks:
            loop.add_callback(cb)

Instantiate this in your main program file, rather than instantiating a generic Application. Then in your WebSocketHandler call "self.application.on_room_changed()" in join() and on_close(). Then at the end of your test method:

        callback = yield gen.Callback('change')
        self._app.room_change_callbacks.add(callback)
        connection_1.close()
        yield gen.Wait('change')
        self._app.room_change_callbacks.remove(callback)
        self.assertFalse('1' in rooms['1'])

I proposed the room_change_callbacks idea because I bet it'll come in handy for other tests and other parts of your application itself. The callbacks you register there can be any callable; the gen.Callback object I'm registering, in the code above, is a particular mechanism you can use with coroutines.

In on_room_changed() you could just call each callback directly, instead of passing it to loop.add_callback(), but there are a couple advantages to my approach:
  • loop.add_callback() ensures exceptions thrown in each callback are caught and logged, instead of bubbling up into the caller of application.on_room_changed().
  • loop.add_callback() defers execution until the loop's next iteration, which is more likely the control flow you want, rather than running these callbacks in the midst of processing the current request.
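
The control-flow difference in the second point can be seen in a small standard-library sketch. It is an analogy only: asyncio's `loop.call_soon` stands in for Tornado's `loop.add_callback`, and `handler`, `notify_direct` and `notify_deferred` are hypothetical names.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    order = []

    def notify_direct(callback):
        callback()                # runs in the middle of the caller

    def notify_deferred(callback):
        loop.call_soon(callback)  # runs on the loop's next iteration

    def handler(notify, label):
        order.append(label + ':start')
        notify(lambda: order.append(label + ':callback'))
        order.append(label + ':end')

    handler(notify_direct, 'direct')
    handler(notify_deferred, 'deferred')
    await asyncio.sleep(0)        # give the loop one iteration
    return order


order = asyncio.run(main())
```

With the direct call the callback lands between the handler's start and end; with the deferred call the handler finishes first and the callback runs afterwards.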

mike waites

Dec 28, 2013, 9:34:57 AM
to python-...@googlegroups.com
Hey again Jesse.


Thanks so much for this, it has really shed some light on the way a lot of things should be done in Tornado.

So I got this set up and ran the test that was previously passing with the timeout added to the event loop, but for some reason on_close is not called when using the callback.  At first I thought that perhaps it was a timing thing like before, so I tried adding a few time.sleep(1) calls in different parts of the application, but nothing seemed to work.  Perhaps I need to call the callback method differently?

    @testing.gen_test
    def test_room_registry(self):

        room_id = '1'
        token = '1'
        url = self.base_url.format(room_id=room_id, token=token)

        conn1 = yield websocket.websocket_connect(self.get_url(url),
                                                  io_loop=self.io_loop)

        message = yield conn1.read_message()
        self.assertIn(token, rooms[room_id])

        callback = yield gen.Callback('change')
        self._app.room_change_callbacks.add(callback)
        conn1.close()
        self._app.on_room_changed()
        yield gen.Wait('change')
        self._app.room_change_callbacks.remove(callback)
        self.assertNotIn(token, rooms[room_id])

So really the only thing I have added that's different from your example above is the call to the callback method 'on_room_changed()' after closing the connection.  Without calling this method the call to gen.Wait() will time out.  So I'm not sure what else to try now so that I can test a client disconnecting etc.

Also, do you use gittip? I'd like to make a donation for all your help.

A. Jesse Jiryu Davis

Jan 2, 2014, 1:09:16 PM
to python-...@googlegroups.com
Hi Mike, sorry for the delay, I was away for a week. I'm not sure why your test is hanging. I've made this gist, which works as expected with Tornado 3.1.1:


My code in that gist doesn't call on_room_changed() directly. It relies on the WebSocketHandler to actually notice when a client connection is closed. The test fails, since the room-leaving logic hasn't been implemented, but it doesn't hang. I.e., the on_close() callback is executed. I've included the test's output in a comment at the bottom of the gist.
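
For reference, the room-leaving bookkeeping that the test expects might look roughly like the sketch below. It is not the code from the gist: `RoomRegistry` and its methods are hypothetical, and it assumes a token may hold connections in several rooms (the handler in the first post overwrites the per-token dict on each join instead).

```python
class RoomRegistry:
    """Hypothetical registry; on_close() in the handler could call leave()."""

    def __init__(self):
        self.clients = {}  # token -> {room_id: connection}
        self.rooms = {}    # room_id -> set of tokens

    def join(self, room_id, token, connection):
        self.clients.setdefault(token, {})[room_id] = connection
        self.rooms.setdefault(room_id, set()).add(token)

    def leave(self, room_id, token):
        # discard()/pop() with defaults keep this safe for unknown ids.
        self.rooms.get(room_id, set()).discard(token)
        connections = self.clients.get(token, {})
        connections.pop(room_id, None)
        if not connections:
            self.clients.pop(token, None)


registry = RoomRegistry()
registry.join('1', '1', object())
registry.join('1', '2', object())
registry.leave('1', '1')
remaining = registry.rooms['1']
```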

My examples earlier were using WebSocketClientConnection.close(), which is available in Tornado master but not in 3.1.1. So my new code in the gist calls conn1.protocol.close(). Once Tornado 3.2 is out, conn1.close() will work.

Your idea about calling time.sleep() to wait for the WebSocketHandler to notice the closed connection won't work: if you ever call sleep() in a Tornado application you block the whole process, so nothing happens. If you want to pause the current coroutine while allowing the I/O loop to continue processing events, you need to follow this weird pattern:

@gen.coroutine
def f(self):
    # Written as a method (e.g. on the test case) so that self.io_loop is available.
    yield gen.Task(self.io_loop.add_timeout, timedelta(seconds=1))

Add "from datetime import timedelta" at the top of your file.
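
The difference between a blocking sleep and a cooperative pause can be demonstrated with the standard library. This is a sketch by analogy: asyncio stands in for Tornado's IOLoop, `asyncio.sleep` stands in for the gen.Task/add_timeout pattern above, and `ticker` is a hypothetical background job representing whatever else the loop has to do (such as noticing a closed socket).

```python
import asyncio
import time


async def ticker(ticks):
    """Background work that only progresses while the event loop is free."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main():
    ticks = []
    task = asyncio.ensure_future(ticker(ticks))
    await asyncio.sleep(0)    # let the ticker record its first tick

    before = len(ticks)
    time.sleep(0.1)           # blocks the loop: the ticker cannot run
    ticks_while_blocked = len(ticks) - before

    before = len(ticks)
    await asyncio.sleep(0.1)  # yields to the loop: the ticker keeps running
    ticks_while_yielding = len(ticks) - before

    task.cancel()
    return ticks_while_blocked, ticks_while_yielding


ticks_while_blocked, ticks_while_yielding = asyncio.run(main())
```

Nothing else runs during the blocking sleep, which is why adding time.sleep() calls to the test or the server cannot help the handler notice a closed connection.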

But, to reiterate, it's better to have a solid event-notification system like the room_change_callbacks set, than to add sleeps to your tests.

I'm not on gittip. Thanks for the generous offer, but I think it's part of my job at MongoDB and part of my life as a Tornado contributor to participate in this mailing list.

mike waites

Jan 3, 2014, 5:45:18 PM
to python-...@googlegroups.com
DUDE!!

You're an absolute legend!! That makes so much more sense!  I can see what I was doing wrong there now!

Well, hopefully that's the last of my pesky questions.  I think I've got all the bits I need to crack on with my tests now.

Thank you so much for all your help! You have really shed some light on Tornado for me.

By the way, is there an official Tornado IRC channel on freenode or something?