how can i bind 'stop' event for coroutine function

15 views
Skip to first unread message

Bright Pan

unread,
Mar 15, 2017, 5:31:56 AM3/15/17
to python-pulsar
hi,

How can I bind a coroutine function to the 'stop' event, in order to close the Redis connection?
I bound one (see below), but the Redis stop coroutine never ran. Why?

16:53:42 [p=5975, t=140736694936512, INFO, pulsar.arbiter] mailbox serving on 127.0.0.1:60282
16:53:42 [p=5975, t=140736694936512, INFO, pulsar.arbiter] started
16:53:42 [p=5975, t=140736694936512, INFO, pulsar.data] data serving on 127.0.0.1:8060
16:53:42 [p=5975, t=140736694936512, INFO, pulsar.data] started
16:53:42 [p=5978, t=140736694936512, INFO, pulsar.data.worker] data serving on 127.0.0.1:8060
16:53:42 [p=5979, t=140736694936512, INFO, pulsar.data.worker] data serving on 127.0.0.1:8060
16:53:42 [p=5978, t=140736694936512, INFO, pulsar.data.worker] started
16:53:42 [p=5979, t=140736694936512, INFO, pulsar.data.worker] started
16:53:42 [p=5978, t=140736694936512, INFO, pulsar.data.worker] redis://localhost:6379 setup
16:53:42 [p=5979, t=140736694936512, INFO, pulsar.data.worker] redis://localhost:6379 setup
16:53:44 [p=5975, t=140736694936512, WARNING, pulsar.arbiter] got INT - stopping
16:53:44 [p=5979, t=140736694936512, WARNING, pulsar.data.worker] got INT - stopping
16:53:44 [p=5978, t=140736694936512, WARNING, pulsar.data.worker] got INT - stopping
16:53:44 [p=5979, t=140736694936512, WARNING, pulsar.data.worker] got INT - stopping
16:53:44 [p=5978, t=140736694936512, WARNING, pulsar.data.worker] got INT - stopping
16:53:44 [p=5979, t=140736694936512, INFO, pulsar.data.worker] Bye from "data.worker.i1d1bdff"
16:53:44 [p=5978, t=140736694936512, INFO, pulsar.data.worker] Bye from "data.worker.i5a20be3"


class DataSocketServer(SocketServer, RedisEventMinix):
    """Socket server application that wires Redis callbacks onto its server.

    The server created for a worker gets the Redis setup callback bound to
    its ``start`` event and the Redis close callback bound to its ``stop``
    event.
    """

    # Key under which the created server is stored in ``worker.servers``.
    name = 'data'

    async def worker_start(self, worker, exc=None):
        '''Start the worker by invoking the :meth:`create_server` method.

        Nothing is done when ``exc`` is set or when a server is already
        registered under ``self.name`` in ``worker.servers``.
        '''
        if exc or self.name in worker.servers:
            return

        created = await self.create_server(worker)

        # Redis callbacks are registered on the server's own events.
        created.bind_event('start', self.redis_setup_cb)
        created.bind_event('stop', self.redis_close_cb)

        def stop_worker(_, **kw):
            # Bound after the Redis close callback: stops the worker when
            # the server's ``stop`` event is handled.
            return worker.stop()

        created.bind_event('stop', stop_worker)
        worker.servers[self.name] = created



class RedisEventMinix:
    """Mixin with event callbacks that create and close a Redis pool.

    The host class is expected to provide ``self.cfg.params['redis']`` (a
    mapping with ``host``, ``port``, ``minsize`` and ``maxsize`` keys) and
    ``self.logger``; both are read by the methods below.
    """

    def redis_config(self):
        """Return ``(host, port, minsize, maxsize)`` from the configuration.

        Raises:
            KeyError: if ``'redis'`` or one of the four keys is missing.
        """
        config = self.cfg.params['redis']
        return (config['host'], config['port'],
                config['minsize'], config['maxsize'])

    async def redis_setup_cb(self, arg, exc=None, **kwargs):
        """Create the Redis pool and store it on ``arg`` as ``redis_pool``.

        Does nothing when ``exc`` is set.

        Args:
            arg: object the event was fired for; receives ``redis_pool``.
            exc: exception passed along with the event, if any.
        """
        if exc:
            return

        host, port, minsize, maxsize = self.redis_config()
        pool = await aioredis.create_pool((host, port),
                                          minsize=minsize,
                                          maxsize=maxsize)

        producer = arg
        producer.redis_pool = pool

        self.logger.info('redis://{}:{} setup'.format(host, port))

    async def redis_close_cb(self, arg, exc=None, **kwargs):
        """Close the Redis pool stored on ``arg`` and wait until it is closed.

        Does nothing when ``exc`` is set, or when ``arg`` carries no
        ``redis_pool`` (for example because setup never ran or was skipped),
        instead of failing with ``AttributeError``.

        Args:
            arg: object the event was fired for; expected to hold
                ``redis_pool`` as set by :meth:`redis_setup_cb`.
            exc: exception passed along with the event, if any.
        """
        if exc:
            return

        producer = arg
        pool = getattr(producer, 'redis_pool', None)
        if pool is None:
            return

        # Close first so that a broken configuration cannot prevent the
        # pool from being released; the config is needed only for logging.
        pool.close()
        await pool.wait_closed()

        host, port, _, _ = self.redis_config()
        self.logger.info('redis://{}:{} close'.format(host, port))

Bright Pan

unread,
Mar 15, 2017, 10:27:08 AM3/15/17
to python-pulsar
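I fixed it. The event loop does not switch to the coroutine when fire('stop') is called, so worker_stopping has to yield control to the loop after closing the server: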

async def worker_stopping(self, worker, exc=None):
    """Close this application's server on ``worker`` and run the close hook.

    Looks up the server stored under ``self.name`` in ``worker.servers``,
    closes it, then calls ``self.cfg.callable.close()`` when that attribute
    exists and is callable. A failing close hook is logged rather than
    raised, so the remaining shutdown is not interrupted.

    Args:
        worker: the worker being stopped; must expose ``servers``.
        exc: exception passed along with the event, if any (unused here).
    """
    server = worker.servers.get(self.name)
    if server:
        await server.close()
        # Workaround: yield to the event loop so coroutine callbacks bound
        # to the server's 'stop' event get a chance to run before shutdown
        # continues. The one-second delay is arbitrary.
        await asyncio.sleep(1)

    close = getattr(self.cfg.callable, 'close', None)
    if callable(close):
        try:
            await as_coroutine(close())
        except Exception:
            # Best effort: keep shutting down, but leave a trace.
            # NOTE(review): assumes ``self.logger`` is available, as it is
            # used by the Redis callbacks of the same application.
            self.logger.exception('error while closing the callable')


在 2017年3月15日星期三 UTC+8下午5:31:56,Bright Pan写道:
Reply all
Reply to author
Forward
0 new messages