
generators shared among threads


jess....@gmail.com

Mar 4, 2006, 1:45:12 PM
hi,

This seems like a difficult question to answer through testing, so I'm
hoping that someone will just know... Suppose I have the following
generator, g:

def f():
    i = 0
    while True:
        yield i
        i += 1
g = f()

If I pass g around to various threads and I want them to always be
yielded a unique value, will I have a race condition? That is, is it
possible that the cpython interpreter would interrupt one thread after
the increment and before the yield, and then resume another thread to
yield the first thread's value, or increment the stored i, or both,
before resuming the first thread? If so, would I get different
behavior if I just set g like:

g=itertools.count()

If both of these idioms will give me a race condition, how might I go
about preventing such? I thought about using threading.Lock, but I'm
sure that I don't want to put a lock around the yield statement.

thanks,
Jess

Alex Martelli

Mar 4, 2006, 2:10:50 PM
<jess....@gmail.com> wrote:

> hi,
>
> This seems like a difficult question to answer through testing, so I'm
> hoping that someone will just know... Suppose I have the following
> generator, g:
>
> def f():
>     i = 0
>     while True:
>         yield i
>         i += 1
> g = f()
>
> If I pass g around to various threads and I want them to always be
> yielded a unique value, will I have a race condition? That is, is it

Yes, you will.

> before resuming the first thread? If so, would I get different
> behavior if I just set g like:
>
> g=itertools.count()

I believe that in the current implementation you'd get "lucky", but
there is no guarantee that such luck would persist across even a minor
bugfix in the implementation. Don't do it.



> If both of these idioms will give me a race condition, how might I go
> about preventing such? I thought about using threading.Lock, but I'm
> sure that I don't want to put a lock around the yield statement.

Queue.Queue is often the best way to organize cooperation among threads.
Make a Queue.Queue with a reasonably small maximum size, a single
dedicated thread that puts successive items of itertools.count onto it
(implicitly blocking and waiting when the queue gets full), and any
other thread can call get on the queue and obtain a unique item
(implicitly waiting a little bit if the queue ever gets empty, until the
dedicated thread wakes and fills the queue again). [[Alternatively you
could subclass Queue and override the hook-method _get, which always
gets called in a properly locked and thus serialized condition; but that
may be considered a reasonably advanced task, since such subclassing
isn't documented in the reference library, only in Queue's sources]].
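
A minimal sketch of that arrangement (assuming Python 2's Queue,
threading and itertools modules; the names unique_ids and producer are
illustrative, not from Alex's post):

import Queue
import threading
import itertools

unique_ids = Queue.Queue(maxsize=10)   # small bound: the producer can't race far ahead

def producer():
    # put() blocks whenever the queue is full, so the counter only
    # advances as consumers drain values
    for n in itertools.count():
        unique_ids.put(n)

t = threading.Thread(target=producer)
t.setDaemon(True)          # don't keep the process alive just for the counter
t.start()

# any thread can now do this and is guaranteed a unique value:
value = unique_ids.get()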


Alex

Alan Kennedy

Mar 5, 2006, 9:14:47 AM
[jess....@gmail.com]

> def f():
>     i = 0
>     while True:
>         yield i
>         i += 1
> g = f()
>
> If I pass g around to various threads and I want them to always be
> yielded a unique value, will I have a race condition?

Yes.

Generators can be shared between threads, but they cannot be resumed
from two threads at the same time.

You should wrap it in a lock to ensure that only one thread at a time
can resume the generator.
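
For example, a minimal wrapper along those lines (a sketch only; the
class name LockedIterator is made up, not from this thread):

import threading

class LockedIterator(object):
    def __init__(self, it):
        self._it = it
        self._lock = threading.Lock()
    def __iter__(self):
        return self
    def next(self):
        self._lock.acquire()
        try:
            # only one thread at a time gets to resume the generator
            return self._it.next()
        finally:
            self._lock.release()

g = LockedIterator(f())   # threads share g instead of the bare generator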

Read this thread from a couple of years back about the same topic.

Suggested generator to add to threading module.
http://groups.google.com/group/comp.lang.python/browse_frm/thread/76aa2afa913fe4df/a2ede21f7dd78f34#a2ede21f7dd78f34

Also contained in that thread is an implementation of Queue.Queue which
supplies values from a generator, and which does not require a separate
thread to generate values.

HTH,

--
alan kennedy
------------------------------------------------------
email alan: http://xhaus.com/contact/alan

jess....@gmail.com

Mar 7, 2006, 2:30:22 AM
Thanks for the great advice, Alex. Here is a subclass that seems to
work:

from Queue import Queue
from itertools import count

class reentrantQueue(Queue):
    def _init(self, maxsize):
        self.maxsize = 0
        self.queue = []  # so we don't have to override put()
        self.counter = count()
    def _empty(self):
        return False
    def _get(self):
        return self.counter.next()
    def next(self):
        return self.get()
    def __iter__(self):
        return self

Alex Martelli

Mar 7, 2006, 10:21:41 AM
<jess....@gmail.com> wrote:

> Thanks for the great advice, Alex. Here is a subclass that seems to
> work:

You're welcome!

> from Queue import Queue
> from itertools import count
>
> class reentrantQueue(Queue):
>     def _init(self, maxsize):
>         self.maxsize = 0
>         self.queue = []  # so we don't have to override put()
>         self.counter = count()
>     def _empty(self):
>         return False
>     def _get(self):
>         return self.counter.next()
>     def next(self):
>         return self.get()
>     def __iter__(self):
>         return self

You may also want to override _put to raise an exception, just to avoid
accidental misuse, though I agree it's marginal. Also, I'd use maxsize
(if provided and >0) as the upper bound for the counting; not sure that's
necessary but it seems pretty natural to raise StopIteration (rather
than returning the counter's value) if the counter reaches that
"maxsize". Last, I'm not sure I'd think of this as a reentrantQueue, so
much as a ReentrantCounter;-).
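
Folding those suggestions back into the class might look roughly like
this (a hedged sketch of what Alex describes, not his code; the exact
maxsize handling is a guess):

from Queue import Queue
from itertools import count

class ReentrantCounter(Queue):
    def _init(self, maxsize):
        self.maxsize = maxsize      # 0 means "count forever"
        self.queue = []             # so we don't have to override put()
        self.counter = count()
    def _empty(self):
        return False
    def _full(self):
        return False
    def _put(self, item):
        # guard against accidental misuse, as suggested
        raise TypeError("Can't put to a ReentrantCounter")
    def _get(self):
        return self.counter.next()
    def next(self):
        n = self.get()
        if self.maxsize > 0 and n >= self.maxsize:
            raise StopIteration     # treat maxsize as an upper bound
        return n
    def __iter__(self):
        return self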


Alex

jess....@gmail.com

Mar 7, 2006, 1:35:16 PM
Alex wrote:
> Last, I'm not sure I'd think of this as a reentrantQueue, so
> much as a ReentrantCounter;-).

Of course! It must have been late when I named this class... I think
I'll go change the name in my code right now.

Paul Rubin

Mar 7, 2006, 3:02:54 PM
ale...@yahoo.com (Alex Martelli) writes:
> > g=itertools.count()
>
> I believe that in the current implementation you'd get "lucky", but
> there is no guarantee that such luck would persist across even a minor
> bugfix in the implementation. Don't do it.

I remember being told that xrange(sys.maxint) was thread-safe, but of
course I wouldn't want to depend on that across Python versions either.

> Queue.Queue is often the best way to organize cooperation among threads.
> Make a Queue.Queue with a reasonably small maximum size, a single
> dedicated thread that puts successive items of itertools.count onto it
> (implicitly blocking and waiting when the queue gets full),

This should be pretty simple to implement and not much can go wrong
with it, but it means a separate thread for each such generator, and
waiting for thread switches every time the queue goes empty. A more
traditional approach would be to use a lock in the generator,

def f():
    lock = threading.Lock()
    i = 0
    while True:
        lock.acquire()
        yield i
        i += 1
        lock.release()

but it's easy to make mistakes when implementing things like that
(I'm not even totally confident that the above is correct).

Hmm (untested, like above):

class Synchronized:
    def __init__(self, generator):
        self.gen = generator
        self.lock = threading.Lock()
    def next(self):
        self.lock.acquire()
        try:
            yield self.gen.next()
        finally:
            self.lock.release()

synchronized_counter = Synchronized(itertools.count())

That isn't a general solution but can be convenient (if I didn't mess
it up). Maybe there's a more general recipe somewhere.

jess....@gmail.com

Mar 7, 2006, 7:55:01 PM
Paul wrote:
> def f():
>     lock = threading.Lock()
>     i = 0
>     while True:
>         lock.acquire()
>         yield i
>         i += 1
>         lock.release()
>
> but it's easy to make mistakes when implementing things like that
> (I'm not even totally confident that the above is correct).

The main problem with this is that the yield leaves the lock locked.
If any other thread wants to read the generator it will block. Your
class Synchronized fixes this with the "finally" hack (please note that
from me this is NOT a pejorative). I wonder... is that future-proof?
It seems that something related to this might change with 2.5? My
notes from GvR's keynote don't seem to include this. Someone that
knows more than I do about the intersection between "yield" and
"finally" would have to speak to that.

Paul Rubin

Mar 7, 2006, 9:17:31 PM
jess....@gmail.com writes:
> The main problem with this is that the yield leaves the lock locked.
> If any other thread wants to read the generator it will block.

Ouch, good catch. Do you see a good fix other than try/finally?
Is there a classical way to do it with coroutines and semaphores?

Alex Martelli

Mar 7, 2006, 10:54:33 PM

Jesse's solution from the other half of this thread, generalized:

import Queue

class ReentrantIterator(Queue.Queue):
    def _init(self, iterator):
        self.iterator = iterator
    def _empty(self):
        return False
    def _get(self):
        return self.iterator.next()
    def _put(*ignore):
        raise TypeError, "Can't put to a ReentrantIterator"
    def next(self):
        return self.get()
    def __iter__(self):
        return self

Now, x=ReentrantIterator(itertools.count()) should have all the
properties we want, I think. The locking is thanks to Queue.Queue and
its sweet implementation of the Template Method design pattern.


Alex

Kent Johnson

Mar 8, 2006, 6:05:26 AM
Paul Rubin wrote:
> Hmm (untested, like above):
>
> class Synchronized:
>     def __init__(self, generator):
>         self.gen = generator
>         self.lock = threading.Lock()
>     def next(self):
>         self.lock.acquire()
>         try:
>             yield self.gen.next()
>         finally:
>             self.lock.release()
>
> synchronized_counter = Synchronized(itertools.count())
>
> That isn't a general solution but can be convenient (if I didn't mess
> it up). Maybe there's a more general recipe somewhere.

This code is not allowed in Python 2.4. From PEP 255:

Restriction: A yield statement is not allowed in the try clause of a
try/finally construct. The difficulty is that there's no guarantee
the generator will ever be resumed, hence no guarantee that the finally
block will ever get executed; that's too much a violation of finally's
purpose to bear.

Even if this was allowed, ISTM the semantics might be the same as your
previous attempt - I would expect the finally to be executed after the
yield returns, meaning the lock would be held during the yield.

Python 2.5 will allow this (see PEP 342) but from the examples it seems
the finally won't execute until the yield returns.

Kent

Just

Mar 8, 2006, 6:50:15 AM
In article <440eb...@newspeer2.tds.net>,
Kent Johnson <ke...@kentsjohnson.com> wrote:

> Paul Rubin wrote:
> > Hmm (untested, like above):
> >
> > class Synchronized:
> >     def __init__(self, generator):
> >         self.gen = generator
> >         self.lock = threading.Lock()
> >     def next(self):
> >         self.lock.acquire()
> >         try:
> >             yield self.gen.next()
> >         finally:
> >             self.lock.release()
> >
> > synchronized_counter = Synchronized(itertools.count())
> >
> > That isn't a general solution but can be convenient (if I didn't mess
> > it up). Maybe there's a more general recipe somewhere.
>
> This code is not allowed in Python 2.4. From PEP 255:

[ snip ]

The code also doesn't make sense: .next() should *return* a value, not
yield one. Substituting "return" for "yield" might just work for the
code above.

Just

Paul Rubin

Mar 8, 2006, 6:54:28 AM
ale...@yahoo.com (Alex Martelli) writes:
> Now, x=ReentrantIterator(itertools.count()) should have all the
> properties we want, I think. The locking is thanks to Queue.Queue and
> its sweet implementation of the Template Method design pattern.

That is very cool, and generally useful enough that maybe it should be
dropped into itertools. I see that Queue.get's implementation is
quite intricate (it uses three separate locks to handle some
additional cases like timeouts and non-blocking gets) but I'm not up
to trying to grok it right now. Thanks!

Bryan Olson

Mar 8, 2006, 11:41:00 AM
jess....@gmail.com wrote:
> Paul wrote:
>
>> def f():
>>     lock = threading.Lock()
>>     i = 0
>>     while True:
>>         lock.acquire()
>>         yield i
>>         i += 1
>>         lock.release()
>>
>>but it's easy to make mistakes when implementing things like that
>>(I'm not even totally confident that the above is correct).
>
>
> The main problem with this is that the yield leaves the lock locked.
> If any other thread wants to read the generator it will block.

I don't think so. The second thread will start right after the
yield, and release the lock before acquiring it.

Here's a demo:


import threading
import time

def start_daemon(closure):
    t = threading.Thread(target=closure)
    t.setDaemon(True)
    t.start()

def f():
    lock = threading.Lock()
    i = 0
    while True:
        lock.acquire()
        yield i
        i += 1
        lock.release()

fgen = f()

def count3():
    for _ in range(3):
        print '---', fgen.next()
        time.sleep(10)

start_daemon(count3)
time.sleep(1.0)
print "+++", fgen.next()


--
--Bryan

jess....@gmail.com

Mar 8, 2006, 11:16:56 PM
I just noticed, if you don't define maxsize in _init(), you need to
override _full() as well:

    def _full(self):
        return False

cheers,
Jess

jess....@gmail.com

Mar 9, 2006, 5:41:57 AM
Bryan,

You'll get the same result without the lock. I'm not sure what this
indicates. It may show that the contention on the lock and the race
condition on i aren't always problems. It may show that generators, at
least in CPython 2.4, provide thread safety for free. It does seem to
disprove my statement that, "the yield leaves the lock locked".

More than that, I don't know. When threading is involved, different
runs of the same code can yield different results. Can we be sure that
each thread starts where the last one left off? Why wouldn't a thread
just start where it had left off before? Of course, this case would
have the potential for problems that Alex talked about earlier. Why
would a generator object be any more reentrant than a function object?
Because it has a gi_frame attribute? Would generators be thread-safe
only in CPython?

I started the discussion with simpler versions of these same questions.
I'm convinced that using Queue is safe, but now I'm not convinced that
just using a generator is not safe.
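
One way to probe that question empirically (a sketch, not from the
original thread; a clean run proves nothing, since any such behaviour
is an implementation accident rather than a guarantee):

import threading

def f():
    i = 0
    while True:
        yield i
        i += 1

g = f()
results, errors = [], []

def worker():
    try:
        for _ in xrange(100000):
            results.append(g.next())
    except ValueError, e:
        # CPython may raise "generator already executing" here
        errors.append(e)

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print "errors:", len(errors)
print "duplicates:", len(results) - len(set(results))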

cheers,
Jess

Bryan Olson

Mar 10, 2006, 8:15:20 PM
jess....@gmail.com wrote:
> You'll get the same result without the lock. I'm not sure what this
> indicates. It may show that the contention on the lock and the race
> condition on i aren't always problems. It may show that generators, at
> least in CPython 2.4, provide thread safety for free. It does seem to
> disprove my statement that, "the yield leaves the lock locked".
>
> More than that, I don't know. When threading is involved, different
> runs of the same code can yield different results. Can we be sure that
> each thread starts where the last one left off? Why wouldn't a thread
> just start where it had left off before? Of course, this case would
> have the potential for problems that Alex talked about earlier. Why
> would a generator object be any more reentrant than a function object?
> Because it has a gi_frame attribute? Would generators be thread-safe
> only in CPython?

I have not found definitive answers in the Python doc. Both
generators and threads keep their own line-of-control, and
how they interact is not clear.


--
--Bryan

Paul Rubin

Mar 13, 2006, 2:36:45 AM
Bryan Olson <fakea...@nowhere.org> writes:
> I have not found definitive answers in the Python doc. Both
> generators and threads keep their own line-of-control, and
> how they interact is not clear.

It looks to me like you can't have two threads in the same generator:

import threading, time

def make_gen():
    lock = threading.Lock()
    count = 0
    delay = [0,1]
    while True:
        lock.acquire()

        # sleep 1 sec on first iteration, then 0 seconds on next iteration
        time.sleep(delay.pop())
        count += 1
        yield count

        lock.release()

def run():
    print gen.next()

gen = make_gen()

# start a thread that will lock the generator for 1 sec
threading.Thread(target=run).start()

# make sure first thread has a chance to get started
time.sleep(0.2)

# start second thread while the generator is locked
threading.Thread(target=run).start()

raises ValueError:

Python 2.3.4 (#1, Feb 2 2005, 12:11:53)
[GCC 3.4.2 20041017 (Red Hat 3.4.2-6.fc3)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> ## working on region in file /usr/tmp/python-28906xpZ...
>>> Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.3/threading.py", line 436, in __bootstrap
self.run()
File "/usr/lib/python2.3/threading.py", line 416, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/tmp/python-28906xpZ", line 18, in run
ValueError: generator already executing

1

Tim Peters

Mar 13, 2006, 3:40:19 AM
to pytho...@python.org
[Paul Rubin]

> It looks to me like you can't have two threads in the same generator:

You can't even have one thread in a generator-iterator get away with
activating the generator-iterator while it's already active. That's
an example in PEP 255:

"""
Restriction: A generator cannot be resumed while it is actively
running:

>>> def g():
...     i = me.next()
...     yield i
>>> me = g()
>>> me.next()
Traceback (most recent call last):
  ...
  File "<string>", line 2, in g
ValueError: generator already executing
"""

Same thing if more than one thread tries to do that, but perhaps
harder to see then.

To make some intuitive sense of those, note that a generator-iterator
reuses a single stack frame across resumptions. There is only one
such frame per generator-iterator, hence only (among other things)
one "program counter" per generator-iterator. It should be obvious
that multiple threads can't be allowed to muck with _the_ frame's
program counter simultaneously, and the example above is actually
subtler on that count (because nothing in non-resumable functions
prepares you for the fact that generators make it possible for a _single_
thread to _try_ to "be in two places at the same time" inside a single
invocation of a function -- although any attempt to try to do that
with a single thread is necessarily convoluted, like the example
above, the implementation still has to prevent it).
