In which I ask your help designing Motor's cursor API

160 views
Skip to first unread message

A. Jesse Jiryu Davis

unread,
Nov 11, 2012, 5:54:07 PM11/11/12
to python-...@googlegroups.com
Hi all, I need some advice about iterating a MotorCursor, could you check out my post and comment, either on the post or here?:

http://emptysquare.net/blog/motor-iterating-over-results/

(AsyncMongo just returns the first batch as a list and omits the rest of the results, I think MongoTor does the same. APyMongo took the interesting approach of collection.find(callback).loop(), which is like Motor's each().)

Ben Darnell

unread,
Nov 12, 2012, 12:02:02 AM11/12/12
to Tornado Mailing List
I think I'd try to structure the gen-friendly interface as an iterator of Futures (using tornado 3.0's Future support):

  for future in (yield collection.find().to_iter()):
    document = yield future
    print document

The catch is that there's kind of a delicate relationship between the iterator and the futures it yields - if the caller doesn't resolve the futures as they come out of the iterator (which must be synchronous), it can't stay ahead to know when to stop.  Using "each" as the primitive, it might look something like this (untested):

  @future_wrap
  def to_iter(self, callback):
    to_yield = collections.deque()
    results = collections.deque()

    def iter():
      while to_yield:
        yield to_yield.popleft()

    def each(document, error):
      first_time = bool(results)
      if document is not None  or error is not None:
        future = Future()
        to_yield.append(future)
        if error is not None:
          results.append(functools.partial(future.set_exception, error))
        else:
          results.append(functools.partial(future.set_result, document))
      if first_time:
        callback(iter)
      else:
        # Resolve the Futures with a one-iteration lag so we know whether the
        # iterator should return a new future or raise StopIteration.
        results.popleft()()
        
    self.each(callback=each)

One other caveat is that if the loop consuming the documents is asynchronous, the each() method might get ahead of it and you'd end up buffering more results in memory.  each() needs some sort of control-flow mechanism to prevent this.

-Ben

Shane Spencer

unread,
Nov 12, 2012, 2:25:39 PM11/12/12
to python-...@googlegroups.com
I'm not trying to threadjack and I just wanted to say that I was
peeking at the Motor functions while working on my own RPC based Mongo
connector. I thought it was kind of interesting that you were looking
for better ways to do things while I was basing my work off of yours!

Either way. It is quite challenging making clean looking DRY oriented
functions for dealing with asynchronous iterators.

One question I have related to this is will whatever solution you move
in to support chunked offloading and iteration?

MongoDB is often located through some off-site service providers that
advertise to cloud developers. Requesting one document at a time and
stepping the remote cursor 1 document at a time can become a very
lengthy stream.

I understand this only really effects offloading data since any query
that captures more information that is relayed to a web page, for
example, would be nowhere near optimal to begin with. If this were
always the opinion of a developer we would always be requesting a
flattened list.. so yeh.

Currently I'm tackling with the idea of providing an iterator that
returns a bounded result of chunked data.. this kind of goes away from
the idea that iterators return a single item at a time.. but it's
worth toying with to me.

Any idea of Motor will see 'chunky' streams in the future?

A. Jesse Jiryu Davis

unread,
Nov 12, 2012, 2:38:13 PM11/12/12
to python-...@googlegroups.com
Thanks Ben, I'm going to digest a little and get back to you; your suggestion looks really interesting.

Shane, I should've been clearer about this. MongoDB result sets are fetched in batches. The driver sends an initial "QUERY"[1] message to the MongoDB server and receives a cursorId and the first 101 documents.[2] After that, whenever the cursor runs out of documents it sends a "GETMORE" message to the server, and receives the next 4MB of documents back. The last batch is marked with a cursorId of zero to let the driver know it's done. Then the driver returns the documents from the final batch, one by one, until that runs out and it somehow tells the application that the result set is finished -- either returning None or raising StopIteration or something, I'm not sure yet.

