Long stream ends prematurely - after one batch


Brian Marick

Jan 13, 2014, 10:25:39 AM
to clojure...@googlegroups.com
I'm doing a quick-and-dirty one-time conversion from one collection to create another. The first collection has 604729 entries, but only the first `batch-size` of them get written into the second. (Using the default, the second collection gets 256 upserts; if I change the batch size to 512, it gets 512 upserts.)

I'm wondering if this might indicate a bug in monger. Here's the code:

higher-level namespace:

(infof "Reinitializing `%s`." store/collection)
(profile/forget-everything!)
(infof "... from `%s`." old-profile/collection)
(doseq [x (old-profile/fetch)]
(store/stash x))

Namespace `old-profile` fetches in a way that I believe would produce a lazy sequence delivered in chunks of 512 (in this case):

(->> (with-collection collection
(find {})
(sort {:timestamp 1})
(batch-size 512))
(map #(assoc % :user_id (:user %)))
(map #(dissoc % :timestamp :_id :user :signal_type :signal_created))))

Namespace `profile` upserts one at a time:

(defn stash [kvs]
  (let [result (mc/upsert collection {:user_id (:user_id kvs)} {$set kvs})]
    ;; … check result for failure (which doesn't happen)
    ))


---

I shall be telling this with a sigh
Somewhere ages and ages hence:
One road passed through a hero page, and I –
I closed my browser page, confused.
And that has made all the difference.

– Robert Frost, "The Link Not Clicked" (2013)



Brian Marick

Jan 13, 2014, 10:33:05 AM
to clojure...@googlegroups.com
I should note that the `old-profile` collection does have an index on :timestamp, so I don't *think* it's the same problem as "QueryDSL: iterate over batches." below.

Also, I'm seeing this with both Monger 1.6.0 and 1.7.0.

Brian Marick

Jan 13, 2014, 1:08:42 PM
to clojure...@googlegroups.com
I guess that, with that much data, I have to paginate.

On Jan 13, 2014, at 9:33 AM, Brian Marick <br...@getset.com> wrote:
> On Monday, January 13, 2014 9:25:39 AM UTC-6, Brian Marick wrote:
> I'm doing a quick-and-dirty one-time conversion from one collection to create another. The first collection has 604729 entries, but only the first `batch-size` get written into the second. (Using the default, the second collection gets 256 upserts; if I change the batch size to 512, it gets 512 upserts.)


Sean Corfield

Jan 15, 2014, 11:56:39 AM
to clojure...@googlegroups.com
Looking at `exec` (which backs the `with-collection` macro), and also at `find-maps` and a few others, it looks like Monger wraps the cursor in `with-open` and then runs a lazy `map` over it to produce the result. That seems likely to close the cursor before the sequence is fully realized?
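For anyone following along, here's a minimal sketch of that pitfall using a plain reader instead of a Mongo cursor (illustrative names, not Monger's actual source):

(require '[clojure.java.io :as io]
         '[clojure.string :as str])

;; with-open closes the reader as soon as its body returns, but map
;; and line-seq are lazy, so the unrealized seq escapes the scope
;; with the reader already closed.
(defn broken-lines [s]
  (with-open [r (io/reader (java.io.StringReader. s))]
    (map str/upper-case (line-seq r))))

(first (broken-lines "a\nb\nc"))
;; => java.io.IOException: Stream closed

;; Forcing realization inside the with-open scope fixes it:
(defn working-lines [s]
  (with-open [r (io/reader (java.io.StringReader. s))]
    (doall (map str/upper-case (line-seq r)))))

A DB cursor that has already prefetched its first batch would presumably hand back that batch instead of throwing, which would explain why exactly `batch-size` documents come through.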

You probably need to use `collection/find` and manage the sorting etc yourself so you can process the entire collection (untested):

(with-open [cursor (doto (collection/find collection {})
                     (.batchSize 512)
                     (.sort (convert/to-db-object {:timestamp 1})))]
  ;; doseq realizes each document while the cursor is still open,
  ;; so nothing lazy escapes the with-open scope
  (doseq [item cursor]
    (process-it item)))

Sean

Brian Marick

Jan 15, 2014, 12:55:56 PM
to clojure...@googlegroups.com

On Jan 15, 2014, at 10:56 AM, Sean Corfield <se...@corfield.org> wrote:

> You probably need to use `collection/find` and manage the sorting etc yourself so you can process the entire collection (untested):
>
> (with-open [cursor (doto (collection/find collection {})
> (.batchSize 512)
> (.sort (convert/to-db-object {:timestamp 1})))]

I was able to make it work with this:

(->> (with-collection collection
(find {})
(sort {:timestamp 1})
(paginate :page page :per-page our-batch-size)
(batch-size our-batch-size))
(map filters/old-profile-entry)))

… where this code is called with successive `page` values until it returns an empty sequence. I was inspired to do this by reading in some MongoDB documentation that sorting won't work, even with an index, if <some resource> exceeds 32M.
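The calling loop looks something like this (a sketch; `fetch-page` is a hypothetical wrapper around the snippet above):

;; fetch-page is hypothetical: the paginated fetch above,
;; parameterized by page number (assumes pages start at 1)
(loop [page 1]
  (let [entries (fetch-page page)]
    (when (seq entries)
      (doseq [x entries]
        (store/stash x))
      (recur (inc page)))))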

Sean Corfield

Jan 15, 2014, 6:24:19 PM
to clojure...@googlegroups.com
True. In your case, trying to process 600K+ documents, you wouldn't be able to sort the full result set.

I am concerned about `with-collection` (and others) closing the cursor prematurely when using `map`. Perhaps MK can chime in here?

Sean

Michael Klishin

Jan 16, 2014, 3:47:06 AM
to Monger, a Clojure MongoDB driver
2014/1/16 Sean Corfield <se...@corfield.org>

> I am concerned about `with-collection` (and others) closing the cursor prematurely when using `map`. Perhaps MK can chime in here?

Not closing the cursor at all would be a problem for even more people.

There is https://github.com/michaelklishin/monger/pull/64 but the complexity of that solution is enormous.
--
MK

http://github.com/michaelklishin
http://twitter.com/michaelklishin

Sean Corfield

Mar 1, 2014, 7:35:34 PM
to clojure...@googlegroups.com
On Jan 16, 2014, at 12:47 AM, Michael Klishin <michael....@gmail.com> wrote:
> Not closing the cursor at all would be a problem for even more people.
>
> There is https://github.com/michaelklishin/monger/pull/64 but the complexity of that solution is enormous.

Indeed.

Would a simpler solution be to provide a hook: a function added to the with-collection query structure that is applied to the result of the map call? For backward compatibility it would default to no call, but passing `doall` would, I believe, ensure the result set is fully realized before the cursor is closed. That's similar to what clojure.java.jdbc does (it has :row-fn and :result-set-fn for flexible processing of ResultSet objects).
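From the caller's side, such a hook might look something like this (a sketch only; `result-fn` is a hypothetical name, nothing like it exists in Monger today):

(with-collection collection
  (find {})
  (sort {:timestamp 1})
  (batch-size 512)
  ;; hypothetical hook: applied to the mapped result while the
  ;; cursor is still open, so doall realizes everything in scope
  (result-fn doall))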

If that seems reasonable, I'd be happy to submit a PR...

Sean Corfield -- (904) 302-SEAN
An Architect's View -- http://corfield.org/

"Perfection is the enemy of the good."
-- Gustave Flaubert, French realist novelist (1821-1880)




Michael Klishin

Mar 1, 2014, 9:57:21 PM
to Monger, a Clojure MongoDB driver

2014-03-02 4:35 GMT+04:00 Sean Corfield <se...@corfield.org>:
> Would a simpler solution be to provide a hook: a function added to the with-collection query structure that is applied to the result of the map call? For backward compatibility it would default to no call, but passing `doall` would, I believe, ensure the result set is fully realized before the cursor is closed. That's similar to what clojure.java.jdbc does (it has :row-fn and :result-set-fn for flexible processing of ResultSet objects).
>
> If that seems reasonable, I'd be happy to submit a PR...

Good idea, please do!

Sean Corfield

Mar 2, 2014, 1:41:33 AM
to clojure...@googlegroups.com
Cool. Will try to get to that tomorrow or Monday!

Sean Corfield

Mar 4, 2014, 6:50:02 PM
to clojure...@googlegroups.com
I spent some time trying to create a test case to reproduce this (on a small enough collection that it was reasonable to run as part of the test suite) but I have not been able to...

At one point I created a 1M-document collection and copied it to a new collection using almost exactly the same logic as your code, and it just worked, no matter what I set the batch size to, and even when I made sure the documents themselves were a reasonable size (so I'd hit the 32MB limit).

I'm a bit puzzled by that and not sure what to try next.

Sean