使用线程池解决数据库阻塞问题

24 views
Skip to first unread message

沈崴

unread,
Dec 8, 2008, 1:24:28 AM12/8/08
to eurasia-users
不仅是 eurasia, 所有使用 stackless 和 greenlet 轻便线程模型的程序都会遇到 IO 阻塞的问题。因为使用轻便线程的
程序实质上都是单线程程序, 任何一个阻塞都会让所有的轻便线程停掉。不仅是数据库, 文件 IO、网络 IO、sleep (甚至 pool) 都会
造成阻塞, 这都是需要解决的。对于支持 async IO 的文件和网络 IO, eurasia 采用了一种细粒度的解决方案, 这里我提供一种粗
粒度的更具通用性的基于线程池的解决方案。

from Queue import Queue
from thread import start_new_thread
from stackless import channel, getcurrent

class ThreadPool:
def __init__(self, n=32):
self.queue = Queue()

for i in xrange(n):
start_new_thread(self.pipe, ())

def __call__(self, func):
def wrapper(*args, **kw):
rst = channel()
self.queue.put((getcurrent(), rst, func, args, kw))

return rst.receive()

return wrapper

def pipe(self):
while True:
curr, rst, func, args, kw = self.queue.get()
try:
result = func(*args, **kw)

except Exception, e:
curr.raise_exception(e)
else:
rst.send(result)

下面假定对某 SQL 客户端的 Cursor 类, 进行线程池包装, 使数据库的 commit 操作不会阻塞掉整个程序。

nonblock = ThreadPool()

class Cursor(sqlite3.Cursor):
@nonblock
def commit(self):
return sqlite3.Cursor.commit(self)

使用 processing 库及其 Queue 实现, 我们可以做出进程池, 代码雷同。

Zoom.Quiet

unread,
Dec 8, 2008, 1:40:56 AM12/8/08
to eurasi...@googlegroups.com
2008/12/8 沈崴 <wile...@gmail.com>:

> 不仅是 eurasia, 所有使用 stackless 和 greenlet 轻便线程模型的程序都会遇到 IO 阻塞的问题。因为使用轻便线程的
> 程序实质上都是单线程程序, 任何一个阻塞都会让所有的轻便线程停掉。不仅是数据库, 文件 IO、网络 IO、sleep (甚至 pool) 都会
> 造成阻塞, 这都是需要解决的。对于支持 async IO 的文件和网络 IO, eurasia 采用了一种细粒度的解决方案, 这里我提供一种粗
> 粒度的更具通用性的基于线程池的解决方案。
>
收录!
http://wiki.woodpecker.org.cn/moin/MiscItems/2008-12-08

不过,这是基于经验,以及Python 内置的模块,前没有坚实的数学基础,
是否可以跟上业务的激变,需要小白鼠,高品质的,,,

