所以现在我已经完全掌握了asyncio,那下一步呢?

244 views
Skip to first unread message

Mengyang Li

unread,
Feb 10, 2020, 8:53:53 PM2/10/20
to pyth...@googlegroups.com
现在有个需求是实现一个特定的protocol 的socket 服务器,比如echo server


async
def handle_echo(reader, writer): data = await reader.read(100) message = data.decode() addr = writer.get_extra_info('peername') print(f"Received {message!r} from {addr!r}") print(f"Send: {message!r}") await my_heavy_CPU_func() writer.write(data) await writer.drain() print("Close the connection") writer.close() async def main(): server = await asyncio.start_server( handle_echo, '127.0.0.1', 8888) addr = server.sockets[0].getsockname() print(f'Serving on {addr}') async with server: await server.serve_forever() asyncio.run(main())
my_heavy_CPU_func 是一个超级吃cpu的运算,怎样才能把asyncio和多进程结合起来呢?如果是普通的http服务,gunicorn的pre-fork多进程可以满足,可是对于普通的socket服务,有没有现成的库或者实现思路呢?还是我就得造轮子?

--
Best regards,
ᶘ ᵒᴥᵒᶅ

vicalloy

unread,
Feb 10, 2020, 9:34:54 PM2/10/20
to pyth...@googlegroups.com
直接用标准库的进程相关模块 
multiprocessing.Process、multiprocessing.Semaphore

--
邮件来自: `CPyUG`华蟒用户组(中文Python技术邮件列表)
规则: http://code.google.com/p/cpyug/wiki/PythonCn
详情: http://code.google.com/p/cpyug/wiki/CpyUg
严正: 理解列表! 智慧提问! http://wiki.woodpecker.org.cn/moin/AskForHelp
---
您收到此邮件是因为您订阅了Google网上论坛上的“python-cn(华蟒用户组,CPyUG 邮件列表)”群组。
要退订此群组并停止接收此群组的电子邮件,请发送电子邮件到python-cn+...@googlegroups.com
要在网络上查看此讨论,请访问https://groups.google.com/d/msgid/python-cn/CADq5vF7GD%3D2vhpE7qe%2BcRVdsWE%3DDa%2BzbY2DhJuamQDC_etv%3DdQ%40mail.gmail.com


--

laike9m

unread,
Feb 10, 2020, 10:39:12 PM2/10/20
to pyth...@googlegroups.com
Future 是 awaitable 的,直接用 concurrent.futures.ProcessPoolExecutor 即可。


Jing Liu

unread,
Feb 10, 2020, 11:21:01 PM2/10/20
to python-cn:CPyUG
如果按你所说“my_heavy_CPU_func 是一个超级吃cpu的运算”,我感觉用 async 并没有带来什么好处。
系统自带的 ForkingTCPServer 可以满足你的要求吗?

https://docs.python.org/3/library/socketserver.html#socketserver.ForkingTCPServer

Mengyang Li

unread,
Feb 10, 2020, 11:46:27 PM2/10/20
to pyth...@googlegroups.com
目前的TCP服务可以抽象成大概50byte请求,然后服务器返回大概16K左右的数据,每个请求都很短,但是connection比较多,希望可以单进程async跑满后,开多进程

如果单纯使用ForkingTCPServer,那么需要维护大概很多进程,内存占用会非常高吧。

纠正一下,my_heavy_CPU_func是一个比较吃cpu/io的函数,这个服务期待的效果是单核支持300左右的connection,想改成多进程每个进程async,不知道如何写合适


您收到此邮件是因为您订阅了 Google 网上论坛的“python-cn(华蟒用户组,CPyUG 邮件列表)”群组。
要退订此群组并停止接收此群组的电子邮件,请发送电子邮件到python-cn+...@googlegroups.com
要在网络上查看此讨论,请访问 https://groups.google.com/d/msgid/python-cn/CANMCpS7DvKOTMX-fMcX0sLp-UKX%2B%3D4RMpHVLNuU-pFE5SmbCSg%40mail.gmail.com

Hung-I Wang

unread,
Feb 11, 2020, 1:56:30 AM2/11/20
to pyth...@googlegroups.com
如 laike9m 所述,ProcessExecutor 较为合适。唯 conncurrent.futures 里的 Future 和 asyncio 里的 Future 不同,似乎不能直接在 asyncio event loop 里 await;直接以 loop.run_in_executor() 即可。

