Pulsar interoperability with Flask and Twisted

336 views
Skip to first unread message

Anatoly Akkerman

unread,
Dec 18, 2013, 9:40:11 AM12/18/13
to python...@googlegroups.com
I have a simple Flask RESTful app, which provides a websocket interface for clients that just want to receive updates w/o polling. My initial implementation is using Twisted for WSGI container for my Flask app and Twisted + Autobahn for websocket support.

I am interested in using Pulsar as its support for asynchronous and actor-based programming appeals more to me than callback-based development.
So, I've tried the following approaches:
  • Use Pulsar Twisted integration, which attempts to replace Twisted's reactor implementation with Pulsar's eventloop. Presumably, because of me not understanding how to do it correctly, it never worked. An example on how to actually do the reactor replacement would have been very useful.
  • Next I thought of replacing Twisted completely with Pulsar (rewriting the Twisted+Autobahn websocket with Pulsar websocket APIs), however, I can't figure out how to have a _synchronous_ WSGI app (Flask) and asynchronous websocket handler in the same WSGIServer. Is it possible?

Any help would be appreciated

Anatoly.

lsbardel

unread,
Dec 18, 2013, 10:58:46 AM12/18/13
to python...@googlegroups.com
On Wednesday, December 18, 2013 2:40:11 PM UTC, Anatoly Akkerman wrote:
I have a simple Flask RESTful app, which provides a websocket interface for clients that just want to receive updates w/o polling. My initial implementation is using Twisted for WSGI container for my Flask app and Twisted + Autobahn for websocket support.

I am interested in using Pulsar as its support for asynchronous and actor-based programming appeals more to me than callback-based development.
So, I've tried the following approaches:
  • Use Pulsar Twisted integration, which attempts to replace Twisted's reactor implementation with Pulsar's eventloop. Presumably, because of me not understanding how to do it correctly, it never worked. An example on how to actually do the reactor replacement would have been very useful.
Hi,
pulsar-twisted integration is a prof-of-concept application and not fully tested. I don't use twisted therefore I have little incentive in polishing it.
Having said that the reactor is replaced by importing twisted via pulsar

    from pulsar.apps.tx import twisted

before you import twisted at all.
  • Next I thought of replacing Twisted completely with Pulsar (rewriting the Twisted+Autobahn websocket with Pulsar websocket APIs), however, I can't figure out how to have a _synchronous_ WSGI app (Flask) and asynchronous websocket handler in the same WSGIServer. Is it possible?
This is the best and simpler solution in my opinion. I'm not familiar with flask, however pulsar ships with an application for running django on pulsar wsgi server. There is also a django example which illustrates how to integrate pulsar websocket middleware into django framework with relatively little effort.

In your case, these are the steps you may want to follow:

1) Write a flask extension to serve flask sites via pulsar. Check the ``pulsar.apps.pulse`` application to check how that is done for django

2) Write the websocket application. Check how the django chat example is implemented

Anatoly Akkerman

unread,
Dec 18, 2013, 11:06:20 AM12/18/13
to python...@googlegroups.com


On Wednesday, December 18, 2013 10:58:46 AM UTC-5, lsbardel wrote:
On Wednesday, December 18, 2013 2:40:11 PM UTC, Anatoly Akkerman wrote:
I have a simple Flask RESTful app, which provides a websocket interface for clients that just want to receive updates w/o polling. My initial implementation is using Twisted for WSGI container for my Flask app and Twisted + Autobahn for websocket support.

I am interested in using Pulsar as its support for asynchronous and actor-based programming appeals more to me than callback-based development.
So, I've tried the following approaches:
  • Use Pulsar Twisted integration, which attempts to replace Twisted's reactor implementation with Pulsar's eventloop. Presumably, because of me not understanding how to do it correctly, it never worked. An example on how to actually do the reactor replacement would have been very useful.
Hi,
pulsar-twisted integration is a prof-of-concept application and not fully tested. I don't use twisted therefore I have little incentive in polishing it.
Having said that the reactor is replaced by importing twisted via pulsar

    from pulsar.apps.tx import twisted

Thank you for a prompt reply. Here is what I am running into with Twisted:

