from pulsar import Application, MultiApp, EventHandler, new_event_loop
class EventProxy(EventHandler): MANY_TIMES_EVENTS = ['test']
# I think this shouldn't be a MultiApp, but this way I can create a Wrapper easily.# How else can this be done?class Core(MultiApp): def build(self): p = EventProxy() yield self.new_app(Wrapper, event_proxy=p) ExtensionA(p) ExtensionB(p)
class Wrapper(Application): def worker_start(self, worker, exc=None): # Why can't this be in build as EventProxy(loop=...)? self.cfg.event_proxy._loop = new_event_loop() self._loop = worker._loop self._loop.call_later(1, self.test)
def test(self): for i in range(3): self.cfg.event_proxy.fire_event('test', i)
# How do I exit the program?
class ExtensionA: def __init__(self, event_proxy): event_proxy.bind_event('test', self.handle_test)
def handle_test(self, i): print('A:', i)
class ExtensionB: def __init__(self, event_proxy): event_proxy.bind_event('test', self.handle_test)
def handle_test(self, i): print('B:', i)
if __name__ == '__main__': Core().start()
Hello,
I've been playing with Pulsar for a while, but still don't really understand how I can create an application.The basic things I've written were first based on examples.snippets.greeter, now I'm using worker_start, as in pulsar.apps.socket. But still, I don't even understand how to properly stop the app.I feel like the tutorials should be more explanatory. Maybe it's because I'm new to actor thing, but the docs don't help me that much, you're my only hope.
Let's get specific.I want to write a tool that wraps a third party process using Popen, and fires events based on it (start, stop, output, etc.). Other parts of my tool will bind to those events. Those parts would be self contained, think of them as extensions to the core.
I have the feeling everything I did is wrong. Should I even be mixing Apps and Events? What's the best way to do all of this?
class MyApp(Application): def worker_start(self, worker, exc=None): self._loop.call_later(1, self.test, worker)
def test(self, worker): for i in range(3):
self.cfg.callable.handle_test(i)
# How do I exit the program?
worker.stop()
class ThirdParty:
def handle_test(self, i): ...
if __name__ == '__main__': MyApp(ThirdParty()).start()