from Queue import Queue
from thread import start_new_thread
from stackless import channel, getcurrent
class ThreadPool:
def __init__(self, n=32):
self.queue = Queue()
for i in xrange(n):
start_new_thread(self.pipe, ())
def __call__(self, func):
def wrapper(*args, **kw):
rst = channel()
self.queue.put((getcurrent(), rst, func, args, kw))
return rst.receive()
return wrapper
def pipe(self):
while True:
curr, rst, func, args, kw = self.queue.get()
try:
result = func(*args, **kw)
except Exception, e:
curr.raise_exception(e)
else:
rst.send(result)
下面假定对某 SQL 客户端的 Cursor 类, 进行线程池包装, 使数据库的 commit 操作不会阻塞掉整个程序。
nonblock = ThreadPool()
class Cursor(sqlite3.Cursor):
@nonblock
def commit(self):
return sqlite3.Cursor.commit(self)
使用 processing 库及其 Queue 实现, 我们可以做出进程池, 代码雷同。
不过,这是基于经验,以及Python 内置的模块,前没有坚实的数学基础,
是否可以跟上业务的激变,需要小白鼠,高品质的,,,
--
http://zoomquiet.org'''
过程改进乃是催生可促生靠谱的人的组织!
PE keeps evolving organizations which promoting people be good!'''
[HR]金山软件常年招聘大量Py/C++人才!
https://groups.google.com/group/python-cn/web/ot-py-c
简历直投俺就好;-)
先勘下误, 这里 "甚至 pool" 应该是 poll 。
> > 造成阻塞, 这都是需要解决的。对于支持 async IO 的文件和网络 IO, eurasia 采用了一种细粒度的解决方案, 这里我提供一种粗
> > 粒度的更具通用性的基于线程池的解决方案。
>
> 收录!http://wiki.woodpecker.org.cn/moin/MiscItems/2008-12-08
>
> 不过,这是基于经验,以及Python 内置的模块,前没有坚实的数学基础,
> 是否可以跟上业务的激变,需要小白鼠,高品质的,,,
呵呵, 数据库 IO 对 Stackless 来说是一个很头大的问题。张沈鹏前面 (http://groups.google.com/
group/eurasia-users/browse_thread/thread/f9659dc3d576aeeb) 提到 Eventlet
(http://wiki.secondlife.com/wiki/Eventlet/Documentation #Database
access) 和我的线程池思路是一致的 (具体代码稍有差异)。
我肯定是头号小白鼠了, 欢迎大家一起加入这个光荣的行列 :D
> --http://zoomquiet.org'''
俺也将是,但是,无法是高品质的,实在没有什么高负荷的服务来尝试,,,
呵呵, 没错, 确实要求是线程安全了。不过幸好 Python 和 Python 库在大部分情况下都是线程安全的。
对于非线程安全的, 做一个单线程的线程池吧:
safe_nonblock = ThreadPool(1)
我承认这招是损了点 ...
上面那个 ThreadPool 做点修改可以变成 ProcessPool:
from processing import Process, Queue
class ProcessPool:
def __init__(self, n=32):
self.queue = Queue()
for i in xrange(n):
p = Process(target=self.pipe, args=
(self.queue, ))
p.start()
...
这个就是传说中的进程池, 用到了 pyprocessing 库 (multiprocessing 在 python2.6 中已经是标准库
了), 我们知道 pyprocessing 库不仅可以创建多进程, 而且支持和远程计算机进行进程间通信。
换句话说, 这就是老张想要实现的东西。processpool(func)(*args, **kw), 使用进程池调用 func, 可以实现网络
运算。
一个 func 函数可以在任意一个 cpu 或核上执行, 甚至可以是在一台远程计算机上。
class ThreadPool(object):
def __init__(self, n=32):
self.queue = Queue()
for i in xrange(n):
start_new_thread(self.pipe, ())
def __call__(self, func):
@wraps(func)
def wrapper(*args, **kwds):
rst = channel()
self.queue.put((getcurrent(), rst, func, args, kwds))
print "waiting for msg"
return rst.receive()
return wrapper
def pipe(self):
while True:
curr, rst, func, args, kwds = self.queue.get()
try:
print "\nrun func for result"
result = func(*args, **kwds)
print "\nreply"
except Exception, e:
curr.raise_exception(type(e), *e.args)
else:
rst.send(result)
没有通过测试就释放哪,,
建议请 沈写个测试包,通过的就收,,
--
http://zoomquiet.org
'''过程改进乃是催生可促生靠谱的人的组织!'''
一个人如果力求完善自己,就会看到:为此也必须同时完善他人. 一个人如果不关心别人的完善,自己便不可能完善!
noblock=noblock.ThreadPool(32)
@noblock
def a():
print "a"
for i in range(32):a()
果然会被block
上面是测试代码
收录!
http://wiki.woodpecker.org.cn/moin/MiscItems/2008-12-26