In summary: I'm not proposing to fetch each document individually, the cursor is already grabbing documents in chunks. I'm trying to decide how best to represent the result set to the application. I'm pretty sure I want to hide the chunking from the user and make it look like a continuous stream of documents.

1. http://www.mongodb.org/display/DOCS/mongo%20wire%20protocol#MongoWireProtocol-OPQUERY

2. http://www.mongodb.org/display/DOCS/Queries+and+Cursors#QueriesandCursors-Executionofqueriesinbatches

Shane Spencer

unread,
Nov 12, 2012, 3:04:45 PM11/12/12
to python-...@googlegroups.com
On Mon, Nov 12, 2012 at 10:38 AM, A. Jesse Jiryu Davis
<je...@emptysquare.net> wrote:
> Thanks Ben, I'm going to digest a little and get back to you; your
> suggestion looks really interesting.
>
> Shane, I should've been clearer about this. MongoDB result sets are fetched
> in batches. The driver sends an initial "QUERY"[1] message to the MongoDB
> server and receives a cursorId and the first 101 documents.[2] After that,
> whenever the cursor runs out of documents it sends a "GETMORE" message to
> the server, and receives the next 4MB of documents back. The last batch is
> marked with a cursorId of zero to let the driver know it's done. Then the
> driver returns the documents from the final batch, one by one, until that
> runs out and it somehow tells the application that the result set is
> finished -- either returning None or raising StopIteration or something, I'm
> not sure yet.
>
> In summary: I'm not proposing to fetch each document individually, the
> cursor is already grabbing documents in chunks. I'm trying to decide how
> best to represent the result set to the application. I'm pretty sure I want
> to hide the chunking from the user and make it look like a continuous stream
> of documents.
>
> 1.
> http://www.mongodb.org/display/DOCS/mongo%20wire%20protocol#MongoWireProtocol-OPQUERY
>
> 2.
> http://www.mongodb.org/display/DOCS/Queries+and+Cursors#QueriesandCursors-Executionofqueriesinbatches
>
>
>

Thanks for the clarification Jesse. Now that you mention it again I
swear I've read this all before.

A. Jesse Jiryu Davis

unread,
Nov 13, 2012, 3:34:15 PM11/13/12
to python-...@googlegroups.com
So, this is cool. Let's compare the two APIs. I could simplify mine further so has_next() and next_object() simply return YieldPoints that can be yielded directly:

while (yield cursor.has_next):
    document = yield gen.Task(cursor.next_object)

And Ben's:


for future in (yield collection.find().to_iter()):
    document = yield future

Ben, I have a concern about yours. What if the user doesn't yield the future?:

futures = list((yield collection.find().to_iter()))

Then the cursor won't have a chance to pause iteration while it fetches the next batch.

A. Jesse Jiryu Davis

unread,
Nov 13, 2012, 4:05:46 PM11/13/12
to python-...@googlegroups.com
Come to think of it, if my has_next() takes care of any I/O, then next_object could just be an attribute:

while (yield cursor.has_next()):
    document = cursor.next_object

In this case I might rename it to fetch_next().

Shane Spencer

unread,
Nov 13, 2012, 4:16:14 PM11/13/12
to python-...@googlegroups.com
All broken records aside.. would it be worthwhile at all to have a
fetch_next_bulk(limit=n) while this is happening? It's a mixture of
stream processing and flattened list processing. I shall never speak
of it again if the answer is I'm loco.

Shane Spencer

unread,
Nov 13, 2012, 4:22:26 PM11/13/12
to python-...@googlegroups.com
As far as I'm concerned it's just a 'clean code' function to limit the
following problem.

I want 10 of the results for now during template processing to reduce
memory overhead. Use a for and check for has_next all the time or a
while with a counter?

I suppose cursor splitting will work just fine for the first bulk of
results.. but there is no way to easily continue on to the next bulk
set.

And now that I understand how PyMongos batch size works.. I'm simply
focusing on keeping result lists small during processing.

