websocket client on_message_callback?

995 views
Skip to first unread message

JohnMudd

unread,
Sep 7, 2015, 10:37:14 PM9/7/15
to Tornado Web Server
I'm trying to create a websocket client/server app using the on_message_callback feature of tornado.websocket.websocket_connect. I believe this server sends a msg to the client but the client does not receive it. 

I added some debug messages in tornado/websocket.py. I verified that self._on_message_callback was assigned the function I passed. But on_message() in websocket.py is not called in the example below. 

Any suggestions?



Server:

#! /usr/bin/env python

import tornado.ioloop
import tornado.web
import tornado.websocket

class WebSocketHandler(tornado.websocket.WebSocketHandler):
    def open(self, *args):
        print 'before write_message()'
        self.write_message('test')
        print 'after  write_message()'

    def on_message(self, message):
        pass

    def on_close(self):
        pass

app = tornado.web.Application([
    (r'/ws', WebSocketHandler),
])

if __name__ == '__main__':
    app.listen(8000)
    tornado.ioloop.IOLoop.instance().start()



Client:

#! /usr/bin/env python

import time

import tornado
import tornado.websocket

def put(msg):
    print 'put(): msg=%s' % `msg`

@tornado.gen.coroutine
def process_msgs():
    url = 'ws://localhost:8000/ws'
    client = tornado.httpclient.HTTPRequest(url)
    conn = yield tornado.websocket.websocket_connect(client, on_message_callback=put)

    print 'before sleep'
    time.sleep(60)
    print 'after sleep'

tornado.ioloop.IOLoop.instance().run_sync(process_msgs)



I start the server and then the client. I get the following output when the client starts.

Server:
before write_message()
after  write_message()

Client:
before sleep





Kevin LaTona

unread,
Sep 8, 2015, 11:51:03 AM9/8/15
to python-...@googlegroups.com

On Sep 7, 2015, at 7:37 PM, JohnMudd <john...@gmail.com> wrote:

> Any suggestion

Here is one idea and maybe some others on the list will jump in here with some other ideas or tweaks to this one.

-Kevin



# Server:

import tornado.ioloop
import tornado.web
import tornado.websocket

class WebSocketHandler(tornado.websocket.WebSocketHandler):
def open(self, *args):
print( 'open ws')

def on_message(self, message):
self.write_message(message)

def on_close(self):
print('close ws')

app = tornado.web.Application([
(r'/ws', WebSocketHandler),
])

if __name__ == '__main__':
app.listen(8000)
tornado.ioloop.IOLoop.instance().start()






# Client:

from tornado.ioloop import IOLoop
from tornado import gen
from tornado.websocket import websocket_connect

@gen.coroutine
def process_msgs():
url = 'ws://localhost:8000/ws'
web_sock = yield websocket_connect(url, None)

web_sock.write_message("hello world")
msg = yield web_sock.read_message()
print(msg)

web_sock.write_message("this is a test")
msg = yield web_sock.read_message()
print(msg)

web_sock.close(1001,"closing connection")
IOLoop.instance().stop()

if __name__ == '__main__':
io = IOLoop.instance().add_callback(process_msgs)
IOLoop.instance().start()

John Mudd

unread,
Sep 8, 2015, 2:14:48 PM9/8/15
to python-...@googlegroups.com
Thanks. But I'm try to use on_message_callback to get msgs to arrive in the background.


--
You received this message because you are subscribed to a topic in the Google Groups "Tornado Web Server" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/python-tornado/iFtV0ni8LBA/unsubscribe.
To unsubscribe from this group and all its topics, send an email to python-tornad...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Ben Darnell

unread,
Sep 8, 2015, 11:01:31 PM9/8/15
to Tornado Mailing List
On Mon, Sep 7, 2015 at 10:37 PM, JohnMudd <john...@gmail.com> wrote:
@tornado.gen.coroutine
def process_msgs():
    url = 'ws://localhost:8000/ws'
    client = tornado.httpclient.HTTPRequest(url)
    conn = yield tornado.websocket.websocket_connect(client, on_message_callback=put)

    print 'before sleep'
    time.sleep(60)
    print 'after sleep'

Never use time.sleep() in asynchronous code - it blocks the thread for the entire duration. In a coroutine, you can use `yield tornado.gen.sleep(60)` instead.

-Ben
 

tornado.ioloop.IOLoop.instance().run_sync(process_msgs)



