from flask import Flask, make_response
from pulsar.apps import wsgi
import asyncio
from functools import partial
import pulsar
from pulsar import Pool, Connection, AbstractClient, ProtocolError
from pulsar.apps.socket import SocketServer
from pulsar import MultiApp, Config
from pulsar.apps.wsgi import WSGIServer
class EchoProtocol(pulsar.ProtocolConsumer):
    '''Echo :class:`~.ProtocolConsumer` shared by clients and servers.

    Messages are delimited by :attr:`separator`. Client and server
    differ only in how they implement :meth:`response`.
    '''
    #: Byte sequence marking the end of a message.
    separator = b'\r\n\r\n'
    #: Bytes received so far that do not yet form a complete message;
    #: replaced by the value of :meth:`response` once a message completes.
    buffer = b''

    def data_received(self, data):
        '''Implements :meth:`~.ProtocolConsumer.data_received`.

        Any previously buffered bytes are prepended to ``data``. When the
        :attr:`separator` is present, :meth:`response` is invoked with the
        message (separator included) and whatever follows it, its return
        value is stored in :attr:`buffer` and the consumer is finished.

        :return: the bytes following the first complete message, or
            ``None`` when no complete message is available yet.
        '''
        pending = self.buffer + data if self.buffer else data
        end = pending.find(self.separator)
        if end < 0:
            # No separator yet: keep everything until more data arrives.
            self.buffer = pending
            return None
        # Include the separator itself in the message handed to response().
        end += len(self.separator)
        message = pending[:end]
        rest = pending[end:]
        self.buffer = self.response(message, rest)
        self.finished()
        return rest

    def start_request(self):
        '''Override :meth:`~.ProtocolConsumer.start_request`.

        Writes the request followed by the :attr:`separator` into the
        transport.
        '''
        write = self.transport.write
        write(self._request + self.separator)

    def response(self, data, rest):
        '''Return ``data`` with the trailing :attr:`separator` removed.

        This is the client-side behaviour: the returned value ends up in
        :attr:`buffer`. Servers override this method to send the message
        back to the client.

        :raise ProtocolError: if ``rest`` is not empty, i.e. bytes were
            received after the end of the message.
        '''
        if rest:
            raise ProtocolError
        separator_size = len(self.separator)
        return data[:-separator_size]
class EchoServerProtocol(EchoProtocol):
    '''Server-side :class:`EchoProtocol`, used by the echo socket server.
    '''

    def response(self, data, rest):
        '''Override :meth:`~EchoProtocol.response` to echo the message.

        The ``data`` received (separator included) is written back to the
        client unchanged.

        :return: the message with the trailing separator removed.
        '''
        self.transport.write(data)
        message = data[:-len(self.separator)]
        # A QUIT message asks the server to close the transport.
        # Used by the test suite.
        if message == b'QUIT':
            self.transport.close()
        return message
class Echo(AbstractClient):
    '''A client for the echo server.

    :param address: set the :attr:`address` attribute
    :param full_response: set the :attr:`full_response` attribute
    :param pool_size: used when initialising the connection :attr:`pool`.
    :param loop: Optional event loop, forwarded to the base class
        initialiser.

    .. attribute:: _loop

        The event loop used to drive requests. It is read here but not
        assigned in this class (presumably set by the base class
        initialiser from ``loop``).

    .. attribute:: address

        remote server TCP address.

    .. attribute:: pool

        Asynchronous connection :class:`.Pool`.

    .. attribute:: full_response

        Flag indicating if a call should produce the
        :class:`EchoProtocol` handling the request (``True``) or
        the server response message (``False``).

        Default: ``False``
    '''
    protocol_factory = partial(Connection, EchoProtocol)

    def __init__(self, address, full_response=False, pool_size=2000,
                 loop=None):
        super().__init__(loop)
        self.full_response = full_response
        self.address = address
        # Connections are created on demand through :meth:`connect`.
        self.pool = Pool(self.connect, pool_size, self._loop)

    def connect(self):
        '''Create a new connection to :attr:`address`.'''
        return self.create_connection(self.address)

    def __call__(self, message):
        '''Send a ``message`` to the server.

        :return: when the event loop is already running, the pending
            coroutine created by :meth:`_call`, to be waited on by the
            caller; otherwise the loop is run until the request
            completes and its result is returned directly.
        '''
        pending = self._call(message)
        if self._loop.is_running():
            return pending
        return self._loop.run_until_complete(pending)

    # NOTE(review): generator-based coroutines via ``asyncio.coroutine``
    # are not available on Python 3.11+.
    @asyncio.coroutine
    def _call(self, message):
        '''Coroutine performing one request/response round trip.

        Obtains a connection from the :attr:`pool`, starts its current
        consumer with ``message`` and waits for the consumer to finish.
        '''
        connection = yield from self.pool.connect()
        with connection:
            consumer = connection.current_consumer()
            consumer.start(message)
            # Only completion matters here; the value the wait yields is
            # not used.
            yield from consumer.on_finished
            if self.full_response:
                return consumer
            return consumer.buffer
