can i communicate between two applications?

71 views
Skip to first unread message

Bright Pan

unread,
Mar 16, 2016, 9:04:48 AM3/16/16
to python-pulsar
i create two applications in MutiApp:

i visit http://127.0.0.1:8080 for flask application, can i visit echo server from flask server? i test it ,but failure, why?

from flask import Flask, make_response

from pulsar.apps import wsgi

import asyncio
from functools import partial

import pulsar
from pulsar import Pool, Connection, AbstractClient, ProtocolError
from pulsar.apps.socket import SocketServer

from pulsar import MultiApp, Config
from pulsar.apps.wsgi import WSGIServer

class EchoProtocol(pulsar.ProtocolConsumer):
   
'''An echo :class:`~.ProtocolConsumer` for client and servers.

    The only difference between client and server is the implementation
    of the :meth:`response` method.
    '''
    separator = b'\r\n\r\n'
    '''A separator for messages.'''
    buffer = b''
    '''The buffer for long messages'''

    def data_received(self, data):
       
'''Implements the :meth:`~.ProtocolConsumer.data_received` method.

        It simply search for the :attr:`separator` and, if found, it invokes
        the :meth:`response` method with the value of the message.
        '''
        if self.buffer:
            data
= self.buffer + data

        idx
= data.find(self.separator)
       
if idx >= 0:    # we have a full message
            idx += len(self.separator)
            data
, rest = data[:idx], data[idx:]
           
self.buffer = self.response(data, rest)
           
self.finished()
           
return rest
       
else:
           
self.buffer = data

   
def start_request(self):
       
'''Override :meth:`~.ProtocolConsumer.start_request` to write
        the message ended by the :attr:`separator` into the transport.
        '''
        self.transport.write(self._request + self.separator)

   
def response(self, data, rest):
       
'''Clients return the message so that the
        :attr:`.ProtocolConsumer.on_finished` is called back with the
        message value, while servers sends the message back to the client.
        '''
        if rest:
           
raise ProtocolError
       
return data[:-len(self.separator)]


class EchoServerProtocol(EchoProtocol):
   
'''The :class:`EchoProtocol` used by the echo :func:`server`.
    '''
    def response(self, data, rest):
       
'''Override :meth:`~EchoProtocol.response` method by writing the
        ``data`` received back to the client.
        '''
        self.transport.write(data)
        data
= data[:-len(self.separator)]
       
# If we get a QUIT message, close the transport.
        # Used by the test suite.
        if data == b'QUIT':
           
self.transport.close()
       
return data


class Echo(AbstractClient):
   
'''A client for the echo server.

    :param address: set the :attr:`address` attribute
    :param full_response: set the :attr:`full_response` attribute
    :param pool_size: used when initialising the connetion :attr:`pool`.
    :param loop: Optional event loop to set the :attr:`_loop` attribute.

    .. attribute:: _loop

        The event loop used by the client IO requests.

        The event loop is stored at this attribute so that asynchronous
        method decorators such as :func:`.task` can be used.

    .. attribute:: address

        remote server TCP address.

    .. attribute:: pool

        Asynchronous connection :class:`.Pool`.

    .. attribute:: full_response

        Flag indicating if the callable method should return the
        :class:`EchoProtocol` handling the request (``True``) or
        the server response message (``False``).

        Default: ``False``
    '''
    protocol_factory = partial(Connection, EchoProtocol)

   
def __init__(self, address, full_response=False, pool_size=2000, loop=None):
       
super().__init__(loop)
       
self.address = address
       
self.full_response = full_response
       
self.pool = Pool(self.connect, pool_size, self._loop)

   
def connect(self):
       
return self.create_connection(self.address)

   
def __call__(self, message):
       
'''Send a ``message`` to the server and wait for a response.

        :return: a :class:`.Future`
        '''
        result = self._call(message)
       
if not self._loop.is_running():
           
return self._loop.run_until_complete(result)
       
else:
           
return result

   
@asyncio.coroutine
    def _call(self, message):
        connection
= yield from self.pool.connect()
       
with connection:
            consumer
= connection.current_consumer()
            consumer
.start(message)
           
result = yield from consumer.on_finished
            result
= consumer if self.full_response else consumer.buffer
           
return result


def FlaskApp():
    app
= Flask(__name__)

   
@app.errorhandler(404)
   
def not_found(e):
       
return make_response("404 Page", 404)

   
@app.route('/', methods=['GET'])
   
def add_org():

        echo
= Echo(('localhost',8060))
        a
= echo(b'1000')
       
return "Flask Example %s" %
a

   
return app


class Site(wsgi.LazyWsgi):

   
def setup(self, environ=None):
        app
= FlaskApp()
       
return wsgi.WsgiHandler((wsgi.wait_for_body_middleware,
                                 wsgi
.middleware_in_executor(app)),
                               
async=True)


def server(**kwargs):
   
return wsgi.WSGIServer(Site(), **kwargs)



class Server(MultiApp):
    cfg
= Config(bind=':0', rpc_bind=':0', bla='foo')

   
def build(self):
       
yield self.new_app(WSGIServer, callable=Site())
       
yield self.new_app(SocketServer, 'echo', callable=EchoServerProtocol)



if __name__ == '__main__':  # pragma    nocover
    #server(bind='127.0.0.1:8080').start()
    app = Server()
    app
.start()




lsbardel

unread,
Mar 17, 2016, 4:56:02 AM3/17/16
to python-pulsar
Hi


On Wednesday, March 16, 2016 at 1:04:48 PM UTC, Bright Pan wrote:
i create two applications in MutiApp:

There are several issues with your example:

* Flask design is synchronous and therefore you cannot use an asynchronous component such as the Echo client inside a flask response function
* Mixing and matching threads (via the ``middleware_in_executor``) and asynchronous components such as the Echo client is not a good idea (the event loop is thread local)
* The multiapp Config object is wrong, you need to configure the bind parameters

There is a way to use flask, or any other synchronous web framework, with asynchronous components such as the Echo client in the example, inside the response function. It requires the use of greenlet library and pulsar.app.greenio module.

I've pushed an example of pulsar master with a configuration similar to yours


enjoy it!

Bright Pan

unread,
Mar 17, 2016, 6:28:54 AM3/17/16
to python-pulsar
Hi,
在 2016年3月17日星期四 UTC+8下午4:56:02,lsbardel写道:
This is perfect answer for me,  i just begin to use pulsar recently,
and i don't understand a lot of concept in pulsar, 
for example: Connection class, what is the design ideas at the beginning?
do you have any good suggestions for using pulsar?

thank you!!!
Reply all
Reply to author
Forward
0 new messages