I start the server and then the client. I get the following output when the client starts.

Server:
before write_message()
after  write_message()

Client:
before sleep





--
You received this message because you are subscribed to the Google Groups "Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornad...@googlegroups.com.

John Mudd

unread,
Sep 9, 2015, 10:09:35 AM9/9/15
to python-...@googlegroups.com
Ben, you're right, of course. Sorry, I've asked a similar question before so I have no excuse.

I solved my original problem using the ThreadPoolExecutor but it doesn't work as expected in the following example.



This works as expected:

Server:

import time
import tornado.ioloop
import tornado.web
import tornado.websocket

class WebSocketHandler(tornado.websocket.WebSocketHandler):
    def open(self, *args):
        self.write_message('test')

    def on_message(self, message):
        time.sleep(0.2)
        self.write_message(message)

if __name__ == '__main__':
    app = tornado.web.Application([
        (r'/ws', WebSocketHandler),
    ])

    app.listen(8000)
    tornado.ioloop.IOLoop.instance().start()


Client:

import time
import threading
from concurrent.futures import ThreadPoolExecutor

import tornado
import tornado.websocket

executor = ThreadPoolExecutor(1)

def process_msgs():
    print 'before sleep'
    time.sleep(1)
    print 'after sleep'

class Client(threading.Thread):
    def __init__(self):
        super(self.__class__, self).__init__()
        self.start()

    def put(self, msg):
        print 'put(): msg=%s' % `msg`
        self.conn.write_message(msg)

    @tornado.gen.coroutine
    def process_msgs(self):
        url = 'ws://localhost:8000/ws'
        self.conn = yield tornado.websocket.websocket_connect(url, on_message_callback=self.put)

        yield executor.submit(process_msgs)

    def run(self):
        tornado.ioloop.IOLoop.instance().run_sync(self.process_msgs)
        print 'after tornado.ioloop.IOLoop.instance'

client = Client()


Output from client:

before sleep
 put(): msg=u'test'
put(): msg=u'test'
put(): msg=u'test'
put(): msg=u'test'
put(): msg=u'test'
after sleep
after tornado.ioloop.IOLoop.instance




The sleep() represents work being done. I really want that work to be done using a method from the Client object so I tried the following client but the process_msgs() method doesn't get executed.

Client 2:

executor = ThreadPoolExecutor(1)

class Client(threading.Thread):
    def __init__(self):
        super(self.__class__, self).__init__()
        self.start()

    def put(self, msg):
        print 'put(): msg=%s' % `msg`
        self.conn.write_message(msg)

    def process_msgs(self):
        print 'before sleep'
        time.sleep(1)
        print 'after sleep'

    @tornado.gen.coroutine
    def process_msgs(self):
        url = 'ws://localhost:8000/ws'
        self.conn = yield tornado.websocket.websocket_connect(url, on_message_callback=self.put)

        yield executor.submit(self.process_msgs)

    def run(self):
        tornado.ioloop.IOLoop.instance().run_sync(self.process_msgs)
        print 'after tornado.ioloop.IOLoop.instance'

client = Client()


Output from client 2:

put(): msg=u'test'
after tornado.ioloop.IOLoop.instance





Here's another client that seems to give me what I want. But (1) I had to call an external function in order to call the class method from the thread pool and (2) I had to change the name of the method so that it doesn't match the function name. Seems strange to have to do either of these extra steps.

Client 3:

executor = ThreadPoolExecutor(1)

def process_msgs(client):
    print 'before client.process_msgsx'
    client.process_msgsx()
    print 'after  client.process_msgsx'

class Client(threading.Thread):
    def __init__(self):
        super(self.__class__, self).__init__()
        self.start()

    def put(self, msg):
        print 'put(): msg=%s' % `msg`
        self.conn.write_message(msg)

    def process_msgsx(self):
        print 'before sleep'
        time.sleep(1)
        print 'after sleep'

    @tornado.gen.coroutine
    def process_msgs(self):
        url = 'ws://localhost:8000/ws'
        self.conn = yield tornado.websocket.websocket_connect(url, on_message_callback=self.put)

        #yield executor.submit(process_msgs)
        #yield executor.submit(self.process_msgs)
        yield executor.submit(process_msgs, self)

    def run(self):
        tornado.ioloop.IOLoop.instance().run_sync(self.process_msgs)
        print 'after tornado.ioloop.IOLoop.instance'