def FlaskApp():
    '''Build and return the Flask application served by the WSGI site.

    The application has a custom 404 handler and a single ``GET /`` route
    which sends ``b'1000'`` to the echo server through an :class:`Echo`
    client and renders the reply.
    '''
    flask_app = Flask(__name__)

    @flask_app.errorhandler(404)
    def not_found(error):
        return make_response("404 Page", 404)

    @flask_app.route('/', methods=['GET'])
    def add_org():
        # A fresh client is created for every request, pointing at a
        # fixed local port.
        client = Echo(('localhost', 8060))
        reply = client(b'1000')
        return "Flask Example %s" % reply

    return flask_app
class Site(wsgi.LazyWsgi):
    '''Lazily built WSGI callable wrapping the Flask application.
    '''

    def setup(self, environ=None):
        '''Create the WSGI handler for the site.

        The handler is built from two middleware:
        ``wsgi.wait_for_body_middleware`` followed by the Flask
        application wrapped with ``wsgi.middleware_in_executor``.

        :param environ: optional WSGI environ; not used here.
        :return: a :class:`wsgi.WsgiHandler`.
        '''
        app = FlaskApp()
        middleware = (wsgi.wait_for_body_middleware,
                      wsgi.middleware_in_executor(app))
        # ``async`` is a reserved word from Python 3.7 onwards, so spelling
        # it as a literal keyword argument makes the whole module fail to
        # parse there. Passing it through dict unpacking results in the
        # very same call while keeping the source valid on every version.
        return wsgi.WsgiHandler(middleware, **{'async': True})
def server(**kwargs):
    '''Create a :class:`wsgi.WSGIServer` serving a new :class:`Site`.

    :param kwargs: forwarded unchanged to :class:`wsgi.WSGIServer`.
    '''
    site = Site()
    return wsgi.WSGIServer(site, **kwargs)
class Server(MultiApp):
    '''Multi-application bundling the WSGI site with the echo server.
    '''
    # NOTE(review): the purpose of the ``bla`` setting is not evident from
    # this file - confirm it is still needed.
    cfg = Config(bind=':0', rpc_bind=':0', bla='foo')

    def build(self):
        '''Yield the applications of this multi-app, one at a time.

        First a :class:`WSGIServer` running :class:`Site`, then a
        :class:`SocketServer` named ``'echo'`` using
        :class:`EchoServerProtocol`.
        '''
        web_app = self.new_app(WSGIServer, callable=Site())
        yield web_app
        echo_app = self.new_app(SocketServer, 'echo',
                                callable=EchoServerProtocol)
        yield echo_app
if __name__ == '__main__':  # pragma nocover
    # Script entry point: start the multi-application (WSGI site plus
    # echo server). To run the WSGI site alone on a fixed address, call
    # ``server(bind='127.0.0.1:8080').start()`` instead.
    app = Server()
    app.start()