--
http://zoomquiet.org'''
过程改进乃是催生可促生靠谱的人的组织!
PE keeps evolving organizations which promoting people be good!'''
[HR]金山软件常年招聘大量Py/C++人才!
https://groups.google.com/group/python-cn/web/ot-py-c
简历直投俺就好;-)

张沈鹏

unread,
Dec 8, 2008, 1:49:31 AM12/8/08
to eurasi...@googlegroups.com
好东西,不过这样一来,
@nonblock的东西就要求是线程安全的了

沈崴

unread,
Dec 8, 2008, 2:00:55 AM12/8/08
to eurasia-users
On 12月8日, 下午2时40分, Zoom.Quiet <zoom.qu...@gmail.com> wrote:
> 2008/12/8 沈崴 <wilei...@gmail.com>:> 不仅是 eurasia, 所有使用 stackless 和 greenlet 轻便线程模型的程序都会遇到 IO 阻塞的问题。因为使用轻便线程的

> > 程序实质上都是单线程程序, 任何一个阻塞都会让所有的轻便线程停掉。不仅是数据库, 文件 IO、网络 IO、sleep (甚至 pool) 都会

先勘下误, 这里 "甚至 pool" 应该是 poll 。

> > 造成阻塞, 这都是需要解决的。对于支持 async IO 的文件和网络 IO, eurasia 采用了一种细粒度的解决方案, 这里我提供一种粗
> > 粒度的更具通用性的基于线程池的解决方案。
>
> 收录!http://wiki.woodpecker.org.cn/moin/MiscItems/2008-12-08
>
> 不过,这是基于经验,以及Python 内置的模块,前没有坚实的数学基础,
> 是否可以跟上业务的激变,需要小白鼠,高品质的,,,

呵呵, 数据库 IO 对 Stackless 来说是一个很头大的问题。张沈鹏前面 (http://groups.google.com/
group/eurasia-users/browse_thread/thread/f9659dc3d576aeeb) 提到 Eventlet
(http://wiki.secondlife.com/wiki/Eventlet/Documentation #Database
access) 和我的线程池思路是一致的 (具体代码稍有差异)。

我肯定是头号小白鼠了, 欢迎大家一起加入这个光荣的行列 :D

> --http://zoomquiet.org'''

Zoom.Quiet

unread,
Dec 8, 2008, 2:05:22 AM12/8/08
to eurasi...@googlegroups.com
2008/12/8 沈崴 <wile...@gmail.com>:

> On 12月8日, 下午2时40分, Zoom.Quiet <zoom.qu...@gmail.com> wrote:
>> 2008/12/8 沈崴 <wilei...@gmail.com>:> 不仅是 eurasia, 所有使用 stackless 和 greenlet 轻便线程模型的程序都会遇到 IO 阻塞的问题。因为使用轻便线程的
>> > 程序实质上都是单线程程序, 任何一个阻塞都会让所有的轻便线程停掉。不仅是数据库, 文件 IO、网络 IO、sleep (甚至 pool) 都会
>
> 先勘下误, 这里 "甚至 pool" 应该是 poll 。
>
>> > 造成阻塞, 这都是需要解决的。对于支持 async IO 的文件和网络 IO, eurasia 采用了一种细粒度的解决方案, 这里我提供一种粗
>> > 粒度的更具通用性的基于线程池的解决方案。
>>
>> 收录!http://wiki.woodpecker.org.cn/moin/MiscItems/2008-12-08
>>
>> 不过,这是基于经验,以及Python 内置的模块,前没有坚实的数学基础,
>> 是否可以跟上业务的激变,需要小白鼠,高品质的,,,
>
> 呵呵, 数据库 IO 对 Stackless 来说是一个很头大的问题。张沈鹏前面 (http://groups.google.com/
> group/eurasia-users/browse_thread/thread/f9659dc3d576aeeb) 提到 Eventlet
> (http://wiki.secondlife.com/wiki/Eventlet/Documentation #Database
> access) 和我的线程池思路是一致的 (具体代码稍有差异)。
>
> 我肯定是头号小白鼠了, 欢迎大家一起加入这个光荣的行列 :D
>

俺也将是,但是,无法是高品质的,实在没有什么高负荷的服务来尝试,,,

沈崴

unread,
Dec 8, 2008, 2:05:43 AM12/8/08
to eurasia-users
On 12月8日, 下午2时49分, "张沈鹏" <zsp...@gmail.com> wrote:
> 好东西,不过这样一来,
> @nonblock的东西就要求是线程安全的了

呵呵, 没错, 确实要求是线程安全了。不过幸好 Python 和 Python 库在大部分情况下都是线程安全的。
对于非线程安全的, 做一个单线程的线程池吧:

safe_nonblock = ThreadPool(1)

我承认这招是损了点 ...

张沈鹏

unread,
Dec 8, 2008, 2:08:04 AM12/8/08
to eurasi...@googlegroups.com
不过set居然不是线程安全的,要用freezeset...囧啊囧啊囧

