How can I bind a coroutine function to the 'stop' event, so that the Redis connection is closed on shutdown?
16:53:42 [p=5975, t=140736694936512, INFO, pulsar.arbiter] mailbox serving on 127.0.0.1:60282
16:53:42 [p=5975, t=140736694936512, INFO, pulsar.arbiter] started
16:53:42 [p=5975, t=140736694936512, INFO, pulsar.data] data serving on 127.0.0.1:8060
16:53:42 [p=5975, t=140736694936512, INFO, pulsar.data] started
16:53:42 [p=5978, t=140736694936512, INFO, pulsar.data.worker] data serving on 127.0.0.1:8060
16:53:42 [p=5979, t=140736694936512, INFO, pulsar.data.worker] data serving on 127.0.0.1:8060
16:53:42 [p=5978, t=140736694936512, INFO, pulsar.data.worker] started
16:53:42 [p=5979, t=140736694936512, INFO, pulsar.data.worker] started
16:53:42 [p=5978, t=140736694936512, INFO, pulsar.data.worker] redis://localhost:6379 setup
16:53:42 [p=5979, t=140736694936512, INFO, pulsar.data.worker] redis://localhost:6379 setup
16:53:44 [p=5975, t=140736694936512, WARNING, pulsar.arbiter] got INT - stopping
16:53:44 [p=5979, t=140736694936512, WARNING, pulsar.data.worker] got INT - stopping
16:53:44 [p=5978, t=140736694936512, WARNING, pulsar.data.worker] got INT - stopping
16:53:44 [p=5979, t=140736694936512, WARNING, pulsar.data.worker] got INT - stopping
16:53:44 [p=5978, t=140736694936512, WARNING, pulsar.data.worker] got INT - stopping
16:53:44 [p=5979, t=140736694936512, INFO, pulsar.data.worker] Bye from "data.worker.i1d1bdff"
16:53:44 [p=5978, t=140736694936512, INFO, pulsar.data.worker] Bye from "data.worker.i5a20be3"
class DataSocketServer(SocketServer, RedisEventMinix):
    """Socket server application registered under the name ``data``.

    Each worker's server gets the redis setup/close callbacks from
    :class:`RedisEventMinix` bound to its ``start`` and ``stop`` events.
    """
    name = 'data'

    async def worker_start(self, worker, exc=None):
        """Create this application's server for ``worker`` and wire its events.

        Nothing happens when ``exc`` is set or when ``worker.servers``
        already holds an entry under :attr:`name`.
        """
        if exc or self.name in worker.servers:
            return

        srv = await self.create_server(worker)

        def stop_worker(_, **kw):
            # Stop the owning worker once the server itself stops.
            return worker.stop()

        srv.bind_event('start', self.redis_setup_cb)
        # NOTE(review): both 'stop' handlers are bound independently; whether
        # the redis close coroutine finishes before ``worker.stop()`` takes
        # effect depends on how the event system runs coroutine handlers --
        # confirm against the event implementation.
        srv.bind_event('stop', self.redis_close_cb)
        srv.bind_event('stop', stop_worker)
        worker.servers[self.name] = srv
class RedisEventMinix():
    """Mixin providing redis pool setup/close callbacks for server events.

    The host class must provide ``self.cfg`` (whose ``params`` mapping holds
    a ``'redis'`` entry with ``host``, ``port``, ``minsize`` and ``maxsize``)
    and ``self.logger``.
    """

    def redis_config(self):
        """Return ``(host, port, minsize, maxsize)`` read from the config.

        The values come from ``self.cfg.params['redis']``; a missing entry
        propagates the lookup error raised by that mapping.
        """
        config = self.cfg.params['redis']
        return (config['host'], config['port'],
                config['minsize'], config['maxsize'])

    async def redis_setup_cb(self, arg, exc=None, **kwargs):
        """Create a redis pool and attach it to ``arg`` as ``redis_pool``.

        :param arg: the object the event was fired for (the producer); the
            pool is stored on it.
        :param exc: when set, the callback does nothing.
        """
        if exc:
            return
        host, port, minsize, maxsize = self.redis_config()
        arg.redis_pool = await aioredis.create_pool((host, port),
                                                    minsize=minsize,
                                                    maxsize=maxsize)
        self.logger.info('redis://{}:{} setup'.format(host, port))

    async def redis_close_cb(self, arg, exc=None, **kwargs):
        """Close the redis pool stored on ``arg`` and wait until it is closed.

        :param arg: the object the event was fired for (the producer),
            carrying the ``redis_pool`` set by :meth:`redis_setup_cb`.
        :param exc: when set, the callback does nothing.
        """
        if exc:
            return
        # The pool only exists if the setup callback completed; without this
        # guard a failed or skipped setup raised AttributeError at shutdown.
        pool = getattr(arg, 'redis_pool', None)
        if pool is None:
            return
        # Close before touching the config so that a config lookup problem
        # can never leave the pool open.
        pool.close()
        await pool.wait_closed()
        host, port = self.redis_config()[:2]
        self.logger.info('redis://{}:{} close'.format(host, port))