Building a reconnecting Python client using asyncio

David Ford

Jun 27, 2016, 3:03:25 PM
to Crossbar
I noticed that the Twisted implementation has a reconnecting client, but I use asyncio. Would the powers that be please guide me in the right direction as I monkey-patch things?

I currently have a modified ApplicationRunner instance, as I need this WAMP implementation as a submodule for several already well-developed projects. So I spin off a thread pool using asyncio to keep WAMP communication going in the background.
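
A minimal sketch of that background-thread arrangement, using only the standard library and assuming Python 3.7 or later. The names `start_background_loop` and `ping` are made up for illustration and are not part of autobahn; `ping` simply stands in for real WAMP traffic.

```python
import asyncio
import threading


def start_background_loop():
    """Start a private event loop in a daemon thread and return (loop, thread)."""
    loop = asyncio.new_event_loop()

    def _run():
        # A non-main thread has no current event loop, so set one explicitly.
        asyncio.set_event_loop(loop)
        loop.run_forever()

    thread = threading.Thread(target=_run, name="wamp-loop", daemon=True)
    thread.start()
    return loop, thread


async def ping(value):
    # Stand-in for real WAMP traffic (hypothetical).
    await asyncio.sleep(0.01)
    return value * 2


loop, thread = start_background_loop()

# Hand a coroutine to the background loop from the calling thread.
future = asyncio.run_coroutine_threadsafe(ping(21), loop)
result = future.result(timeout=5)

# Stop the loop from outside its thread, then clean up.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
loop.close()
```

`run_coroutine_threadsafe` and `call_soon_threadsafe` are the thread-safe entry points here; calling other loop methods directly from another thread is not safe.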

Here's the code I have now; nginx is my HTTPS provider, which proxies wss to crossbar.

    def app(self):
        """
        We use our own ApplicationRunner here which is almost an identical copy of
        wamp.ApplicationRunner. The difference being that we need to:
         a) explicitly get a new asyncio event loop because we aren't running
            in the main thread - we'll get a RuntimeError: There is no current
            event loop in thread <thread name>, and
         b) don't set a signal handler for SIGTERM, also because we're not
            running in the main thread
        """

        def _create():
            __cfg = ComponentConfig(self.realm, self.extra)
            try:
                session = _Component(__cfg)
                session.session_loop = loop
                self.session = session
            except Exception as e:
                print("App session could not be created! ")
                traceback.print_exc()
                asyncio.get_event_loop().stop()
            else:
                return session

        loop = asyncio.new_event_loop()
        loop.set_debug(True)
        asyncio.set_event_loop(loop)

        txaio.use_asyncio()
        txaio.start_logging(level='debug')
        txaio.config.loop = loop

        try:
            transport_factory = WampWebSocketClientFactory(_create, url=self.irl, serializers=None)
        except Exception:
            # Without a factory there is nothing to connect with, so bail out.
            traceback.print_exc()
            return

        isSecure, host, port, resource, path, params = parse_url(self.irl)


        while True:
            print('re-connecting WAMP client')
            # nginx is always successful, so there's nothing here that generates
            # an error :( our debug logging shows websocket activity that fails
            # during the opening WS handshake but that's where the websocket client
            # activity stops.

            try:
                # Use TLS only when the URL scheme asks for it (wss://).
                coro = loop.create_connection(transport_factory, host, port, ssl=isSecure)
                fut = asyncio.Task(coro, loop=loop)
                (transport, protocol) = loop.run_until_complete(asyncio.wait_for(fut, timeout=2, loop=loop))
                loop.run_forever()
            except KeyboardInterrupt:
                print('KBI')
                break
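            except Exception:
                # A failed or timed-out attempt should not end the reconnect
                # loop: log it, wait, and try again.
                traceback.print_exc()
                loop.run_until_complete(asyncio.sleep(10))
                continue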

            if protocol._session:
                loop.run_until_complete(protocol._session.leave())

            if self.session._shutdown:
                break

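            # ensure_future() only schedules the sleep; the loop is not running
            # here, so actually wait out the delay before reconnecting.
            loop.run_until_complete(asyncio.sleep(10))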


Here's some debug log output from process startup, with crossbar shut down.

┌[✓ non-root@sea-dragon [~]
└─> PYTHONASYNCIODEBUG=1 python -Wdefault -u /var/bluelabs/utils/blam.py --fore
waiting for wamp session to start
re-connecting WAMP client
2016-06-27T18:56:24 Get address info hive-poc.norsec.org:443, type=<SocketKind.SOCK_STREAM: 1>
2016-06-27T18:56:24 Getting address info hive-poc.norsec.org:443, type=<SocketKind.SOCK_STREAM: 1> took 10.966 ms: [(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('173.12.76.129', 443))]
2016-06-27T18:56:24 poll 1999.751 ms took 12.141 ms: 1 events
2016-06-27T18:56:24 connect <socket.socket fd=8, family=AddressFamily.AF_INET, type=2049, proto=6, laddr=('0.0.0.0', 0)> to ('173.12.76.129', 443)
2016-06-27T18:56:24 poll 1985.767 ms took 56.728 ms: 1 events
2016-06-27T18:56:24 <asyncio.sslproto.SSLProtocol object at 0x7f1e39b5c710> starts SSL handshake
2016-06-27T18:56:24 poll 1912.252 ms took 66.077 ms: 1 events
2016-06-27T18:56:24 poll 1845.406 ms took 5.027 ms: 1 events
2016-06-27T18:56:24 poll 1838.940 ms took 52.410 ms: 1 events
2016-06-27T18:56:24 <asyncio.sslproto.SSLProtocol object at 0x7f1e39b5c710>: SSL handshake took 128.0 ms
2016-06-27T18:56:24 
[('logOctets', False, 'WampWebSocketClientFactory'),
 ('logFrames', False, 'WampWebSocketClientFactory'),
 ('trackTimings', False, 'WampWebSocketClientFactory'),
 ('utf8validateIncoming', True, 'WampWebSocketClientFactory'),
 ('applyMask', True, 'WampWebSocketClientFactory'),
 ('maxFramePayloadSize', 0, 'WampWebSocketClientFactory'),
 ('maxMessagePayloadSize', 0, 'WampWebSocketClientFactory'),
 ('autoFragmentSize', 0, 'WampWebSocketClientFactory'),
 ('failByDrop', True, 'WampWebSocketClientFactory'),
 ('echoCloseCodeReason', False, 'WampWebSocketClientFactory'),
 ('openHandshakeTimeout', 5, 'WampWebSocketClientFactory'),
 ('closeHandshakeTimeout', 1, 'WampWebSocketClientFactory'),
 ('tcpNoDelay', True, 'WampWebSocketClientFactory'),
 ('autoPingInterval', 0, 'WampWebSocketClientFactory'),
 ('autoPingTimeout', 0, 'WampWebSocketClientFactory'),
 ('autoPingSize', 4, 'WampWebSocketClientFactory'),
 ('version', 18, 'WampWebSocketClientFactory'),
 ('acceptMaskedServerFrames', False, 'WampWebSocketClientFactory'),
 ('maskClientFrames', True, 'WampWebSocketClientFactory'),
 ('serverConnectionDropTimeout', 1, 'WampWebSocketClientFactory'),
 ('perMessageCompressionOffers', [], 'WampWebSocketClientFactory'),
 ('perMessageCompressionAccept',
  <function WebSocketClientFactory.resetProtocolOptions.<locals>.<lambda> at 0x7f1e39b4b950>,
  'WampWebSocketClientFactory')]
2016-06-27T18:56:24 connection to tcp:173.12.76.129:443 established
2016-06-27T18:56:24 GET /ws HTTP/1.1
User-Agent: AutobahnPython/0.14.1
Upgrade: WebSocket
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
Sec-WebSocket-Key: LG2hvYmQktWJKzK1mx1IJQ==
Sec-WebSocket-Protocol: wamp.2.json.batched,wamp.2.json
Sec-WebSocket-Version: 13


2016-06-27T18:56:24 <socket.socket fd=8, family=AddressFamily.AF_INET, type=2049, proto=6, laddr=('104.131.36.175', 40507), raddr=('173.12.76.129', 443)> connected to hive-poc.norsec.org:443: (<asyncio.sslproto._SSLProtocolTransport object at 0x7f1e39b5c3c8>, <autobahn.asyncio.websocket.WampWebSocketClientProtocol object at 0x7f1e427f1630>)
2016-06-27T18:56:24 poll 4724.705 ms took 60.126 ms: 1 events
2016-06-27T18:56:24 received HTTP response:

b'HTTP/1.1 502 Bad Gateway\r\nServer: nginx/1.11.1\r\nDate: Mon, 27 Jun 2016 18:56:24 GMT\r\nContent-Type: text/html\r\nContent-Length: 173\r\nConnection: keep-alive\r\n\r\n'


2016-06-27T18:56:24 received HTTP status line in opening handshake : HTTP/1.1 502 Bad Gateway
2016-06-27T18:56:24 received HTTP headers in opening handshake : {'connection': 'keep-alive', 'content-type': 'text/html', 'content-length': '173', 'server': 'nginx/1.11.1', 'date': 'Mon, 27 Jun 2016 18:56:24 GMT'}
2016-06-27T18:56:24 failing WebSocket opening handshake ('WebSocket connection upgrade failed (502 - BadGateway)')
2016-06-27T18:56:24 dropping connection: WebSocket connection upgrade failed (502 - BadGateway)
2016-06-27T18:56:24 poll 4658.637 ms took 60.055 ms: 1 events
2016-06-27T18:56:24 <_SelectorSocketTransport fd=8 read=polling write=<idle, bufsize=0>> received EOF
2016-06-27T18:56:24 <asyncio.sslproto.SSLProtocol object at 0x7f1e39b5c710> received EOF
2016-06-27T18:56:24 _connectionLost: None
waiting for wamp session to start
waiting for wamp session to start
waiting for wamp session to start
waiting for wamp session to start
2016-06-27T18:56:29 poll 4593.100 ms took 4598.847 ms: timeout
2016-06-27T18:56:29 skipping opening handshake timeout: WebSocket connection is already closed
waiting for wamp session to start
waiting for wamp session to start
waiting for wamp session to start
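
The log shows where the failure is detected: nginx answers the upgrade request with 502 rather than 101 Switching Protocols, and the connection is then dropped without anything reaching the caller. A rough sketch of turning that status line into an exception a caller could catch follows. `HandshakeError` and `check_upgrade_response` are hypothetical names, not autobahn APIs, and the response bytes are a shortened copy of the ones in the log.

```python
class HandshakeError(Exception):
    """Hypothetical exception carrying the HTTP status of a failed upgrade."""

    def __init__(self, status, reason):
        super().__init__("WebSocket upgrade failed (%d - %s)" % (status, reason))
        self.status = status
        self.reason = reason


def check_upgrade_response(raw):
    """Return 101 for a successful upgrade, raise HandshakeError otherwise."""
    status_line = raw.split(b"\r\n", 1)[0].decode("iso-8859-1")
    parts = status_line.split(" ", 2)
    if len(parts) < 2 or not parts[1].isdigit():
        raise HandshakeError(0, "malformed status line")
    status = int(parts[1])
    reason = parts[2] if len(parts) > 2 else ""
    if status != 101:
        raise HandshakeError(status, reason)
    return status


# Shortened copy of the response seen in the debug log.
raw = (b"HTTP/1.1 502 Bad Gateway\r\nServer: nginx/1.11.1\r\n"
       b"Content-Type: text/html\r\n\r\n")

try:
    check_upgrade_response(raw)
    outcome = None
except HandshakeError as exc:
    outcome = (exc.status, exc.reason)
```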


As the code I write is meant to be folded back into autobahn-python, I'd love guidance on where you would rather I change things.

-david

David Ford

Jun 30, 2016, 12:39:48 AM
to Crossbar
OK, I've built a reliable reconnecting client, but it does so in a fairly blunt way: if the connection doesn't succeed within a given time period, the event loop is simply stopped and restarted. This is a pretty ugly way to achieve it, and there's no feedback as to why the connection failed, but it works. With debug logging turned on we can see it fail (the 502 Bad Gateway mentioned above), but that error dies silently deep inside the autobahn code. It would be much better if the websocket coroutine managed to get that back to the event loop by way of an exception.
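
For comparison, a sketch of a gentler reconnect loop that never stops the event loop: the protocol resolves a future when the connection is lost, and the loop awaits that future, backs off, and connects again. This uses plain asyncio (Python 3.7 or later), not autobahn; `ClosedSignal`, `run_with_reconnect`, and the local hang-up server are illustrative assumptions, with the server standing in for a proxy that accepts the connection and then drops it.

```python
import asyncio


class ClosedSignal(asyncio.Protocol):
    """Protocol that reports connection loss through a future."""

    def __init__(self, closed):
        self.closed = closed

    def connection_lost(self, exc):
        # Resolve the future so the reconnect loop learns about the drop.
        if not self.closed.done():
            self.closed.set_result(exc)


async def run_with_reconnect(host, port, max_attempts, base_delay=0.01, timeout=2.0):
    """Connect, wait for the drop, back off, and try again."""
    loop = asyncio.get_running_loop()
    attempts = []
    delay = base_delay
    for _ in range(max_attempts):
        closed = loop.create_future()
        try:
            await asyncio.wait_for(
                loop.create_connection(lambda: ClosedSignal(closed), host, port),
                timeout,
            )
        except (OSError, asyncio.TimeoutError) as exc:
            attempts.append("failed: %s" % type(exc).__name__)
        else:
            await closed  # returns once the peer or the network drops us
            attempts.append("dropped")
        await asyncio.sleep(delay)
        delay = min(delay * 2, 1.0)  # exponential backoff with a cap
    return attempts


async def demo():
    # Local stand-in for a proxy that accepts and then hangs up.
    async def hang_up(reader, writer):
        writer.close()

    server = await asyncio.start_server(hang_up, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    try:
        return await run_with_reconnect("127.0.0.1", port, max_attempts=3)
    finally:
        server.close()
        await server.wait_closed()


attempts = asyncio.run(demo())
```

A real client would loop until told to shut down rather than for a fixed number of attempts, and would pass the reason for the drop along instead of just recording it.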

Alexander Gödde

Jun 30, 2016, 6:11:16 AM
to Crossbar
Hi David!

Reconnect for Autobahn|Python is an open issue generally. Maybe you could help drive this forward by joining the discussion at https://github.com/crossbario/autobahn-python/issues/588?

Regards,

Alex

David Ford

Jun 30, 2016, 3:28:49 PM
to cross...@googlegroups.com
Thank you. I've joined it and I'll try my best to stimulate some focus on the issue :)


--
Gay/FireRescue/Geek in 33484, USA
It's the ideals of Linux and Open Source that are amazing, it embodies what WE want, not what is marketed