A. Jesse Jiryu Davis

unread,
Nov 13, 2012, 4:30:19 PM11/13/12
to python-...@googlegroups.com, sh...@bogomip.com
Not loco, Shane. You can control batch size before you start iterating:

http://emptysquare.net/motor/pymongo/api/motor/motor_cursor.html#motor.MotorCursor.batch_size

... if you don't set an explicit batch size, then the first batch will be about 101 documents, subsequent batches as large as Mongo will give you. Does that do what you want? To reiterate: Neither Ben nor I is proposing to do I/O on every iteration of the loop! We're just talking about the right API to give the caller the illusion of a continuous stream of documents, while occasionally fetching them in batches.

A. Jesse Jiryu Davis

unread,
Nov 13, 2012, 4:56:15 PM11/13/12
to python-...@googlegroups.com, sh...@bogomip.com
Sorry Ben, you already wrote: "if the caller doesn't resolve the futures as they come out of the iterator (which must be synchronous), it can't stay ahead to know when to stop." I just didn't understand you the first time I read it. I think that's a serious problem with the for-loop approach.

Based on all this feedback, I'm going to try implementing the following, and see where it takes me:

while (yield cursor.fetch_next):
    document = cursor.next_object

class FetchNext(gen.YieldPoint):
    def __init__(self, cursor):
        self.cursor = cursor
    def start(self):
        # If cursor's current batch is empty, start fetching next batch...
    def is_ready(self):
        # True unless we're in the midst of a fetch
    def get_result(self):
        # return True if cursor's current batch has more documents

class MotorCursor(object):
    def __init__(self, ...):
        self.next_object = None
    @property
    def fetch_next(self):
        return FetchNext(self)

I can't tell you all how much I've appreciated your help the last few months. Both Motor and Toro are vastly better for it.

Shane Spencer

unread,
Nov 13, 2012, 10:04:19 PM11/13/12
to python-...@googlegroups.com
I actually ended up clarifying myself a bit right after I questioned
my sanity to you. It's more of a question about find_next (returns 1
item) and find_next_batch(10) (returns 10 items, keeping code clean).
If you missed that message please read it.

A. Jesse Jiryu Davis

unread,
Nov 13, 2012, 10:21:23 PM11/13/12
to python-...@googlegroups.com
I think I understand the MotorCursor API you want, can you give an example of how that makes app code cleaner?

Ben Darnell

unread,
Nov 13, 2012, 11:57:10 PM11/13/12
to Tornado Mailing List
On Tue, Nov 13, 2012 at 4:05 PM, A. Jesse Jiryu Davis <je...@emptysquare.net> wrote:
Come to think of it, if my has_next() takes care of any I/O, then next_object could just be an attribute:

while (yield cursor.has_next()):
    document = cursor.next_object

I like this version.  The futures version is a little too magical, and the version with a yield for both has_next and next_object has a lot of unnecessary overhead for the second yield.

-Ben

A. Jesse Jiryu Davis

unread,
Nov 14, 2012, 12:25:19 AM11/14/12
to python-...@googlegroups.com
Thanks Ben. Yeah, I was worried about the extra yield-overhead, then realized I could get rid of it.

Shane, even without a code example to illustrate your idea, I'm coming around to the opinion that to_list should take a "length" argument that defaults to None (get all documents), I'll add it here soon:

http://emptysquare.net/motor/pymongo/api/motor/motor_cursor.html#motor.MotorCursor.to_list

Shane Spencer

unread,
Nov 14, 2012, 2:39:50 AM11/14/12
to python-...@googlegroups.com
I'm thinkin....

On Tue, Nov 13, 2012 at 8:25 PM, A. Jesse Jiryu Davis
<je...@emptysquare.net> wrote:
> Thanks Ben. Yeah, I was worried about the extra yield-overhead, then
> realized I could get rid of it.
>
> Shane, even without a code example to illustrate your idea, I'm coming
> around to the opinion that to_list should take a "length" argument that
> defaults to None (get all documents), I'll add it here soon:
>
> http://emptysquare.net/motor/pymongo/api/motor/motor_cursor.html#motor.MotorCursor.to_list
>

