How to communicate between pulsar apps, for example, through asyncio.Queue?

Sergey Pikhovkin

Dec 19, 2016, 7:01:03 AM
to python-pulsar
How can pulsar apps communicate with each other, for example, through an asyncio.Queue?

The problem is that once the application starts, the reference to the asyncio.Queue object gets lost somewhere inside: app.queue is None in the workers.

Prototype of the usage:

import asyncio
import pulsar


class App1(pulsar.Application):
    queue = None

    async def watch_queue(self, worker):
        app = worker.app
        while 1:
            app.queue.put_nowait({'id': 1})  # Here app.queue is None

            await asyncio.sleep(1, loop=worker._loop)

    def worker_start(self, worker, exc=None):
        asyncio.ensure_future(self.watch_queue(worker), loop=worker._loop)


class App2(pulsar.Application):
    queue = None

    async def watch_queue(self, worker):
        app = worker.app
        app_queue = app.queue
        while 1:
            msg = app_queue.get()  # Here app.queue is None

            if not msg:
                await asyncio.sleep(1, loop=worker._loop)
                continue

            print(msg)

    def worker_start(self, worker, exc=None):
        asyncio.ensure_future(self.watch_queue(worker), loop=worker._loop)


class Server(pulsar.MultiApp):
    def build(self):
        App1.queue = asyncio.Queue()
        App2.queue = App1.queue

        yield self.new_app(App1)
        yield self.new_app(App2, prefix='app2')


async def run():
    server = Server()
    server.start()


lsbardel

Dec 20, 2016, 5:39:00 AM
to python-pulsar
Hi,


On Monday, December 19, 2016 at 12:01:03 PM UTC, Sergey Pikhovkin wrote:
How to communicate between pulsar apps, for example, through asyncio.Queue?

You can't do it in the general case. Pulsar actors (workers) don't share state between them.
Depending on the concurrency model, they may be on different threads or processes and therefore sharing an asyncio.Queue is not possible.
It would be possible between actors running on the same thread and therefore sharing the event loop.
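
To illustrate the same-thread case, here is a minimal sketch in plain asyncio (no pulsar involved) of two coroutines sharing one asyncio.Queue on a single event loop. The names producer, consumer and main are made up for this example, and the None sentinel is just one way to stop the consumer.

```python
import asyncio


async def producer(queue, count):
    """Put `count` messages on the queue, then a None sentinel."""
    for i in range(count):
        queue.put_nowait({'id': i})
        await asyncio.sleep(0)  # yield control so the consumer can run
    queue.put_nowait(None)  # sentinel: tells the consumer to stop


async def consumer(queue, received):
    """Collect messages until the sentinel arrives."""
    while True:
        msg = await queue.get()  # get() is a coroutine and must be awaited
        if msg is None:
            break
        received.append(msg)


async def main():
    # Both coroutines run on the same event loop, so they can share the queue.
    queue = asyncio.Queue()
    received = []
    await asyncio.gather(producer(queue, 3), consumer(queue, received))
    return received


if __name__ == '__main__':
    print(asyncio.run(main()))
```

This only works because both sides live on one loop in one thread. Note also that queue.get() has to be awaited; calling it without await just returns a coroutine object.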

Communication between actors can be done via pulsar message passing (http://quantmind.github.io/pulsar/tutorials/messages.html), a pulsar data store, or third-party systems.
It really depends on what you have in mind.
An example of actors sharing a distributed queue is in the pulsar-queue module.
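
As a rough stdlib-only illustration of the message-passing idea (this is not pulsar's API; see the tutorial linked above for that), the sketch below hands messages to an event loop running in another thread. asyncio.Queue is not thread-safe, so the sender never touches the queue directly and goes through loop.call_soon_threadsafe instead. The names run_consumer, send, demo and the handles dict are hypothetical helpers for this example only.

```python
import asyncio
import threading


def run_consumer(ready, received, handles):
    """Run a separate event loop in this thread and consume messages on it."""
    async def consume():
        queue = asyncio.Queue()
        # Publish the loop and queue so other threads can address this consumer.
        handles['loop'] = asyncio.get_running_loop()
        handles['queue'] = queue
        ready.set()
        while True:
            msg = await queue.get()
            if msg is None:  # sentinel: stop consuming
                break
            received.append(msg)

    asyncio.run(consume())


def send(handles, msg):
    """Deliver a message from any thread by scheduling the put on the owning loop."""
    handles['loop'].call_soon_threadsafe(handles['queue'].put_nowait, msg)


def demo():
    ready = threading.Event()
    received, handles = [], {}
    thread = threading.Thread(target=run_consumer, args=(ready, received, handles))
    thread.start()
    ready.wait()  # wait until the consumer's loop and queue exist
    for i in range(3):
        send(handles, {'id': i})
    send(handles, None)
    thread.join()
    return received


if __name__ == '__main__':
    print(demo())
```

Across processes even this does not apply, since nothing is shared in memory; there you need pulsar's own messaging, a data store, or an external queue as described above.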