张沈鹏

unread,
Dec 8, 2008, 2:11:00 AM12/8/08
to eurasi...@googlegroups.com
我的想法是,
不少东西,
可以引入远程调用/Message Queue 来做
而远程调用可以做成是no block的
:)

沈崴

unread,
Dec 8, 2008, 2:39:05 AM12/8/08
to eurasia-users

上面那个 ThreadPool 做点修改可以变成 ProcessPool:

from processing import Process, Queue

class ProcessPool:


def __init__(self, n=32):
self.queue = Queue()

for i in xrange(n):
p = Process(target=self.pipe, args=
(self.queue, ))
p.start()

...

这个就是传说中的进程池, 用到了 pyprocessing 库 (multiprocessing 在 python2.6 中已经是标准库
了), 我们知道 pyprocessing 库不仅可以创建多进程, 而且支持和远程计算机进行进程间通信。

换句话说, 这就是老张想要实现的东西。processpool(func)(*args, **kw), 使用进程池调用 func, 可以实现网络
运算。
一个 func 函数可以在任意一个 cpu 或核上执行, 甚至可以是在一台远程计算机上。

沈崴

unread,
Dec 23, 2008, 8:49:06 PM12/23/08
to eurasia-users
一个 BUG。raise_exception() 接口接收的是 ExceptionClass、args 两个参数, 因此这种写法是不合法的:
curr.raise_exception(Exception(*args))

而应该写成:
curr.raise_exception(Exception, *args)

这是一件十分糟糕的事情。为此, 线程池代码需要修改成这样:

from Queue import Queue
from thread import start_new_thread
from exceptions import BaseException
from stackless import channel, getcurrent

class ThreadPool:
def __init__(self, n=32):
self.queue = Queue()

for i in xrange(n):
start_new_thread(self.pipe, ())

def __call__(self, func):
def wrapper(*args, **kw):
rst = channel()
self.queue.put((getcurrent(), rst, func, args,
kw))

return rst.receive()

return wrapper

def pipe(self):
while True:
curr, rst, func, args, kw = self.queue.get()
try:
result = func(*args, **kw)

except BaseException, e:
# curr.raise_exception(e)
curr.raise_exception(type(e), *e.args)
else:
rst.send(result)

这里推荐一种新写法, 执行效率上区别不大, 但是调试要更友好一些。

class ThreadPool:
def __init__(self, n=32):
self.queue = Queue()

for i in xrange(n):
start_new_thread(self.pipe, ())

def __call__(self, func):
def wrapper(*args, **kw):
e = channel()
self.queue.put((e, func, args, kw))

errno, e = e.receive()
if errno == 0:
return e

raise e

return wrapper

def pipe(self):
while True:
rst, func, args, kw = self.queue.get()
try:
result = func(*args, **kw)

except Exception, e:
rst.send((-1, e))
else:
rst.send((0, result))

张沈鹏

unread,
Dec 25, 2008, 9:47:29 PM12/25/08
to eurasi...@googlegroups.com
奇怪 线程池运行起来好像有点问题
实现如下
常常block在
waiting for msg
而不到
run func for result
这里去
很奇怪

class ThreadPool(object):


def __init__(self, n=32):
self.queue = Queue()
for i in xrange(n):
start_new_thread(self.pipe, ())

def __call__(self, func):
@wraps(func)
def wrapper(*args, **kwds):
rst = channel()
self.queue.put((getcurrent(), rst, func, args, kwds))
print "waiting for msg"
return rst.receive()
return wrapper

def pipe(self):
while True:
curr, rst, func, args, kwds = self.queue.get()
try:
print "\nrun func for result"
result = func(*args, **kwds)
print "\nreply"
except Exception, e:
curr.raise_exception(type(e), *e.args)
else:
rst.send(result)

沈崴