>>> @gen.engine
... def do_find():
... cursor = db.test_collection.find({'i': {'$lt': 5}}) #returns 45 items
... while cursor.alive:
... # continue from current cursor position
... for documents in (yield motor.Op(cursor.to_list,
{'length': 10})): #called 5 times
... # pre stuff
... # do something sync or async with the 10 documents ...
... # post stuff

I like the idea of quickly getting at a small set of the whole cursor
like this. It allows me to say.. run AsyncHTTPClient in pseudo
parallel using information from each document and wait for them all to
return.. or use the small set of documents in an 'in' based query to
help aggregate data.

A. Jesse Jiryu Davis

unread,
Nov 14, 2012, 7:30:27 AM11/14/12
to python-...@googlegroups.com
Yup, makes sense - I think it'll be as simple as:

for document in (yield motor.Op(cursor.to_list, 10)):

A. Jesse Jiryu Davis

unread,
Nov 14, 2012, 9:35:24 AM11/14/12
to python-...@googlegroups.com
One more question for the list. What if the MotorCursor itself were a YieldPoint, so the loop becomes:

while (yield cursor):
    document = cursor.next_object()

This is easy to implement and saves users some typing for a very common idiom . Is it too weird, though?

A. Jesse Jiryu Davis

unread,
Nov 17, 2012, 3:54:53 PM11/17/12
to python-...@googlegroups.com
I decided "yield cursor" would be too weird and I'm sticking to "yield cursor.fetch_next". It's all implemented and pushed now:

http://emptysquare.net/blog/motor-iterating-over-results-the-grand-conclusion/

Thank you!

Roey Berman

unread,
Nov 18, 2012, 1:13:20 PM11/18/12
to python-...@googlegroups.com
Hey Jesse,
I just updated to motor HEAD.

Correct me if I'm wrong, but since the change, there's no elegant API to retrieve a single result from a cursor using callbacks instead of gen.engine.

Previously I had: 

class UsersTest(tornado.testing.AsyncTestCase):
    def on_dbres(self, result, error):
        self.result = result
        self.error  = error
        self.stop()

    def test_something(self):
        ....
        cursor.next_object(callback = self.on_dbres)
        self.wait()

now I have to do something like this:

class UsersTest(tornado.testing.AsyncTestCase):
    def on_dbres(self, result, error):
        self.result = result
        self.error  = error
        self.stop()

    def on_document(self, result, error):
        self.result = result
        self.error  = error
        self.stop()
        return False

    def test_something(self):
        ....
        # option 1
        cursor.to_list(1, callback = self.on_dbres)
        self.wait()
        self.result = self.result[0]

        # option 2
        cursor.each(callback = self.on_document)
        self.wait()
        self.result = self.result

Was this your intention when you changed the API?
Maybe you can add a fetch_one method?

Thanks, Roey.

A. Jesse Jiryu Davis

unread,
Nov 18, 2012, 2:39:34 PM11/18/12
to python-...@googlegroups.com
Hey Roey. Will find_one work for you?

Roey Berman

unread,
Nov 18, 2012, 5:45:24 PM11/18/12
to python-...@googlegroups.com
Later in that test case I use the same cursor and verify that there's no other documents.
I could rewrite the test case to use find_one() and count().

I guess you're right, there's probably no real need for fetch_one().

Thanks again.

A. Jesse Jiryu Davis

unread,
Nov 18, 2012, 6:00:59 PM11/18/12
to python-...@googlegroups.com
Ah, I see. I could do a fetch_one(), since that's just a wrapper around to_list(1). But I'm concerned about making the MotorCursor API too large, with too many methods and too much interaction among them.
Reply all
Reply to author
Forward
0 new messages