2013-12-18 10:42:40 [p=10567,t=139987476825840] [ERROR] [pulsar.arbiter] Unhadled exception in event loop callback.
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/pulsar-0.8.0_alpha.1-py2.7-linux-x86_64.egg/pulsar/async/eventloop.py", line 679, in _run_once
    value = callback()
  File "/usr/lib/python2.7/site-packages/pulsar-0.8.0_alpha.1-py2.7-linux-x86_64.egg/pulsar/async/pollers.py", line 154, in handle_events
    reader()
  File "/usr/lib/python2.7/site-packages/pulsar-0.8.0_alpha.1-py2.7-linux-x86_64.egg/pulsar/async/eventloop.py", line 142, in __call__
    return self._callback(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/Twisted-13.2.0-py2.7-linux-x86_64.egg/twisted/internet/tcp.py", line 215, in doRead
    return self._dataReceived(data)
  File "/usr/lib/python2.7/site-packages/Twisted-13.2.0-py2.7-linux-x86_64.egg/twisted/internet/tcp.py", line 221, in _dataReceived
    rval = self.protocol.dataReceived(data)
  File "/usr/lib/python2.7/site-packages/Twisted-13.2.0-py2.7-linux-x86_64.egg/twisted/protocols/basic.py", line 571, in dataReceived
    why = self.lineReceived(line)
  File "/usr/lib/python2.7/site-packages/Twisted-13.2.0-py2.7-linux-x86_64.egg/twisted/web/http.py", line 1620, in lineReceived
    self.resetTimeout()
  File "/usr/lib/python2.7/site-packages/Twisted-13.2.0-py2.7-linux-x86_64.egg/twisted/protocols/policies.py", line 693, in resetTimeout
    self.__timeoutCall.reset(self.timeOut)
AttributeError: 'TimedCall' object has no attribute 'reset'

lsbardel

unread,
Dec 18, 2013, 11:37:26 AM12/18/13
to python...@googlegroups.com

pulsar-twisted integration is a prof-of-concept application and not fully tested. I don't use twisted therefore I have little incentive in polishing it.
Having said that the reactor is replaced by importing twisted via pulsar

    from pulsar.apps.tx import twisted

Thank you for a prompt reply. Here is what I am running into with Twisted:

2013-12-18 10:42:40 [p=10567,t=139987476825840] [ERROR] [pulsar.arbiter] Unhadled exception in event loop callback.
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/pulsar-0.8.0_alpha.1-py2.7-linux-x86_64.egg/pulsar/async/eventloop.py", line 679, in _run_once
    value = callback()
  File "/usr/lib/python2.7/site-packages/pulsar-0.8.0_alpha.1-py2.7-linux-x86_64.egg/pulsar/async/pollers.py", line 154, in handle_events
    reader()
  File "/usr/lib/python2.7/site-packages/pulsar-0.8.0_alpha.1-py2.7-linux-x86_64.egg/pulsar/async/eventloop.py", line 142, in __call__
    return self._callback(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/Twisted-13.2.0-py2.7-linux-x86_64.egg/twisted/internet/tcp.py", line 215, in doRead
    return self._dataReceived(data)
  File "/usr/lib/python2.7/site-packages/Twisted-13.2.0-py2.7-linux-x86_64.egg/twisted/internet/tcp.py", line 221, in _dataReceived
    rval = self.protocol.dataReceived(data)
  File "/usr/lib/python2.7/site-packages/Twisted-13.2.0-py2.7-linux-x86_64.egg/twisted/protocols/basic.py", line 571, in dataReceived
    why = self.lineReceived(line)
  File "/usr/lib/python2.7/site-packages/Twisted-13.2.0-py2.7-linux-x86_64.egg/twisted/web/http.py", line 1620, in lineReceived
    self.resetTimeout()
  File "/usr/lib/python2.7/site-packages/Twisted-13.2.0-py2.7-linux-x86_64.egg/twisted/protocols/policies.py", line 693, in resetTimeout
    self.__timeoutCall.reset(self.timeOut)
AttributeError: 'TimedCall' object has no attribute 'reset'

Right, this is caused by twisted trying to cancel a pulsar a TimedCall (pulsar internals). I've never tested the integration with twisted web I'm afraid.

Anatoly Akkerman

unread,
Dec 19, 2013, 2:22:22 PM12/19/13
to python...@googlegroups.com
In your case, these are the steps you may want to follow:

1) Write a flask extension to serve flask sites via pulsar. Check the ``pulsar.apps.pulse`` application to check how that is done for django

Ok, so my understanding is, that I need to use  WSGIServer as the Application and give it a LazyWsgi subclass which would instantiate a WsgiHandler which uses wait_for_body_middleware and a reference to my Flask wsgi app, ala:

        return WsgiHandler((wait_for_body_middleware, app))


2) Write the websocket application. Check how the django chat example is implemented


