Threadpool for gevent.


Peter Saunders

Feb 17, 2010, 7:20:18 AM
to gevent: coroutine-based Python network library
I have implemented a threadpool for use in gevent for those calls that
have to block. It hasn't got all the methods you may want from a pool,
but is a reasonable starting point. If this proves useful, I would add
functionality such as map/map_async.

Does anyone have any feedback for this?

Code for the module and example case + output below.

=====================================

import Queue
import gevent
import gevent.event
import threading

class ThreadJob(threading.Thread):
    def __init__(self, queue, initializer=None, initargs=None):
        threading.Thread.__init__(self)
        self.queue = queue
        self.initializer = initializer
        self.initargs = initargs

    def run(self):
        if self.initializer:
            # initargs defaults to None, so fall back to no arguments
            self.initializer(*(self.initargs or ()))
        while True:
            try:
                func, args, kwargs, async_res = self.queue.get()
            except TypeError:
                # close() queues None; unpacking it raises TypeError
                return

            try:
                res = func(*args, **kwargs)
                async_res.set(res)
            except Exception, e:
                async_res.set_exception(e)

class ThreadPool(object):
    def __init__(self, poolsize=5, initializer=None, initargs=None,
                 daemon_threads=True):

        self.poolsize = poolsize
        self.thread_list = []
        self.queue = Queue.Queue()
        self.initializer = initializer
        self.initargs = initargs
        self.daemon_threads = daemon_threads

        self.spawn_threads(self.poolsize)

    def spawn_threads(self, num):
        for x in range(0, num):
            t = ThreadJob(self.queue, self.initializer, self.initargs)
            t.start()
            daemon = self.daemon_threads
            self.thread_list.append(t)

    def run_async(self, func, *args, **kwargs):
        async_res = gevent.event.AsyncResult()
        self.queue.put((func, args, kwargs, async_res))
        return async_res

    def close(self):
        # one None sentinel per worker thread
        for x in range(0, self.poolsize):
            self.queue.put(None)

=====================================


=====================================
import gevent
from threadpool import ThreadPool
from copy import copy
import time

def printMessage(msg):
    print time.strftime("%H:%M:%S"), str(msg)

def blockcall(x):
    time.sleep(x)
    return x*x

def exceptcall(x):
    raise Exception("explode %d" % x)

pool = ThreadPool()

async_results = []
printMessage("Starting pool functions")
async_results.append(pool.run_async(blockcall, 4))
async_results.append(pool.run_async(exceptcall, 3))
async_results.append(pool.run_async(blockcall, 2))

while async_results:
    # iterate over a copy so finished results can be removed from the list
    for result in copy(async_results):
        if result.ready():
            if result.successful():
                printMessage(result.value)
            else:
                printMessage(result.exception)

            async_results.remove(result)

    gevent.sleep(0.25)

pool.close()

=====================================

$ ./thpooltest.py
12:13:09 Starting pool functions
12:13:09 explode 3
12:13:11 4
12:13:13 16

=====================================

Denis Bilenko

Feb 18, 2010, 12:09:10 AM
to gev...@googlegroups.com
Great! thanks again for sharing.

One minor note: to make it compatible with monkey.patch_all() you
should import things the following way:

from Queue import Queue
from threading import Thread

and monkey patch *after* you've imported this module.

This is because monkey.patch_thread() (which is called by
monkey.patch_all()) makes threading use greenlets, so Thread becomes
greenlet-based rather than OS-thread-based. The same goes for other
things built on threading, such as Queue.
(This could be useful, for example, in Django, which uses a threadlocal
variable for storing connections to the database. After monkey
patching, the threadlocal class effectively becomes greenlet-local.)
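The import-order advice relies on how Python binds names. `from Queue import Queue` copies a reference into the importing module, so a patcher that later reassigns the module attribute does not affect that name. The sketch below simulates this with the standard library only (Python 3, with `FakeGreenThread` as a hypothetical replacement class). It demonstrates the binding rule, not gevent's actual patching, which may also replace lower-level functions that the original class calls internally.

```python
import threading
from threading import Thread  # bound now, before the simulated patch below


class FakeGreenThread(object):
    """Hypothetical stand-in for a greenlet-based Thread replacement."""


def simulate_patch():
    # A monkey patcher replaces attributes on the module object itself.
    original = threading.Thread
    threading.Thread = FakeGreenThread
    return original


original = simulate_patch()
try:
    # Lookups that go through the module now see the replacement...
    print(threading.Thread is FakeGreenThread)  # True
    # ...but the name bound by the earlier "from threading import Thread" does not.
    print(Thread is original)  # True
finally:
    threading.Thread = original  # undo the simulated patch
```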

I think that gevent should provide a way to import the original
modules regardless of whether monkey patching is enabled, that way the
above won't be a concern for a module like this. I'll try to figure
out what's the best way to do this.

Another issue I found is that you don't actually set the daemon status of
your threads. I've attached the fixed version along with the example I
used to test it.
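The fixed file is attached rather than quoted, so the following is only a guess at the shape of the fix. It is written for Python 3 with `queue`/`threading`, and `worker` and `spawn_threads` are illustrative names. The key point is to set `daemon` on each `Thread` object before calling `start()`. Assigning to a local variable has no effect, and Python raises `RuntimeError` if you try to change the flag on a thread that is already running.

```python
import queue
import threading


def worker(jobs):
    # Pull jobs until the None sentinel arrives.
    while True:
        job = jobs.get()
        if job is None:
            return
        job()


def spawn_threads(jobs, num, daemon_threads=True):
    threads = []
    for _ in range(num):
        t = threading.Thread(target=worker, args=(jobs,))
        # Must be set on the thread object itself, and before start().
        t.daemon = daemon_threads
        t.start()
        threads.append(t)
    return threads


jobs = queue.Queue()
threads = spawn_threads(jobs, 3)
print([t.daemon for t in threads])  # [True, True, True]
for _ in threads:
    jobs.put(None)  # one sentinel per thread shuts the pool down
for t in threads:
    t.join()
```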

Cheers,
Denis.

gevent_threadpool.py

Peter Saunders

Feb 18, 2010, 11:51:01 AM
to gevent: coroutine-based Python network library
I think I have a bit of a problem with the code...

I started writing some unittests (and expanding the functionality), and I've
noticed that in my test cases the wait() on the AsyncResult never
returns. If you do a wait(timeout=2) and then a .ready(), this
returns True, but in blocking mode it never returns.

I'm assuming this is because set(), being run in another thread,
doesn't do what is required to notify the hub? Any ideas on a workaround?

My test case is below; I put it at the end of
gevent_threadpool.py in place of your __main__ section.

if __name__ == '__main__':
    import sys
    import os
    import copy
    import time
    import greentest

    def blockSleep(x):
        time.sleep(x)
        return x

    class ThreadPoolTestFunctions(greentest.TestCase):
        __timeout__ = 10

        def setUp(self):
            greentest.TestCase.setUp(self)
            self.pool = ThreadPool()

        def test_run_async(self):
            x2 = self.pool.run_async(blockSleep, 0.1)
            gevent.sleep(0)
            print "wait"
            x2.wait()
            print "Fin"
            self.assert_(x2.ready())

        def tearDown(self):
            print "teardown"
            self.pool.close()
            greentest.TestCase.tearDown(self)

    greentest.main()


##################

A simpler test of it failing:

if __name__ == '__main__':
    import sys
    import os
    import copy
    import time
    import greentest

    def blockSet(e):
        time.sleep(1)
        e.set(1)

    class TestThreadAsyncResult(greentest.TestCase):
        __timeout__ = 10

        def testwait(self):
            e = gevent.event.AsyncResult()
            t = threading.Thread(target=blockSet, args=(e,))
            t.start()
            gevent.sleep(0)
            e.wait()
            self.assertEqual(e.value, 1)

    greentest.main()

Denis Bilenko

Feb 18, 2010, 1:31:42 PM
to gev...@googlegroups.com
Right. I completely forgot about this the first time.

On Thu, Feb 18, 2010 at 10:51 PM, Peter Saunders <pa...@fodder.org.uk> wrote:
> I think I have a bit of a problem with the code...
>
> I started writing some unittests (and expanding the functionality), and I've
> noticed that in my test cases the wait() on the AsyncResult never
> returns. If you do a wait(timeout=2) and then a .ready(), this
> returns True, but in blocking mode it never returns.
>
> I'm assuming this is because set(), being run in another thread,
> doesn't do what is required to notify the hub? Any ideas on a workaround?

Yes, the standard trick: create a pair of descriptors (using pipe()).
Write to one of them in the worker thread, and wait_read on the other one
in the main thread.
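The attached `mt_asyncresult.py` is not shown, so the sketch below covers only the pipe trick itself. It is written in Python 3 and uses plain `select.select()` in place of gevent's `wait_read`. Inside gevent you would want the cooperative call, so that other greenlets keep running while this one waits. `PipeResult` is a hypothetical name, and the code assumes a POSIX system, because `select()` on Windows only accepts sockets.

```python
import os
import select
import threading
import time


class PipeResult(object):
    """Hypothetical result holder that a worker thread can complete.

    Each instance owns one pipe: the worker stores the value and writes a
    byte, while the waiting thread blocks in select() on the read end.
    """

    def __init__(self):
        self._rfd, self._wfd = os.pipe()
        self.value = None

    def set(self, value):
        # Worker thread: store the value first, then wake the waiter.
        self.value = value
        os.write(self._wfd, b"\0")

    def get(self, timeout=None):
        # Waiting thread: sleep until the worker writes (or timeout expires).
        ready, _, _ = select.select([self._rfd], [], [], timeout)
        if not ready:
            raise RuntimeError("timed out waiting for the worker")
        os.read(self._rfd, 1)  # consume the wake-up byte
        return self.value

    def close(self):
        os.close(self._rfd)
        os.close(self._wfd)


def block_sleep(res, x):
    time.sleep(x)  # stands in for a call that blocks the whole process
    res.set(x)


res = PipeResult()
worker = threading.Thread(target=block_sleep, args=(res, 0.1))
worker.start()
result = res.get(timeout=5)
worker.join()
res.close()
print(result)  # 0.1
```

Each instance holds two file descriptors until `close()` is called, which is the cost Peter raises in his next message.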

I've attached the example code. It's not tested much, but it passes your test.
It has a class MTAsyncResult that is exactly like AsyncResult, except that it
works as you expect it to: set() in a worker thread wakes up the main thread.

Cheers,
Denis.

mt_asyncresult.py

Peter Saunders

Feb 19, 2010, 5:14:34 AM
to gevent: coroutine-based Python network library
Excellent. I guess we need to be careful on some platforms (e.g.
Solaris with 32-bit Python), where opening a lot of pipes could quite
quickly push the fileno above 256, which would then stop you opening
files (no matter what your ulimit is) due to the annoying stdio
issue.

I've modified your example slightly to spawn the reading of the pipe
into a greenlet, which then sets an internal event that get/wait
uses. This would make cleaning up the pipe easier. (I don't think
wait_read likes having the pipe closed under it, as it then never
returns?)

Thoughts?

==========================
import os
from os import pipe
import gevent
import gevent.event
# _NONE and wait_read are used as in the mt_asyncresult.py module this
# class was adapted from (their imports are not shown in this post).

class MTAsyncResult(gevent.event.AsyncResult):
    def __init__(self, func, *args, **kwargs):
        self._func = func
        self._args = args
        self._kwargs = kwargs
        self.started = False
        # dequeued is set if the job is removed from the execution
        # queue before ever starting.
        self.dequeued = False
        gevent.event.AsyncResult.__init__(self)
        self._internal_event = gevent.event.Event()

        self._pipe = pipe()
        gevent.spawn(self._pipe_read)

    def _pipe_read(self):
        # Poll the read end once a second until a result is set or the
        # job is dequeued, then close both ends of the pipe.
        while (not self.dequeued) and (self._exception is _NONE):
            try:
                print "waitread"
                wait_read(self._pipe[0], timeout=1,
                          timeout_exc=gevent.Timeout)
                self._internal_event.set()
            except gevent.Timeout, e:
                continue
        os.close(self._pipe[0])
        os.close(self._pipe[1])

    def set_exception(self, exception):
        gevent.event.AsyncResult.set_exception(self, exception)
        os.write(self._pipe[1], '\0')

    def set(self, value=None):
        gevent.event.AsyncResult.set(self, value)
        os.write(self._pipe[1], '\0')

    def wait(self, timeout=None):
        self._internal_event.clear()
        if self._exception is not _NONE:
            return
        self._internal_event.wait(timeout)

    def get(self, block=True, timeout=None):
        self._internal_event.clear()
        if self._exception is not _NONE:
            if self._exception is None:
                return self.value
            raise self._exception

        self._internal_event.wait(timeout)
        if self._exception is not _NONE:
            if self._exception is None:
                return self.value
            raise self._exception

Peter Saunders

Feb 20, 2010, 2:22:09 PM
to gevent: coroutine-based Python network library
Ok, I have attached the latest version of my threadpool, a handful of
(not very well written) unittests, and 3 simple examples (1 on how to use
a DB, and 2 process-related ones).

I've only run and tested this on 2.6, but I've tried to avoid any
known 2.6 dependencies.

As for basic documentation, I'm not sure how you would normally make
"proper" Python documentation to go up on the website, but here is a
basic summary:

======================

ThreadPool(poolsize, initializer, initargs, daemon_threads)
    poolsize = Number of worker threads to create
    initializer = Function each thread runs before doing any work
    initargs = Arguments to pass to the initializer
    daemon_threads = Whether the threads should be daemon threads

resize(newsize)
    newsize = The new pool size (the pool can grow or shrink)

apply_async(func, *args, **kwargs)
    Run a function in the thread pool with the given positional and
    keyword arguments. Returns an MTAsyncResult object.

apply(func, *args, **kwargs)
    Same as apply_async, except that it blocks and returns the result of
    the function. It does this by returning the value of get() on the
    MTAsyncResult object.

map(func, args_list)
    func = Function to execute
    args_list = A list of *args

    Runs apply_async for every element in the list, and blocks until
    all calls have returned. Returns a list of MTAsyncResult objects.

map_async(func, args_list)
    Same as map(), except that it does not block; it returns the
    MTAsyncResult objects, which may still be waiting to be processed.

queue_remove(async_res)
    async_res = MTAsyncResult returned from apply_async or map_async.

    Removes a job from the queue, but only if it has not yet been
    processed. If this happens, a Cancelled exception is set on the
    MTAsyncResult.

queue_remove_many(async_res_list)
    Same as queue_remove, except that it cancels a list of MTAsyncResult
    objects.

close()
    Shut down all running threads, AFTER the currently queued jobs have
    been taken off the queue.

==================================================
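The `close()` in the first version of the pool relies on FIFO ordering. It puts one `None` per worker at the tail of the queue, so a worker only reaches a sentinel after every earlier job has been taken. Below is a stdlib-only Python 3 sketch of that ordering, with illustrative squaring jobs. It checks for `None` explicitly instead of catching the `TypeError` from unpacking.

```python
import queue
import threading

results = []
results_lock = threading.Lock()


def worker(jobs):
    while True:
        item = jobs.get()
        if item is None:  # shutdown sentinel
            return
        func, arg = item
        value = func(arg)
        with results_lock:
            results.append(value)


jobs = queue.Queue()
threads = [threading.Thread(target=worker, args=(jobs,)) for _ in range(3)]
for t in threads:
    t.start()

for n in range(10):
    jobs.put((lambda x: x * x, n))

# close(): one sentinel per thread, queued *behind* the pending jobs,
# so every job already submitted is executed before the workers exit.
for _ in threads:
    jobs.put(None)
for t in threads:
    t.join()

print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```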

MTAsyncResult(func, *args, **kwargs)
    Created by the threadpool; each thread takes one of these off the
    queue and then executes the function passed, with the args and
    kwargs.

The normal AsyncResult methods are there (get, set, set_exception), but
it also has:

cancel()
    Sets the exception to a Cancelled exception. It's expected that only
    the pool will ever call this.
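The standard `Queue` has no public method for removing an arbitrary item. One way to get the `queue_remove()`/`cancel()` behaviour described above is lazy removal: leave the job in the queue, mark it cancelled under a lock, and have the worker skip it. This is only a sketch and not necessarily how the attached code does it. It is written in Python 3, and `Job`, `claim()` and `Cancelled` are hypothetical names.

```python
import threading


class Cancelled(Exception):
    """Set on a job that was removed before any worker started it."""


class Job(object):
    """Hypothetical queued job that can be cancelled until a worker claims it."""

    def __init__(self, func, *args):
        self.func, self.args = func, args
        self._lock = threading.Lock()
        self._state = "queued"  # queued -> running, or queued -> cancelled
        self.result = None
        self.exception = None

    def cancel(self):
        # Succeeds only if no worker has claimed the job yet.
        with self._lock:
            if self._state != "queued":
                return False
            self._state = "cancelled"
            self.exception = Cancelled()
            return True

    def claim(self):
        # Called by a worker right after taking the job off the queue.
        with self._lock:
            if self._state != "queued":
                return False
            self._state = "running"
            return True

    def run(self):
        if not self.claim():
            return  # cancelled while waiting in the queue: skip it
        try:
            self.result = self.func(*self.args)
        except Exception as e:
            self.exception = e


a, b = Job(pow, 2, 10), Job(pow, 3, 3)
print(b.cancel())  # True: b is still queued
a.run()  # a worker would call run() on each job it dequeues
b.run()
late = a.cancel()
print(a.result, b.result, type(b.exception).__name__, late)  # 1024 None Cancelled False
```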

Outstanding:
resize - should probably put its jobs at the beginning of the queue, which
means changing the internal Queue object, or changing the Queue object to
a priority queue and putting them on with a very high priority. A priority
queue would be good to use, but it only came in with 2.6.
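This sketch of the priority-queue idea for `resize()` assumes the jobs in question are the shutdown sentinels used to shrink the pool. They go in with a higher priority than normal work, so the next idle worker picks one up immediately instead of after the backlog. It is written in Python 3, where the class is `queue.PriorityQueue` (in 2.6 it is `Queue.PriorityQueue`). The priority constants and the `put_job` helper are hypothetical.

```python
import itertools
import queue

SHUTDOWN, NORMAL = 0, 1       # lower number = served first
_counter = itertools.count()  # tie-breaker keeps FIFO order within a priority

jobs = queue.PriorityQueue()


def put_job(item, priority=NORMAL):
    # The counter also guarantees the payloads themselves are never compared.
    jobs.put((priority, next(_counter), item))


for name in ["job-a", "job-b", "job-c"]:
    put_job(name)
put_job(None, priority=SHUTDOWN)  # shrink the pool by one worker, now

order = [jobs.get()[2] for _ in range(4)]
print(order)  # [None, 'job-a', 'job-b', 'job-c']
```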

Maybe we should have a greenlet that runs and polls all the threads
every so often, restarting any that have unexpectedly died? This could
also keep the poolsize number accurate to the number of current threads.
(It could be wrong with a large queue and resize being used to shrink.)
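The periodic check could look like the following. In the pool it would be called every few seconds from a greenlet (for example, a loop around `gevent.sleep()`), while here it is called once. `spawn` and `supervise` are hypothetical helpers, and the code is Python 3 using the standard library only.

```python
import threading

stop = threading.Event()


def worker():
    stop.wait()  # stand-in for a real job loop that runs until shutdown


def spawn(target):
    t = threading.Thread(target=target)
    t.daemon = True
    t.start()
    return t


def supervise(threads, target):
    """Replace every worker that is no longer alive; return how many."""
    restarted = 0
    for i, t in enumerate(threads):
        if not t.is_alive():
            threads[i] = spawn(target)
            restarted += 1
    return restarted


threads = [spawn(worker), spawn(lambda: None)]  # the second exits at once
threads[1].join()
restarted = supervise(threads, worker)
alive = all(t.is_alive() for t in threads)
print(restarted, alive)  # 1 True
stop.set()
for t in threads:
    t.join()
```

Because dead threads are replaced in place, `len(threads)` stays equal to the configured pool size.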

map() - I'd like to add a cancel keyword argument, so that if any
function raises an exception, the not-yet-run functions given in the map
are automatically cancelled. To do this I would need to use waitany().
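Python 3's standard library later gained this exact pattern in `concurrent.futures` (Python 3.2+, so it was not available to the 2.6 code in this thread). In the sketch below, `wait(..., return_when=FIRST_EXCEPTION)` plays the role of `waitany()`, and `Future.cancel()` only succeeds for work that has not started. The sketch swaps in that API rather than extending the pool above. The `gate` event only keeps tasks queued long enough to be cancelled. How many tasks get cancelled depends on thread timing, so no exact count is shown.

```python
import threading
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait

gate = threading.Event()


def task(n):
    if n == 0:
        raise ValueError("task %d failed" % n)
    gate.wait(5)  # keep both workers busy so later tasks stay queued
    return n


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(task, n) for n in range(6)]
    # Roughly the "waitany" step: return as soon as any task raises.
    done, pending = wait(futures, return_when=FIRST_EXCEPTION)
    # cancel() only succeeds for tasks that have not started running yet.
    cancelled = [f for f in futures if f.cancel()]
    gate.set()  # let the running tasks finish so the pool can shut down

print("failed:", futures[0].exception())
print("cancelled:", len(cancelled))
```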

waitany() I think isn't very well implemented. I'm sure there is a
better way of doing this?

Any thoughts/comments?

Cheers
Pete

database_example.py
gevent_threadpool.py
multiping_example.py
process_example.py
threadpool_unittest.py

Denis Bilenko

Feb 23, 2010, 3:10:33 PM
to gev...@googlegroups.com
Hi Peter,

Do you have an account on bitbucket? It would be easier to work on
this together if the files were under version control.

I've created a project here
http://bitbucket.org/denis/gevent-playground/ for stuff that people
might want to share here but that is too small to create a new
project. Send me your username, I'll add you to the writers group.

Thanks a lot for working on this! (and I'll send my comments on the
code a bit later)

Cheers,
Denis.


Peter Saunders

Feb 23, 2010, 4:36:13 PM
to gev...@googlegroups.com
On Wed, Feb 24, 2010 at 02:10:33AM +0600, Denis Bilenko wrote:
> Hi Peter,
>
> Do you have an account on bitbucket? It would be easier to work on
> this together if the files were under version control.

I didn't, but I do now :)



> I've created a project here
> http://bitbucket.org/denis/gevent-playground/ for stuff that people
> might want to share here but that is too small to create a new
> project. Send me your username, I'll add you to the writers group.

pajs

> Thanks a lot for working on this! (and I'll send my comments on the
> code a bit later)

No problem - I'll look forward to hearing your comments soon :)

Cheers
Pete

Denis Bilenko

Mar 2, 2010, 6:05:19 AM
to gev...@googlegroups.com
Hi Peter,

It looks cool, the examples work for me and the test suite passes.

I've put your code here: http://bitbucket.org/denis/gevent-playground/src/
(I've added write permissions to your account, so feel free to update
it in place)

Some comments on the code:

- a module from the gevent package should not do monkey patching; that's up
  to the application
- at the same time, it should work in a monkey-patched environment.
  This particular case seems to have a problem with patch_all(), as
  that will patch the thread module and threading will become greenlet-based.
  Not sure yet what the best remedy is.
- the documentation should be in doctests.
  For example markup, check out:
  http://bitbucket.org/denis/gevent/src/tip/gevent/event.py
  Sphinx will generate this page out of it:
  http://www.gevent.org/gevent.event.html


On Fri, Feb 19, 2010 at 4:14 PM, Peter Saunders <pa...@fodder.org.uk> wrote:
> Excellent, I guess we need to be careful on some platforms (e.g.
> solaris and 32 bit python) when opening a lot of pipes could quite
> quickly take the fileno > 256 - which would then stop you opening
> files.. (no matter what your ulimit is), due to the annoying stdio
> issue.

This is a very good point, actually. Twisted has a single pipe per event loop
that lets them provide the reactor.callFromThread() function. We should
have something similar in gevent.

Also, maybe we can even change gevent.event.AsyncResult to support
such usage without needing a separate MTAsyncResult class. It would be
quite cool if all of gevent was threadsafe, but I'm not sure that's
possible without too much overhead in the single-threaded case.



Peter Saunders

Mar 3, 2010, 10:53:07 AM
to gev...@googlegroups.com
On Tue, Mar 02, 2010 at 05:05:19PM +0600, Denis Bilenko wrote:
> Hi Peter,
>
> It looks cool, the examples work for me and the test suite passes.
>
> I've put your code here: http://bitbucket.org/denis/gevent-playground/src/
> (I've added write permissions to your account, so feel free to update
> it in place)
>
> Some comments on the code:
>
> - a module from the gevent package should not do monkey patching; that's up
>   to the application
> - at the same time, it should work in a monkey-patched environment.
>   This particular case seems to have a problem with patch_all(), as
>   that will patch the thread module and threading will become greenlet-based.
>   Not sure yet what the best remedy is.
> - the documentation should be in doctests.
>   For example markup, check out:
>   http://bitbucket.org/denis/gevent/src/tip/gevent/event.py
>   Sphinx will generate this page out of it:
>   http://www.gevent.org/gevent.event.html

Ok - I will update this code, and sort out the documentation of it.



> This is a very good point, actually. Twisted has a single pipe per event loop
> that lets them provide the reactor.callFromThread() function. We should
> have something similar in gevent.
>
> Also, maybe we can even change gevent.event.AsyncResult to support
> such usage without needing a separate MTAsyncResult class. It would be
> quite cool if all of gevent was threadsafe, but I'm not sure that's
> possible without too much overhead in the single-threaded case.

Agreed. For the bit of code I am using this for, I've had to be very
careful about how big I let the queue get. I regularly found myself
running out of FDs :(

How would such code work? Send a reference to it down the pipe, and a
value, and then set it in a dedicated greenlet?

Cheers
Pete

Denis Bilenko

Mar 3, 2010, 1:46:01 PM
to gev...@googlegroups.com

I think just using core.timer/active_event/read_event and then writing to the
pipe to wake up the main thread would work.

core.timer and friends use a global variable, but they are protected by the GIL.


Peter Saunders

Mar 3, 2010, 2:34:10 PM
to gev...@googlegroups.com
On Thu, Mar 04, 2010 at 12:46:01AM +0600, Denis Bilenko wrote:
> > How would such code work? Send a reference to it down the pipe, and a
> > value, and then set it in a dedicated greenlet?
>
> I think just using core.timer/active_event/read_event and then writing to the
> pipe to wake up the main thread would work.
>
> core.timer and friends use a global variable, but they are protected by the GIL.

I think I'm not understanding you correctly.

So, say you have 3 MTAsyncResults, all being set by 3 different
threads. Is there a pipe which is read from the main event loop and
declared globally? (Or maybe scoped to the threadpool and passed to all
the threads it creates?)

A thread sets the result of the MTAsyncResult and then writes down the
pipe, which is then read by the event loop. But how does the event loop
know which MTAsyncResult has been changed, so that it can set that
MTAsyncResult's internal event? That's why I was asking about writing
something down the pipe that lets the event loop know which one is
being set.

We could do something similar with a Queue, but then it may require
regular polling instead, which isn't ideal.

Cheers
Pete

Denis Bilenko

Mar 4, 2010, 12:50:47 AM
to gev...@googlegroups.com
On Thu, Mar 4, 2010 at 1:34 AM, Peter Saunders <pa...@fodder.org.uk> wrote:
> On Thu, Mar 04, 2010 at 12:46:01AM +0600, Denis Bilenko wrote:
>> > How would such code work? Send a reference to it down the pipe, and a
>> > value, and then set it in a dedicated greenlet?
>>
>> I think just using core.timer/active_event/read_event and then writing to the
>> pipe to wake up the main thread would work.
>>
>> core.timer and friends use a global variable, but they are protected by the GIL.
>
> I think I'm not understanding you correctly.
>
> So, say you have 3 MTAsyncResults - all being set by say 3 different
> threads. So, there is a pipe which is read from the main event loop,
> which is declared globally? (Or maybe scoped to the threadpool, and
> passed to all running threads created)

What I have in mind is one pipe for the whole event loop.

>
> A thread sets the result of the MTAsyncResult and then writes down the
> pipe, which is then read by the event loop, but, how does the event loop
> know what MTAsyncResult has been changed and then set that
> MTAsyncResult's internal event?

It does not have to know. When you set() an Event, you register a
new callback that will wake up that event's waiters.
But since you did it from another thread, this callback won't be
executed immediately, because event_dispatch() has already calculated how
long it should sleep and called select()/epoll() with that timeout.

The point of the pipe is just to make select() return, so that the
newly added event is taken into consideration.
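In this design the loop does not need to know which result changed. The byte in the pipe only interrupts the blocking `select()`, and the work to run is already queued. Below is a stdlib-only Python 3 sketch with one pipe per loop, in the spirit of Twisted's `reactor.callFromThread()`. It is a stand-in, not gevent code: `MiniLoop`, `call_from_thread` and `run_once` are hypothetical names, and the deque replaces gevent's own callback registration (core.timer and friends). POSIX only.

```python
import collections
import os
import select
import threading


class MiniLoop(object):
    """Hypothetical loop: one wake-up pipe shared by every thread."""

    def __init__(self):
        self._rfd, self._wfd = os.pipe()
        os.set_blocking(self._wfd, False)  # a full pipe must not block workers
        self._pending = collections.deque()  # append/popleft are thread-safe

    def call_from_thread(self, func, *args):
        # Any thread: queue the callback, then poke the loop awake.
        self._pending.append((func, args))
        try:
            os.write(self._wfd, b"x")
        except BlockingIOError:
            pass  # pipe already full, so the loop will wake up anyway

    def run_once(self, timeout):
        # Loop thread: sleep in select() until some thread writes, or timeout.
        ready, _, _ = select.select([self._rfd], [], [], timeout)
        if ready:
            os.read(self._rfd, 4096)  # drain all pending wake-up bytes
        # The bytes carry no information; the callbacks are already queued.
        while self._pending:
            func, args = self._pending.popleft()
            func(*args)


loop = MiniLoop()
main_ident = threading.get_ident()
results = []


def record(value):
    # Runs in the loop's thread, so loop-owned state needs no locking.
    results.append((value, threading.get_ident() == main_ident))


workers = [threading.Thread(target=loop.call_from_thread, args=(record, n))
           for n in range(3)]
for w in workers:
    w.start()
for w in workers:
    w.join()

loop.run_once(timeout=5)
print(sorted(results))  # [(0, True), (1, True), (2, True)]
```

However many results are outstanding, this uses only two file descriptors, which also addresses the descriptor exhaustion mentioned earlier in the thread.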

Cheers,
Denis.