Mengyang Li

unread,
Feb 11, 2020, 1:52:06 PM2/11/20
to pyth...@googlegroups.com
我的想法是对于每个ProcessExecutor起一个loop,把
await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)
里面的handle_echo放到ProcessExecutor 的loop里面跑,这个可行么?
相当于要在主进程传递一个fd给子进程……


kingname

unread,
Mar 3, 2020, 11:19:57 PM3/3/20
to python-cn(华蟒用户组,CPyUG 邮件列表)
你这个需求非常简单,为此我还写了一篇微信公众号来告诉你如何解决这个问题:

kingname

unread,
Mar 3, 2020, 11:20:56 PM3/3/20
to python-cn(华蟒用户组,CPyUG 邮件列表)
把文章里面的ThreadPoolExecutor换成ProcessExecutor即可。

在 2020年3月4日星期三 UTC+8下午12:19:57,kingname写道:
你这个需求非常简单,为此我还写了一篇微信公众号来告诉你如何解决这个问题:

Mengyang Li

unread,
Mar 4, 2020, 12:14:22 AM3/4/20
to pyth...@googlegroups.com
没, 你大概完全没有理解我要做的事情 ,其实我后来是这么做的,…

下面的代码会在主进程/线程里bind端口,并且把socket以fd形式传递给进程池的executor,每个子进程会起一个event loop来异步处理每个链接

async def handle_echo(readerwriter):
    pass
async def start_server(fileno):
    sock = socket.fromfd(fileno, socket.AF_INET, socket.SOCK_STREAM)
    server = await asyncio.start_server(handle_echo, sock=sock)
    addr = server.sockets[0].getsockname()
    logger.info(f'Serving on {addr}')
    async with server:
        await server.serve_forever()

def process_main(sock):
    loop = asyncio.new_event_loop()
    loop.run_until_complete(start_server(sock))
async def amain_mp():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('0.0.0.0', PORT))
    sock.listen(32)
pool = concurrent.futures.ProcessPoolExecutor(cpu)
    for i in range(cpu):
        task = loop.run_in_executor(pool, process_main, sock.fileno())
        tasks.append(task)
    await asyncio.gather(*tasks)

--
邮件来自: `CPyUG`华蟒用户组(中文Python技术邮件列表)
规则: http://code.google.com/p/cpyug/wiki/PythonCn
详情: http://code.google.com/p/cpyug/wiki/CpyUg
严正: 理解列表! 智慧提问! http://wiki.woodpecker.org.cn/moin/AskForHelp
---
您收到此邮件是因为您订阅了Google网上论坛上的“python-cn(华蟒用户组,CPyUG 邮件列表)”群组。
要退订此群组并停止接收此群组的电子邮件,请发送电子邮件到python-cn+...@googlegroups.com

panfei

unread,
Mar 17, 2020, 5:19:24 AM3/17/20
to 华蟒用户组
实现一个 STUN 协议服务器 ?

Mengyang Li <mayl...@gmail.com> 于2020年3月4日周三 下午1:14写道:


--
不学习,不知道

Mengyang Li

unread,
Mar 17, 2020, 4:40:10 PM3/17/20
to pyth...@googlegroups.com
一个普通的TCP服务器,多核和异步混用。类似于gunicorn+Async Workers, 但是现有的代码都是http,我这个需求比较简单,只需要tcp即可

Yunfan Jiang

unread,
Mar 20, 2020, 10:31:30 AM3/20/20
to pyth...@googlegroups.com
可以用封装得更好一点的curio 或者trio  前者的run_in_process大概能解决你这个问题

Mengyang Li <mayl...@gmail.com> 于2020年3月18日周三 上午4:40写道:


--
Name: yunfan
Site: http://geek42.info/
Interest:
  - Lang: [forth, clojure, c, python, lua]
  - software: [nginx, redis]
  - abstract: [vm, tiny, cloud, html5]
  - history
  - science-fiction
  - music: [new-age, vangelis, yanni]

Mengyang Li

unread,
Mar 21, 2020, 6:28:43 PM3/21/20
to pyth...@googlegroups.com
因为使用了uvloop,不好再用其他的实现了。

Reply all
Reply to author
Forward
0 new messages