unread,
Dec 25, 2008, 10:35:40 PM12/25/08
to eurasia-users
On 12月26日, 上午10时47分, "张沈鹏" <zsp...@gmail.com> wrote:
> 奇怪 线程池运行起来好像有点问题
> 实现如下
> 常常block在
> waiting for msg
> 而不到
> run func for result
> 这里去
> 很奇怪
>
> class ThreadPool(object):
> def __init__(self, n=32):
> self.queue = Queue()
> for i in xrange(n):
> start_new_thread(self.pipe, ())
>
> def __call__(self, func):
> @wraps(func)

这个 @wraps(func) 是什么? 缺少上下文。

> def wrapper(*args, **kwds):
> rst = channel()
> self.queue.put((getcurrent(), rst, func, args, kwds))
> print "waiting for msg"
> return rst.receive()
> return wrapper
>
> def pipe(self):
> while True:
> curr, rst, func, args, kwds = self.queue.get()
> try:
> print "\nrun func for result"
> result = func(*args, **kwds)
> print "\nreply"
> except Exception, e:
> curr.raise_exception(type(e), *e.args)
> else:
> rst.send(result)

出现这种情况有可能是因为 func 调用诡异地挂掉了。
我这里给个调试的例子, 使用 traceback 进行输出。

import sys
from Queue import Queue
from traceback import print_exc
from thread import start_new_thread
from exceptions import BaseException
from stackless import channel, getcurrent

DEBUG = 1

class ThreadPool:
def __init__(self, n=32):
self.queue = Queue()

for i in xrange(n):
start_new_thread(self.pipe, ())

def __call__(self, func):
def wrapper(*args, **kw):
e = channel()
self.queue.put((e, func, args, kw))

errno, e = e.receive()
if errno == 0:
return e

raise e

return wrapper

def pipe(self):
while True:
rst, func, args, kw = self.queue.get()
try:
result = func(*args, **kw)

except BaseException, e:
if DEBUG:
print_exc(file=sys.stderr)

沈崴

unread,
Dec 25, 2008, 10:45:10 PM12/25/08
to eurasia-users
另外 curr.raise_exception(type(e), *e.args) 这个调用存在潜在问题, 就是如果程序抛出了自定义的
Exception, 那么我们无法从 e.args 中得到 Exception 的初始化参数, 这里是一个例子。

class MyException(Exception):
def __init__(self, a, b):
Exception.__init__(self, a) # 只用 a 参数初始化了 Exception
self.b = b # 这个 b 就无法从 e.args 中得到了, 只能从 e.b 得到

我不知道 raise_exception (或者 greenlet 的 "throw()" ) 为什么会被设计成这样
(raise_exception(Exception, *args) ), 这对用户来说好像很不方便。

解决方法是使用我上面推荐的另一种新的写法, 用 rst.send((-1, e)) 表示异常, 用 rst.send((0,
result)) 返回正常结果。尽管要多做一次判断, 幸好对线程池应用来说并没有性能损失。

张沈鹏

unread,
Dec 25, 2008, 10:47:21 PM12/25/08
to eurasi...@googlegroups.com
不过好像还是被阻塞
很奇怪
我星期天慢慢来研究
诡异的问题

Zoom.Quiet

unread,
Dec 25, 2008, 10:54:17 PM12/25/08
to eurasi...@googlegroups.com
2008/12/26 张沈鹏 <zsp...@gmail.com>:
> 不过好像还是被阻塞
> 很奇怪
> 我星期天慢慢来研究
> 诡异的问题
>

没有通过测试就释放哪,,
建议请 沈写个测试包,通过的就收,,

--
http://zoomquiet.org
'''过程改进乃是催生可促生靠谱的人的组织!'''
一个人如果力求完善自己,就会看到:为此也必须同时完善他人. 一个人如果不关心别人的完善,自己便不可能完善!

张沈鹏

unread,
Dec 25, 2008, 11:44:36 PM12/25/08
to eurasi...@googlegroups.com
import noblock


noblock=noblock.ThreadPool(32)