client = Client()


Output from client 3:

before client.process_msgsx
before sleep
 put(): msg=u'test'
put(): msg=u'test'
put(): msg=u'test'
put(): msg=u'test'
put(): msg=u'test'
after sleep
after  client.process_msgsx
after tornado.ioloop.IOLoop.instance




Here's what I get if I use the same name for the function and method. I changed "process_msgsx" to "process_msgs" in the class to match the function name. Now it looks like the method is not executed.

before client.process_msgs
 put(): msg=u'test'
after  client.process_msgs
after tornado.ioloop.IOLoop.instance


John

JohnMudd

unread,
Sep 9, 2015, 5:29:11 PM9/9/15
to Tornado Web Server
Sorry, it was another dumb mistake.  I was blindly using the same method name twice in the same class. Python doesn't complain, it just let me overlay the first method with confusing results.

Finally, here's the client that works as expected, the way I wanted from the start.


executor = ThreadPoolExecutor(1)

class Client(threading.Thread):
    def __init__(self):
        super(self.__class__, self).__init__()
        self.start()

    def put(self, msg):
        print 'put(): msg=%s' % `msg`
        self.conn.write_message(msg)

    def more_processing(self):
        print 'before sleep'
        time.sleep(1)
        print 'after sleep'

    @tornado.gen.coroutine
    def process_msgs(self):
        url = 'ws://localhost:8000/ws'
        self.conn = yield tornado.websocket.websocket_connect(url, on_message_callback=self.put)

        yield executor.submit(self.more_processing)

    def run(self):
        tornado.ioloop.IOLoop.instance().run_sync(self.process_msgs)
        print 'after tornado.ioloop.IOLoop.instance'


Output:

put(): msg=u'test'
 before sleep
put(): msg=u'test'
put(): msg=u'test'
put(): msg=u'test'
put(): msg=u'test'
after sleep
after tornado.ioloop.IOLoop.instance

Kevin LaTona

unread,
Sep 9, 2015, 8:29:01 PM9/9/15
to python-...@googlegroups.com


John,

It's not fully clear to me what you are wanting to do here…….. any chance you could outline it?

The reason I am asking is there always more than one way to do things.

So I am curious what you ulitmate goal is as I think all of us can pick up tidbits and learn things along the way on the list here.

-Kevin


JohnMudd

unread,
Sep 9, 2015, 10:49:29 PM9/9/15
to Tornado Web Server, li...@studiosola.com
I've always thought websockets were cool idea. But I never had a real need.

Now I have a tornado HTTP server. It provides web services to web sites at our stores around the country. A few thousand stores. 

The web sites talks to my server. My server passes along requests to our old legacy system. The legacy system has a complicated, slow, inflexible connection to data stored on computers at the store. It's painful because the legacy system might take 1, 2, 3, maybe 5 seconds to satisfy a request. There's no good excuse for this but there's also no chance of fixing the legacy system. Painful to be the middleman in all this.

Recently the stores needed to send text messages. I managed to get a change where our app on the store computers now sends their text request to my HTTP server. That works well. It's also a breakthrough because now my server is only one of two servers the stores are allowed to connect to. The legacy server is the other.

So now I want to install a reverse zombie client on all stores. A background process that will connect to my server as a client but then act more like a server. I'll use that connection as an alternative to legacy system.

But first, I've never been comfortable with the websocket interface. A different interface for client & server?? That has to go. And I want to be able to send multiple overlapping requests from my server to a client over a single websocket connection. Ditto from the client to my server. And all that traffic needs to share the same connection but not interfere with each  other. For this I have the concept of cheap and easy channels on top of the websocket.

Most processing (usually database access) will run in the background on the store computer. So I'll make use of the Queue module so I get a timeout capability. 

That's roughly it. Hopefully I can share code soon.

Kevin LaTona

unread,
Sep 10, 2015, 12:28:39 AM9/10/15
to python-...@googlegroups.com, JohnMudd



Like you I've not had many reasons to use a websocket, so your email thread got me to thinking more about them.

Which is why I was trying to get a better visual of the problem.

So far Tornado solves a problem for me and works well.

So I'm always looking to poke around in and at Tornado on my off days to see what else it can do for me.

It's be intersting to see where you end at, given there are many answers / ways to solve a problem.

Thanks for sharing.
Reply all
Reply to author
Forward
0 new messages