Google Groups no longer supports new Usenet posts or subscriptions. Historical content remains viewable.
Dismiss

Turning f(callback) into a generator

62 views
Skip to first unread message

Peter Otten

unread,
Dec 3, 2003, 12:48:41 PM12/3/03
to
It's easy to write a function that wraps a generator and provides a
callback. E. g.:

import os, sys

def walk(path, visit):
""" Emulate os.path.walk() (simplified) using os.walk()"""
for dir, folders, files in os.walk(path):
visit(dir, folders + files)


if __name__ == "__main__":
walk(".", lambda d, f: sys.stdout.write(d + "\n"))


However, I did not succeed in turning the old os.path.walk(), i. e. a
function taking a callback, into a generator. Is there a general way to do
it without having to store all intermediate results first?


Peter

PS. No, I don't have a use case. Threads welcome if all else fails :-)

Diez B. Roggisch

unread,
Dec 3, 2003, 3:50:10 PM12/3/03
to
Hi,

> However, I did not succeed in turning the old os.path.walk(), i. e. a
> function taking a callback, into a generator. Is there a general way to do
> it without having to store all intermediate results first?

This works for me:

import os.path

def path_gen(start):
res = []
def cb(r, dir, names):
for n in names:
r.append(n)

os.path.walk(start, cb, res)
for n in res:
yield n


g = path_gen("/etc")

for n in g:
print n


Diez

Diez B. Roggisch

unread,
Dec 3, 2003, 3:53:00 PM12/3/03
to
> def path_gen(start):
> res = []
> def cb(r, dir, names):
> for n in names:
> r.append(n)
>
> os.path.walk(start, cb, res)
> for n in res:
> yield n
>
>
> g = path_gen("/etc")
>
> for n in g:
> print n

Just found out that lists support extend, which allows the ugly loop for
appending names in cb to be written this way:

r.extend(names)

Diez

Peter Otten

unread,
Dec 3, 2003, 4:53:22 PM12/3/03
to
Diez B. Roggisch wrote:

>> However, I did not succeed in turning the old os.path.walk(), i. e. a
>> function taking a callback, into a generator. Is there a general way to
>> do it without having to store all intermediate results first?
>
> This works for me:
>
> import os.path
>
> def path_gen(start):
> res = []
> def cb(r, dir, names):
> for n in names:
> r.append(n)
>
> os.path.walk(start, cb, res)

At this point, you have stored all results in res, and thus did not play by
the rules :-)

> for n in res:
> yield n
>
>
> g = path_gen("/etc")
>
> for n in g:
> print n

Both os.path.walk() and os.walk() basically need memory for the names in
*one* directory; when you take os.walk() to model os.path.walk() that
doesn't change, but the other way round - bang, memory usage explosion.

This is a strange asymmetry, and I was wondering if I've overlooked a simple
way to transform a callback into a generator, but I now tend to the
assumption that you *need* threads: One thread with the callback puts names
or name lists into the queue until it's full (some arbitrary limit that
also determines the total memory needed), another thread (the generator)
consumes names from the queue.

Peter

Bengt Richter

unread,
Dec 3, 2003, 5:41:38 PM12/3/03
to

I suspect that's what is necessary currently, until we get a yield that can suspend a
whole stack of frames at a yield inside nested calls to functions. Then it would
just be a matter of putting a yield in a callback routine and starting the
walk from the base generator-making function/method/whatever.

Maybe deep generators could be created via an __iter__ method of the function type
as an alternative/extension to has-yield-in-function-code-body magic.
Exit from the generator wouldn't happen until you exited the base frame.
Yield in a nested function call would just suspend right there.
Easier said than done, of course ;-)

Regards,
Bengt Richter

Diez B. Roggisch

unread,
Dec 3, 2003, 6:26:13 PM12/3/03
to
Hi,

> This is a strange asymmetry, and I was wondering if I've overlooked a
> simple way to transform a callback into a generator, but I now tend to the
> assumption that you *need* threads: One thread with the callback puts
> names or name lists into the queue until it's full (some arbitrary limit
> that also determines the total memory needed), another thread (the
> generator) consumes names from the queue.