Now what I don't quite grasp is how does the websocket application and Flask wsgi app communicate with the backing store (which I was intending to run as yet another Application). I understand that I can use a MultiApp, but I don't follow how the two front ends (ws and wsgi-flask) will communicate with a single backend.

Thanks again! 

 

Anatoly Akkerman

unread,
Dec 19, 2013, 2:41:43 PM12/19/13
to python...@googlegroups.com
To elaborate a little, let's imagine the backend is an in-memory shared dict(). The Flask RESTful API+webapp allows getting/setting entries, while the websocket app publishes updates to all clients. How does one write this on top of pulsar?

Anatoly Akkerman

unread,
Dec 19, 2013, 2:53:35 PM12/19/13
to python...@googlegroups.com
Allright, it looks like in 0.8 branch there is a Store functionality that should be used as the backend and PubSub to communicate with the Store? Am I on the right track?

lsbardel

unread,
Dec 19, 2013, 4:05:54 PM12/19/13
to python...@googlegroups.com

Ok, so my understanding is, that I need to use  WSGIServer as the Application and give it a LazyWsgi subclass which would instantiate a WsgiHandler which uses wait_for_body_middleware and a reference to my Flask wsgi app, ala:

        return WsgiHandler((wait_for_body_middleware, app))

Yes, that is correct
 


2) Write the websocket application. Check how the django chat example is implemented


Now what I don't quite grasp is how does the websocket application and Flask wsgi app communicate with the backing store (which I was intending to run as yet another Application). I understand that I can use a MultiApp, but I don't follow how the two front ends (ws and wsgi-flask) will communicate with a single backend.

As you noticed the 0.8 series will have a proper store API, the 0.7 doesn't, however it has pubsub application.
Pseudo code:

from pulsar.apps import ws, pubsub

class Client(pubsub.Client):

    def __init__(self, ws):
        self.ws = ws

    def __call__(self, channel, message):
        self.ws.write(message)


class WsHandle(ws.WS):

    _pubsub = None

    def pubsub(self):
        if not self._pubsub:
            self._pubsub = pubsub.PubSub()
            self._pubsub.subscribe('channel1', 'channel2', ...)
        return self._pubsub

    def on_open(self, websocket):
        self.pubsub().add_client(Client(websocket))


wsgi = WsgiHandler((wait_for_body_middleware, ws.WebSocket('/wspath', WsHandle()), app))
 
anywhere in your code, even in a different process domain, you can use a pubsub handler to publish messages

p = pubsub.Pubsub()
p.publish('channel1', 'Hello there')


The a pubsub handler allows you to send messages to a centralised message handler.

lsbardel

unread,
Dec 19, 2013, 4:14:48 PM12/19/13
to python...@googlegroups.com

On Thursday, December 19, 2013 7:53:35 PM UTC, Anatoly Akkerman wrote:
Allright, it looks like in 0.8 branch there is a Store functionality that should be used as the backend and PubSub to communicate with the Store? Am I on the right track?

You are correct, 0.8 will have a much better support for this type of stuff!
It has an internal redis-like server that you can use for cache, pubsub or even store your data (it has persistence).
You can also replace it with real redis via the ``data-store`` flag or implement your own datastore.

I hope to release 0.8 very soon, I'm still working on tests passage.
Check the django chat example in the 0.8 branch for implementation, it should be self explanatory.


Reply all
Reply to author
Forward
0 new messages