@noblock
def a():
print "a"

for i in range(32):a()

果然会被block
上面是测试代码

沈崴

unread,
Dec 26, 2008, 1:57:44 AM12/26/08
to eurasia-users
老张呵, 这个是 Stackless Python 的 ThreadPool :)

import sys, stackless
from Queue import Queue
from traceback import print_exc
from thread import start_new_thread
from exceptions import BaseException
from stackless import channel, getcurrent, schedule, tasklet

class ThreadPool:
def __init__(self, n=32):
self.queue = Queue()

for i in xrange(n):
start_new_thread(self.pipe, ())

def __call__(self, func):
def wrapper(*args, **kw):
e = channel()
self.queue.put((e, func, args, kw))

errno, e = e.receive()
if errno == 0:
return e

raise e

return wrapper

def pipe(self):
while True:
rst, func, args, kw = self.queue.get()
try:
result = func(*args, **kw)

except BaseException, e:
rst.send((-1, e))
else:
rst.send((0, result))

nonblock = ThreadPool(32)

@nonblock
def test(n):
sys.stdout.write('%d ' %n)
sys.stdout.flush()

for i in xrange(1000):
test(i)

stackless.run()

因为 Eurasia3 里已经有主循环了, 所以不需要 stackless.run()。
但是如果不使用 Eurasia3 , 而直接在 Stackless Python 中使用 ThreadPool 的话,
就需要考虑这个问题了。

沈崴

unread,
Dec 26, 2008, 2:11:26 AM12/26/08
to eurasia-users
On 12月26日, 下午2时57分, 沈崴 <wilei...@gmail.com> wrote:
> On 12月26日, 下午12时44分, "张沈鹏" <zsp...@gmail.com> wrote:
> > ...
> > 果然会被block
> > 上面是测试代码
>
> 老张呵, 这个是 Stackless Python 的 ThreadPool :)
> ...
> 因为 Eurasia3 里已经有主循环了, 所以不需要 stackless.run()。
> 但是如果不使用 Eurasia3 , 而直接在 Stackless Python 中使用 ThreadPool 的话,
> 就需要考虑这个问题了。

下面这个例子把处理事务的线程也打印出来, 可以看出所有线程都在工作。

import sys, stackless
from Queue import Queue
from traceback import print_exc
from thread import start_new_thread
from exceptions import BaseException
from stackless import channel, getcurrent, schedule, tasklet

class ThreadPool:
def __init__(self, n=32):
self.queue = Queue()

for i in xrange(n):
start_new_thread(self.pipe, (i,))

def __call__(self, func):
def wrapper(*args, **kw):
e = channel()
self.queue.put((e, func, args, kw))

errno, e = e.receive()
if errno == 0:
return e

raise e

return wrapper

def pipe(self, i):
while True:
rst, func, args, kw = self.queue.get()
try:
result = func(i, *args, **kw)

except BaseException, e:
rst.send((-1, e))
else:
rst.send((0, result))

nonblock = ThreadPool(32)

@nonblock
def test(i, n):
print 'thread%d: %d' %(i, n)

张沈鹏

unread,
Dec 26, 2008, 2:12:55 AM12/26/08
to eurasi...@googlegroups.com
这个可以跑 谢谢 我回家慢慢来看...

Zoom.Quiet

unread,
Dec 26, 2008, 2:23:01 AM12/26/08
to eurasi...@googlegroups.com
2008/12/26 张沈鹏 <zsp...@gmail.com>:
> 这个可以跑 谢谢 我回家慢慢来看...

收录!
http://wiki.woodpecker.org.cn/moin/MiscItems/2008-12-26

张沈鹏

unread,
Dec 26, 2008, 2:31:33 AM12/26/08
to eurasi...@googlegroups.com
>这个 @wraps(func) 是什么? 缺少上下文。
是这个
from functools import wraps
Reply all
Reply to author
Forward
0 new messages