I also thought about that - its the only thing that allows for real lazyness
- you can wait in the callback until the generator clears a semaphore. But
if the context-changes are worth the effort is questionable.

Diez

Jimmy Retzlaff

unread,
Dec 3, 2003, 9:07:08 PM12/3/03
to pytho...@python.org
Peter Otten wrote:
> It's easy to write a function that wraps a generator and provides a
> callback. E. g.:
>
> import os, sys
>
> def walk(path, visit):
> """ Emulate os.path.walk() (simplified) using os.walk()"""
> for dir, folders, files in os.walk(path):
> visit(dir, folders + files)
>
>
> if __name__ == "__main__":
> walk(".", lambda d, f: sys.stdout.write(d + "\n"))
>
>
> However, I did not succeed in turning the old os.path.walk(), i. e. a
> function taking a callback, into a generator. Is there a general way
to do
> it without having to store all intermediate results first?
>
>
> Peter
>
> PS. No, I don't have a use case. Threads welcome if all else fails :-)

I do have a use case. I've wrapped a C library for extracting files from
compressed archives (http://www.averdevelopment.com/python/). The C
library uses a callback, but exposing a generator in Python is much more
natural in my case. Of course, if you can change the code making the
callbacks you can typically just replace the call to the callback
function with a yield statement, but that doesn't work if you don't have
access to the source (or don't want the source changed).

The reason I created my library is that my Python program reads in,
line-by-line, 100GB of data that has been compressed into a 2GB archive.
So, obviously I can't read it all in and then start yielding and I
really don't want to expand it to a disk file and then read it in, I
want to work on it as it is decompressed.

My solution used threads. I'd love to know if someone comes up with a
solution not involving threads. Here's a simple example of my
thread-based approach:


import Queue
import threading

def counter(length, callback):
for i in range(length):
callback(i)

def counterWrapper(length):
"""Wrap counter(...) as a generator using threads"""

# Increase maxsize to allow more read-ahead. A value less
# than 1 would allow arbitray read-ahead.
queue = Queue.Queue(maxsize=1)

# More bookkeeping would make a sentinel unnecessary, but if
# you know of a value that can't be validly passed into the
# callback, then it's simpler to just use a sentinel.
sentinel = None

def callback(i):
queue.put(i)

def launcher():
counter(length, callback)
queue.put(sentinel)

threading.Thread(target=launcher).start()

while True:
value = queue.get()
if value != sentinel:
yield value
else:
break

if __name__ == "__main__":
# callback
def callback(i):
print i
counter(10, callback)

print

# generator
for i in counterWrapper(10):
print i


Jimmy


Peter Otten

unread,
Dec 4, 2003, 4:51:07 AM12/4/03
to
Diez B. Roggisch wrote:

There is another aspect I forgot to mention. Your approach would only start
to yield results after all results are found, i. e. in the example you
would have to scan the whole harddisk even when the first visited file
might have been the one you want.
So I think it is best to either not change the callback approach or use
threads as outlined by Jimmy Retzlaff.

Peter

Peter Otten

unread,
Dec 4, 2003, 7:55:53 AM12/4/03
to
Jimmy Retzlaff wrote:

> Peter Otten wrote:

[...]

>> However, I did not succeed in turning the old os.path.walk(), i. e. a
>> function taking a callback, into a generator. Is there a general way
> to do
>> it without having to store all intermediate results first?
>>
>>
>> Peter
>>
>> PS. No, I don't have a use case. Threads welcome if all else fails :-)
>
> I do have a use case. I've wrapped a C library for extracting files from
> compressed archives (http://www.averdevelopment.com/python/). The C
> library uses a callback, but exposing a generator in Python is much more
> natural in my case. Of course, if you can change the code making the
> callbacks you can typically just replace the call to the callback
> function with a yield statement, but that doesn't work if you don't have
> access to the source (or don't want the source changed).
>
> The reason I created my library is that my Python program reads in,
> line-by-line, 100GB of data that has been compressed into a 2GB archive.
> So, obviously I can't read it all in and then start yielding and I
> really don't want to expand it to a disk file and then read it in, I
> want to work on it as it is decompressed.
>
> My solution used threads. I'd love to know if someone comes up with a
> solution not involving threads. Here's a simple example of my
> thread-based approach:

[Nice example of a thread-based generator, thanks]

So far, no one has come up with a solution that avoids threads - short of
Bengt Richter's ideas on changing the language that is.

Below is my attempt on generalizing your code to turn any visit(callback)
into a generator (not tested beyond the __main__ stuff):

import Queue
import threading

# mark the position of the callback function in the argument list of visit()
CALLBACK = object()

def cbwrapper(visit, queuesize, *args, **kwd):
"""Wrap visit(..., callback, ...) as a generator using threads"""

queue = Queue.Queue(maxsize=queuesize)

sentinel = object()

# XXX should keyword args be allowed?
# XXX should the len(args) == 1 case be tweaked to
# return args[0] instead of a tuple?
def callback(*args):
queue.put(args)

# replace the CALLBACK placeholder with the actual callback function
args = list(args)
try:
args[args.index(CALLBACK)] = callback
except ValueError:
for key, value in kwd.iteritems():
if value is CALLBACK:
kwd[key] = callback
break
else:
# XXX raise Exception("Don't know where to put callback")
# default to last positional for now
args.append(callback)

def launcher():
# XXX what if visit throws an exception?
try:
visit(*args, **kwd)
except Exception, e:
queue.put(e)
else:
queue.put(sentinel)

threading.Thread(target=launcher).start()

while True:
value = queue.get()

if value is sentinel:
break
elif isinstance(value, Exception):
raise value
yield value

if __name__ == "__main__":

def counter(length, callback):
for i in range(length):
callback(i)

def multiply(first, callback, second):
for i in first:
for k in second:
callback(i, k, i*k)
def divide(first, second, callback):
for i in first:
for k in second:
callback(i, k, i/k)

for i, in cbwrapper(counter, 1, 10):
print i,
print
for i, k, ik in cbwrapper(multiply, 1, range(3), CALLBACK, range(2)):
print "%d * %d = %d" % (i, k, ik)
for i, k, ik in cbwrapper(divide, 1, range(3), range(2)[::-1]):
print "%d / %d = %d" % (i, k, ik)

The question that has come up writing the above, how you would handle
exceptions in the thread? I put them in the queue and reraise them in the
main thread, but I'm not feeling very confident about it...

Peter

Jimmy Retzlaff

unread,
Dec 4, 2003, 8:09:21 PM12/4/03
to pytho...@python.org
Peter Otten wrote:

[Nice generalization of a thread-based generator, thanks]

> The question that has come up writing the above, how you would handle
> exceptions in the thread? I put them in the queue and reraise them in
the
> main thread, but I'm not feeling very confident about it...

Perhaps there should be a second queue for an exception and the
sentinel. Then the main loop could look something like this (untested):

while terminationQueue.empty() or not queue.empty():
# The timeout is needed because we may have arrived here after
# the last value was placed in the queue but before the sentinel
# or an exception was placed in the terminationQueue.
yield queue.get(timeout=0.1)

terminationValue = terminationQueue.get()
if terminationValue is not sentinel:
raise terminationValue

In this case the launcher function would put the sentinel in the
terminationQueue instead of the data queue. The idea of the loop
condition above is to yield everything that came through before the
exception/sentinel, then re-raise any exception if that was the reason
for terminating. This would hopefully help clarify where the exception
occurred (i.e., on which element in the iteration).

The separate queue also eliminates the problem of someone somehow
finding a way of injecting the sentinel into the data stream. But this
does create the need for the timeout (which doesn't work in 2.2). I
guess it's a judgment call as to whether the sentinel should be placed
in one queue or the other (or both).

Your generalization could make a nice Python Cookbook recipe if you feel
like putting it together. At least two of us have needed this sort of
thing.

Jimmy


Peter Otten

unread,
Dec 5, 2003, 10:57:47 AM12/5/03
to
Jimmy Retzlaff wrote:

> Perhaps there should be a second queue for an exception and the
> sentinel. Then the main loop could look something like this (untested):
>
> while terminationQueue.empty() or not queue.empty():
> # The timeout is needed because we may have arrived here after
> # the last value was placed in the queue but before the sentinel
> # or an exception was placed in the terminationQueue.
> yield queue.get(timeout=0.1)
>
> terminationValue = terminationQueue.get()
> if terminationValue is not sentinel:
> raise terminationValue
>
> In this case the launcher function would put the sentinel in the
> terminationQueue instead of the data queue. The idea of the loop
> condition above is to yield everything that came through before the
> exception/sentinel, then re-raise any exception if that was the reason
> for terminating. This would hopefully help clarify where the exception
> occurred (i.e., on which element in the iteration).
>
> The separate queue also eliminates the problem of someone somehow
> finding a way of injecting the sentinel into the data stream. But this
> does create the need for the timeout (which doesn't work in 2.2). I
> guess it's a judgment call as to whether the sentinel should be placed
> in one queue or the other (or both).

I'd rather not go with the above approach. At first glance it looks more
complicated than the original design. Also, I'm not sure if an item in the
terminationQueue could become visible before all items in the data queue.
However, seeing

raise terminationValue

was useful in that I noticed that you do not nead a special sentinel -
instead, just put a StopIteration exception into the queue.

> Your generalization could make a nice Python Cookbook recipe if you feel
> like putting it together. At least two of us have needed this sort of
> thing.

Since I posted the code, I've found a serious bug. Unless you run the
generator to exhaustion, you end up with a thread where a callback waits
forever to insert a tuple into the queue. So some two-way communication is
needed here to tell the callback "Stop it, we're done", and I've not yet
figured out how to do that. An intermediate fix is a "high" timeout and
simply returning from launcher() on a Queue.Full exception.

When I can advance code quality to prime time, I will probably turn it into
a recipe - good idea :-)

In the convenience department, this tiny class will make usage even more
intuitive:

class CbWrapper:
def __init__(self, visit, queuesize=1, callbackpos=None):
self._visit = visit
self.queuesize = queuesize
self.callbackpos = callbackpos
def __call__(self, *args, **kwd):
if self.callbackpos is not None:
args = args[:self.callbackpos] + (CALLBACK,) +
args[self.callbackpos:]
return cbwrapper(self._visit, self.queuesize, *args, **kwd)
visit = __call__

# make os.path.walk() almost as easy to use as os.walk()
import os
mywalk = CbWrapper(os.path.walk, callbackpos=1)
for arg, dirname, names in mywalk(".", None):
print "directory \'%s\' contains %d items" % (dirname, len(names))

Perhaps some extension of the class approach will also help with the
dangling thread problem, killing it at least when the instance goes out of
scope.

Peter

Jeff Epler

unread,
Dec 5, 2003, 11:37:47 AM12/5/03
to Jimmy Retzlaff, pytho...@python.org
On Wed, Dec 03, 2003 at 06:07:08PM -0800, Jimmy Retzlaff wrote:
> My solution used threads. I'd love to know if someone comes up with a
> solution not involving threads. Here's a simple example of my
> thread-based approach:
[...]

In standard Python, this is the only approach that can work. It's
possible that Stackless Python might provide a solution that doesn't
involve OS threads, but I don't have any actual experience with
Stackless.

One problem with your code is the use of a sentinel value. Another
poster suggested using multiple queues and a timeout to return
exceptions or end-of-queue. Instead, you should use a queue but insert
easily distinguishable items in it. For instance:
None: end of iterator, raise StopIteration
(0, a, b): exception in f, raise a, None, b (re-raise exception)
(1, blah): callback got blah, return it from next()


import sys, Queue, threading

class CallbackGenerator:
def __init__(self, func, pre=(), post=()):
self.func = func
if type(pre) is not tuple: pre = (pre,)
if type(post) is not tuple: post = (post,)
self.args = pre + (self.cb,) + post
self.queue = Queue.Queue(maxsize=1)
self.thread = threading.Thread(target=self.go)
self.thread.start()

def __iter__(self): return self

def next(self):
item = self.queue.get()
if item is None:
raise StopIteration
if item[0] == 0:
raise item[1], None, item[2]
else:
return item[1]

def go(self):
try:
self.func(*self.args)
except:
info = sys.exc_info()
self.queue.put((0, info[1], info[2]))
else:
self.queue.put(None)

def cb(self, *args):
self.queue.put((1, args))

# Example 1: os.path.walk -> generator
import os
for i in CallbackGenerator(os.path.walk, "/tmp", None):
print i

# Example 2: Prints 1, 2 then shows a traceback
def f(cb):
cb(1)
cb(2)
1/0

for i in CallbackGenerator(f):
print i


Alan Kennedy

unread,
Dec 5, 2003, 12:11:45 PM12/5/03
to
[Peter Otten]

> So far, no one has come up with a solution that avoids threads - short of
> Bengt Richter's ideas on changing the language that is.

Sorry I'm late into this thread.

Peter, you may find the following message from Stephen Taschuk
interesting. It is an approach to the producer/consumer problem,
through the use of coroutines. Only the bottom half of his post will
be of interest. The top half relates to another question I was asking
him at the time.

http://groups.google.com/groups?selm=mailman.1054317657.5272.python-list%40python.org&rnum=8

I think it might be something closer to what you're looking for.

regards,

--
alan kennedy
------------------------------------------------------
check http headers here: http://xhaus.com/headers
email alan: http://xhaus.com/contact/alan

Peter Otten

unread,
Dec 5, 2003, 12:49:08 PM12/5/03
to
Alan Kennedy wrote:

> Peter, you may find the following message from Stephen Taschuk
> interesting. It is an approach to the producer/consumer problem,
> through the use of coroutines. Only the bottom half of his post will
> be of interest. The top half relates to another question I was asking
> him at the time.
>
>
http://groups.google.com/groups?selm=mailman.1054317657.5272.python-list%40python.org&rnum=8
>
> I think it might be something closer to what you're looking for.

Very interesting indeed, thanks.
At first glance I think it will not solve the particular problem, as all
participants (read/write/consumer/producer) are generators, which is not an
option here, as I want to "tunnel" data from a callback inside code that
assumes it is calling a normal function.
On the other hand, although I always thought of the callback as the
producer, there might be a way to put the *dispatcher* into the callback -
and then bingo.
So I'll definitely have a closer look at this promising approach.

Peter

Peter Otten

unread,
Dec 5, 2003, 1:42:22 PM12/5/03
to
Jeff Epler wrote:

Your code looks far more concise than mine.
However, it suffers from the same problem:

# nasty example
maxRepeat = 0


for i in CallbackGenerator(os.path.walk, "/tmp", None):

if maxRepeat == 0:
break
print i
maxRepeat -= 1

How do you handle the case where the generator is not run to exhaustion?

Peter

Jeff Epler

unread,
Dec 5, 2003, 3:15:39 PM12/5/03
to Peter Otten, pytho...@python.org
On Fri, Dec 05, 2003 at 07:42:22PM +0100, Peter Otten wrote:
> How do you handle the case where the generator is not run to exhaustion?

I didn't think about it or see your other message before I posted my code.

I suppose you'd have to add some sort of communication in the opposite
direction that causes cb to raise an exception instead of putting
something in the queue. Something like:

def delete(self):
self.queue = None

def __del__(self):
if self.queue is not None: self.delete()

def cb(self):
if self.queue is None: raise AppropriateException
...

def go(self):
try:
...
except AppropriateException:
return # exit thread

but this isn't quite right because delete must deal with a previous cb
that is blocked, as well as races between 'delete' changing queue and cb
using it.

What you'd really want is a Queue subclass with a .shutdown() method:
after a queue has been shutdown, any put() (including a blocked put in
another thread) fails immediately with an exception, and get() (including
a blocked get in another thread) fails immediately if the queue is empty.

... and __del__ still won't work because there are live references to
self in the callback-thread. You'll need a container class that the
thread doesn't reference whose __del__ takes care of doing the queue
shutdown.

Jeff

0 new messages