Re: MapReduce sharded output


Christian Csar

Aug 13, 2012, 5:07:25 PM
to mongod...@googlegroups.com
Anyone have advice on at least one of these questions?

William Z

Aug 13, 2012, 7:43:50 PM
to mongod...@googlegroups.com
Hi Christian!

To answer your questions:

1) In MongoDB version 2.0, a MapReduce() operation with sharded input works as follows:

Each node in the cluster will perform a map/reduce operation on the data that it owns.  When that work is complete and the nodes have their individual results, each node will send the result of its work to the coordinating 'mongos' process, which will do any remaining reduce and finalize processes, thereby producing the final result.

This is independent of how the results are output: this will be the same if the results are returned inline, saved to a sharded collection, or saved to an unsharded collection.
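The flow in (1) can be sketched in plain JavaScript. This is only an illustration under stated assumptions: `runOnShard`, `mongosFinalReduce`, and the `shards` data are hypothetical names, not MongoDB internals, and `map` takes `emit` as an argument here instead of using the shell's global `emit` and `this`.

```javascript
// Sketch only: simulates "each shard reduces its own data, then the
// coordinator re-reduces the partial results". Not MongoDB code.
function map(doc, emit) { emit(doc.user_id, 1); }
function reduce(key, values) { return values.reduce((a, b) => a + b, 0); }

// Each shard maps and reduces only the documents it owns.
function runOnShard(docs) {
  const groups = new Map();
  for (const doc of docs) {
    map(doc, (k, v) => {
      if (!groups.has(k)) groups.set(k, []);
      groups.get(k).push(v);
    });
  }
  const partial = new Map();
  for (const [k, vs] of groups) partial.set(k, reduce(k, vs));
  return partial;
}

// The coordinator re-reduces partial results that share a key.
function mongosFinalReduce(partials) {
  const groups = new Map();
  for (const partial of partials) {
    for (const [k, v] of partial) {
      if (!groups.has(k)) groups.set(k, []);
      groups.get(k).push(v);
    }
  }
  const out = [];
  for (const [k, vs] of groups) out.push({ _id: k, value: reduce(k, vs) });
  return out.sort((a, b) => (a._id < b._id ? -1 : a._id > b._id ? 1 : 0));
}

// Made-up input documents, split across two shards.
const shards = {
  shardA: [{ user_id: "u1" }, { user_id: "u2" }, { user_id: "u1" }],
  shardB: [{ user_id: "u1" }, { user_id: "u3" }],
};
const result = mongosFinalReduce(Object.values(shards).map(runOnShard));
console.log(JSON.stringify(result));
```

Because the same reduce() runs once per shard and again on the partial results, its output has to be usable as its own input, which is why the emitted value and the reduced value share one shape.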

2) In MongoDB version 2.0, a MapReduce() operation with output to a non-sharded collection works as follows:

Every database in a sharded cluster has a primary shard, which holds that database's unsharded collections. The 'mongos' process will identify the primary shard for the input collection of the map/reduce job.  It will then create an output collection on that shard, and will send the entire results of the map/reduce job to that node.  The resulting collection will not be sharded.


3) In MongoDB version 2.0, a MapReduce() operation with output to a sharded collection works as follows:

 - The 'mongos' process coordinating the map/reduce will create a sharded collection on the primary node, if it does not already exist.  This is almost the same as for the non-sharded case: the only difference is that the collection is marked as sharded.

 -  The shard key of the output collection will be the key that was generated in the emit() phase.  For example, if the emit phase looks like this:
    > m = function() { emit(this.user_id, 1); }
then the shard key will be the user_id field from the input collection.  Note that this is NOT necessarily the shard key that was used for sharding the input collection.

 - The 'mongos' process will then insert all of the results from the map/reduce command into that collection, performing any remaining reduce() or finalize() operations as necessary

 - Note that even though the output collection is sharded, due to a limitation in MongoDB version 2.0, the insertions into this collection will not be automatically split, nor will they be automatically balanced.  This means that all output will initially be directed to the primary shard.

See https://jira.mongodb.org/browse/SERVER-3627 for further details of this limitation, which has been fixed for the upcoming release of MongoDB 2.2

 - Once the MapReduce() command has completed, then the normal MongoDB splitting and balancing algorithms will take over.

 - As a workaround, you can pre-create the output collection and pre-split the chunks in that collection.  If the collection is pre-split, then the output will be directed to the appropriate shard.  See here for more information about pre-splitting:

     - http://www.mongodb.org/display/DOCS/Splitting+Shard+Chunks
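As a rough sketch of the pre-split workaround, the snippet below only builds the admin command documents (`enableSharding`, `shardCollection`, `split`) so that it runs without a cluster. The helper name `buildPreSplitCommands`, the database and collection names, and the split points are all made up for illustration; real split points depend on the keys your map function emits.

```javascript
// Sketch only: builds admin command documents for pre-splitting an
// output collection that is sharded on _id. All names are hypothetical.
function buildPreSplitCommands(dbName, collName, splitPoints) {
  const ns = dbName + "." + collName;
  const commands = [
    { enableSharding: dbName },               // allow sharding in this database
    { shardCollection: ns, key: { _id: 1 } }, // shard the output on _id
  ];
  for (const point of splitPoints) {
    // One chunk boundary per split point.
    commands.push({ split: ns, middle: { _id: point } });
  }
  return commands;
}

const commands = buildPreSplitCommands("test", "counts", ["user3000", "user6000"]);
commands.forEach((c) => console.log(JSON.stringify(c)));
// In the mongo shell, connected to a mongos, each document would be run with:
//   db.adminCommand(c)
```

Splitting alone leaves every chunk on the shard where the collection was created; the chunks still have to be spread out, either by the balancer or with the moveChunk command, before the map/reduce output is directed to different shards.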

4) This process has been greatly improved for MongoDB 2.2, both for sharded input collections and sharded output collections. 

5) In MongoDB version 2.2, a MapReduce() operation on a sharded collection with sharded output works as follows:

 - Each node in the cluster will perform a map/reduce operation on the data that it owns
 - The emit() operation will generate an _id for the temporary output collection, which is the key value from the emit.  (This is unchanged from version 2.0.)
 - The final output collection will also be sharded on the same _id value, which is NOT necessarily the _id used by the input collection.  This shard key is used to distribute the output among the shards in the cluster
 - Each node in the cluster will distribute the results of its individual map/reduce operations to the shard that owns the final data.
 - Each shard that owns the final data will accept it, perform any remaining reduce() operations, and then perform the finalize()
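The 2.2 steps in (5) can be pictured with a small simulation. It is a sketch in plain JavaScript, not the server's implementation: the `chunks` table, `ownerOf`, and the `partials` data are assumptions made up for illustration.

```javascript
// Sketch only: routes partial results to the shard owning each _id,
// then runs the remaining reduce and the finalize on that shard.
function reduce(key, values) { return values.reduce((a, b) => a + b, 0); }
function finalize(key, value) { return { total: value }; }

// Hypothetical chunk map for the output collection, keyed by _id range.
const chunks = [
  { shard: "shardA", min: "", max: "m" },       // _id in ["", "m")
  { shard: "shardB", min: "m", max: "\uffff" }, // _id in ["m", "\uffff")
];
function ownerOf(id) {
  return chunks.find((c) => id >= c.min && id < c.max).shard;
}

// Partial results after each shard has mapped and reduced its own input.
const partials = {
  shardA: { alice: 2, zed: 1 },
  shardB: { alice: 1, bob: 4, zed: 5 },
};

// Step 1: every shard sends each partial result to the shard owning that _id.
const inbox = { shardA: {}, shardB: {} };
for (const results of Object.values(partials)) {
  for (const [id, value] of Object.entries(results)) {
    const box = inbox[ownerOf(id)];
    (box[id] = box[id] || []).push(value);
  }
}

// Step 2: each owning shard does the remaining reduce, then the finalize.
const output = {};
for (const [shard, groups] of Object.entries(inbox)) {
  output[shard] = Object.entries(groups).map(([id, values]) => ({
    _id: id,
    value: finalize(id, reduce(id, values)),
  }));
}
console.log(JSON.stringify(output, null, 2));
```

The routing step only works if the owner of a result can be computed from its _id alone, which is the property the later posts in this thread rely on.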

6) In MongoDB 2.2, if the output collection is not sharded, then it is created on the primary shard of the input collection, and that shard does all of the work of the remaining reduce() and finalize().

Let me know if you have further questions.

 -William

Christian Csar

Aug 13, 2012, 8:23:01 PM
to mongod...@googlegroups.com
William,
   Thanks for replying. There are still a few aspects I want to be sure I understand correctly.

1. If the sharded destination collection already exists will the shard key of that collection be respected or will it get changed to _id? For instance if the documents emitted by Reduce have a field "generatedShardKey" (name chosen for illustrative purposes) and the collection is currently set to be sharded on "generatedShardKey" will it still be sharded by "generatedShardKey" afterwards? I assume it works this way for 'merge', but I would be surprised if it did for 'replace'. 'reduce' I could see going either way depending on how it is implemented.

2. The example of "m = function() { emit(this.user_id, 1); }" makes sense: this.user_id is used as _id in the new collection. But if the _id is more complex, as in the example of counting unique users by day with map reduce, how does the sharding work? In that example _id is emitted as an object with the fields day and user_id, in that order. Would documents be separated among the shards first by the value of the first field, and then by the remaining fields in order? (Since the field names would be the same in every document, they would have no effect other than to slow down the comparisons.) Or would the fields in the object end up getting sorted alphabetically? If order is respected, then it would make sense to put the field desired as a generated shard key first in the object used for _id.

Question 2 is less likely to be relevant if the initial sharding of the output collection is respected for the merge and reduce cases in question 1.

Christian

Christian Csar

Aug 21, 2012, 2:49:09 PM
to mongod...@googlegroups.com
An additional question: the note on permanent collections says that a temporary collection gets renamed to the permanent one atomically. Does this mean that only one map reduce can be going on at once per output collection? How does this end up working in practice? If a map reduce is ongoing and another is issued, will the output of the one that finishes first get lost after the second one completes? Since the two jobs might be running with different queries, even if they have the same map and reduce, they could touch entirely different output objects.

I am trying to come up with a way to use incremental map reduce, but the concurrency model is unclear.

Ways I could see map reduce with permanent collections working (given the documentation) include:
a. Only one can run per output collection at a time, and changes to the output collection are delayed until the job completes (operations on the output collection are serialized). This does not lose any data. (Additional map reduces might be serialized, or simply error out.)
b. Multiple map reduce jobs can run at the same time, and the last one wins, completely replacing the previous state of the output collection (i.e. the state of the output collection is simply that at time of input plus the results of the map reduce). This loses all work that wrote to the collection in the meantime.
c. Multiple map reduce jobs can run at once, and rather than a simple atomic collection rename there is an atomic collection merge (i.e. I've read the docs wrong).
d. The note is completely wrong, and multiple map reduce jobs on the same output collection will end up with the output as some sort of mixture between two concurrent jobs with document-level atomicity (i.e. you might end up double counting some values).

Given the available apparent options I'm not sure if MongoDB's incremental map reduce is usable outside of operations that affect the entire output collection using external concurrency control, which would be sad as the concept is exactly what I want and it looks like some of the Hadoop based options would be able to do it. d could even work if I came up with a way to handle versioning.

Admittedly I might be able to use a combination of batched map reduces to update a permanent collection and in between the batched runs use incremental ones outputting to temporary collections, but even if there was a way to make it work, it would be less than ideal.

Any additional guidance, including pointers to additional documentation, would be appreciated, as well as any answers to the two questions from the previous post.

Christian Csar

Aug 21, 2012, 4:10:31 PM
to mongod...@googlegroups.com
So in essence I'm looking for clarification on
" - The 'mongos' process will then insert all of the results from the map/reduce command into that collection, performing any remaining reduce() or finalize() operations as necessary"
from William's previous post as the note on permanent collections suggests a different behavior.

William Z

Aug 23, 2012, 1:13:06 PM
to mongod...@googlegroups.com

Hi Christian!

To address your questions:

A) Please be aware that the output collection will always contain two, and only two, keys.  These will be '_id' and 'value'.  The contents of these keys will depend on what's in your 'map()' and 'reduce()' functions, and can be arbitrarily complex sub-documents.  Nonetheless, there will always be those two keys, and only those two keys, in any collection output from mapReduce() in MongoDB.

B) MongoDB can sort by, index, and shard on sub-documents, not just simple key values.  When it does so, it uses a multi-key sort: the first key in the document is the major sort key, the second key is the second sort key, and so on.  See below for an example.

C) The contents of the '_id' field can be an arbitrarily complex document.  MongoDB can index and shard a key that contains a sub-document in the same way it does for a key that contains a single value.

Here's an example:

SOURCE.findOne()

{
    "_id" : ObjectId("503523c029973f7b7db9adc9"),
    "url" : "http://site6212.com/page3790.html",
    "user_id" : "user6710",
    "date" : ISODate("2012-08-20T02:13:25.759Z")
}


map = function() {
  emit( {user: this.user_id, day: this.date.getDay()} , {count: 1});
}

reduce = function(key, values) {
  var count = 0;

  values.forEach(function(v) {
    count += v['count'];
  });

  return {count: count};
};

SOURCE.mapReduce( map, reduce, { out: 'counts' });

db.counts.find().sort({_id:1}).limit(10);

{ "_id" : { "user" : "user1156", "day" : 0 }, "value" : { "count" : 3 } }
{ "_id" : { "user" : "user1156", "day" : 1 }, "value" : { "count" : 5 } }
{ "_id" : { "user" : "user1156", "day" : 2 }, "value" : { "count" : 3 } }
{ "_id" : { "user" : "user1156", "day" : 3 }, "value" : { "count" : 1 } }
{ "_id" : { "user" : "user1365", "day" : 0 }, "value" : { "count" : 6 } }
{ "_id" : { "user" : "user1365", "day" : 1 }, "value" : { "count" : 4 } }
{ "_id" : { "user" : "user1365", "day" : 2 }, "value" : { "count" : 4 } }
{ "_id" : { "user" : "user4924", "day" : 0 }, "value" : { "count" : 2 } }
{ "_id" : { "user" : "user4924", "day" : 1 }, "value" : { "count" : 8 } }
{ "_id" : { "user" : "user4924", "day" : 2 }, "value" : { "count" : 2 } }

Note how the values of both '_id' and 'value' are sub-documents.  Also note how the '_id' field is sorted, with the 'user' key as the major sort and 'day' as the minor sort.

D) In order for MongoDB to shard a collection, there must be an index on the shard key.  If you wanted to shard the 'counts' collection on 'value.count', you would have to
  - pre-create the collection
  - define an index on 'value.count'
  - shard the collection on 'value.count'

While this will work in MongoDB version 2.0, this will *NOT* work in MongoDB version 2.2 and later.  In those versions, you *must* have the output collection sharded by the '_id' field -- the algorithm used for distributing the work among the shards will break if you do not. 

Accordingly, I recommend that you not do this for version 2.0 either -- there's no reason to force yourself to do double work.

E) Please be aware that MongoDB cannot insert a document into a sharded collection if that document does not contain the shard key.  So, for example, if you sharded the output collection using 'value.sum', you would not be able to output

F)  If the output collection already exists, and is pre-sharded on something other than "_id", then MongoDB will continue to use the existing shard key and chunk information for that collection. 

Note again that sharding the output collection on anything other than '_id' is not recommended.


Let me know if you have further questions.

  -William


William Z

Aug 23, 2012, 1:15:31 PM
to mongod...@googlegroups.com


Hi Christian!

To address your questions:

The behavior of multiple simultaneous mapReduce() commands in MongoDB depends on whether the output mode is set to 'replace', 'merge', 'reduce', or 'inline'.

A) If the output mode is set to 'replace', then the results of the mapReduce() command are saved in a temporary collection.  Multiple mapReduce() commands can run at the same time.  At the end of the mapReduce() command, the temporary collection is atomically renamed to the name of the target output collection.  If there are multiple mapReduce() commands that are running at the same time, the last one to complete wins, and overwrites the results of the other commands.

If the output collection is sharded, then the behavior is the same, with one, possibly critical exception.  This is that the final rename, while atomic on an individual shard, is not coordinated among the shards.  That means that if you have a cluster with two shards, it is possible for the mapReduce() to complete at different times on different shards, which means that different commands could 'win' the final race.

For example, if you have an output collection that is sharded among shards A and B, and you run two mapReduce() commands on that collection at the same time, both going to the same output collection, it's possible that shard A will store the results of the first mapReduce() command, while shard B will store the results of the second mapReduce() command.

For this reason, I recommend that you not run multiple simultaneous mapReduce() commands outputting to the same collection in 'replace' mode on a sharded cluster.
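The sharded 'replace' race can be illustrated with a toy model. This is a sketch only: `finishOnShard` and the interleaving in `renameOrder` are assumptions chosen to reproduce the shard A / shard B outcome described above, not something taken from the server.

```javascript
// Sketch only: each "shard" swaps in a job's temporary collection
// independently of the other shard.
function finishOnShard(shard, jobName, tempResults) {
  // Atomic on this shard: the whole output is replaced in one step.
  shard.output = { writtenBy: jobName, docs: tempResults };
}

const shardA = { output: null };
const shardB = { output: null };

// Two jobs target the same output collection. The order in which their
// renames land differs per shard (an assumed interleaving).
const renameOrder = [
  [shardA, "job2"], // job2 finishes first on A...
  [shardB, "job1"], // ...while job1 finishes first on B
  [shardA, "job1"], // job1 lands last on A, so A keeps job1
  [shardB, "job2"], // job2 lands last on B, so B keeps job2
];
for (const [shard, job] of renameOrder) {
  finishOnShard(shard, job, [job + "-results"]);
}
console.log(shardA.output.writtenBy, shardB.output.writtenBy);
```

Each shard is internally consistent, but the collection as a whole now mixes the output of two different jobs.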

B) If the output mode is set to 'merge', then the output of the current mapReduce() command "merge/overwrites" the output of any previous command.  Essentially, the results of the current command are 'upserted' into the existing collection.

This means that the results of the current mapReduce() command will overwrite the results of any previous mapReduce() command on a key-by-key basis.  If the latest command has a new result for an existing key, then the value for that key is overwritten with the results from the latest mapReduce() command; if the key does not exist in the collection then a new key/value pair is inserted.

There is only minimal concurrency control here.  Only one mapReduce() command will update a document at a time.  However, if two mapReduce() commands run at the same time, then there is no guarantee of which documents will be updated in which order.  The last command to update a particular key value wins. 

This behavior is the same for both sharded and un-sharded output collections.

This may or may not be what you want, depending on your use case.
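To make the 'merge' semantics concrete, here is a minimal sketch in which an in-memory Map stands in for the output collection. `mergeInto` is a hypothetical helper and the documents are made up.

```javascript
// Sketch only: 'merge' behaves like an upsert keyed on _id.
function mergeInto(collection, jobOutput) {
  for (const doc of jobOutput) {
    // Insert new keys, overwrite existing ones.
    collection.set(doc._id, doc.value);
  }
}

const output = new Map();
mergeInto(output, [{ _id: "a", value: 1 }, { _id: "b", value: 2 }]);   // first job
mergeInto(output, [{ _id: "b", value: 20 }, { _id: "c", value: 30 }]); // second job
console.log(JSON.stringify([...output]));
```

Key "a" keeps the first job's value, "b" is overwritten by the second job, and "c" is new. If the two jobs ran concurrently, which value "b" ends up with would depend on which write landed last.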

C) If the output mode is set to 'reduce', then the output of the current mapReduce() command is further reduced with any existing documents in the output collection.  If the current mapReduce() job generates a document with a key that matches an existing key in the collection, those two documents are merged using the 'reduce()' function of the current mapReduce() job, and the (possibly finalized) output of that operation replaces the existing value in the output collection.  As with the other cases, if the current mapReduce() job produces a document with a new key, that result is added to the existing collection.

Again, there is only document-level concurrency control -- if multiple mapReduce() commands are running on a single collection, the output of those reduce commands will be interleaved.

This behavior is the same for both sharded and un-sharded output collections.
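A matching sketch for 'reduce' mode, again with an in-memory Map standing in for the output collection. `reduceInto` is a hypothetical helper, and finalize() is left out to keep the example short.

```javascript
// Sketch only: 'reduce' mode folds new results into existing documents
// using the job's own reduce function.
function reduce(key, values) {
  return { count: values.reduce((sum, v) => sum + v.count, 0) };
}

function reduceInto(collection, jobOutput) {
  for (const doc of jobOutput) {
    const existing = collection.get(doc._id);
    collection.set(
      doc._id,
      existing === undefined ? doc.value : reduce(doc._id, [existing, doc.value])
    );
  }
}

const counts = new Map([["a", { count: 5 }]]); // state before the job
const jobOutput = [
  { _id: "a", value: { count: 2 } },
  { _id: "b", value: { count: 1 } },
];

reduceInto(counts, jobOutput);
const afterFirstRun = counts.get("a").count;  // 5 + 2

reduceInto(counts, jobOutput); // the same output applied a second time
const afterSecondRun = counts.get("a").count; // counted twice
console.log(afterFirstRun, afterSecondRun);
```

Unlike 'merge', applying the same results twice changes the answer, so a job in 'reduce' mode cannot simply be re-run after a partial failure.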



Let me know if you have further questions.

 -William


cac...@gmail.com

Aug 30, 2012, 8:28:32 PM
to mongod...@googlegroups.com
Thanks William, you've been extremely helpful. Is there some place I should link this discussion (or a place where it's been documented that I failed miserably to find)? The timing of the 2.2 release is certainly convenient for me.

While I was hoping that by the time I actually sent this email I would be able to report that everything I tried was fully working, one more question has arisen. For incremental map reduce, what happens if a server fails during the final reduce step? Will the map reduce have completed on some documents in the collection but not others? Will the work continue on another server in the shard so that the reduce will complete for all affected documents? If the entire shard fails, is the appropriate thing just to restore that collection from a previous backup and do an incremental reduce from there? If there are issues, is there a way around them, or a set of best practices to mitigate them?

I tried thinking of systems using versioning tags to mitigate the issues but did not come up with one.

Christian

William Z

Sep 5, 2012, 7:52:23 PM
to mongod...@googlegroups.com

Hi Christian!

Thanks for your great questions.  Addressing them:


1) The map/reduce job can only run on the primary node of a replica set, since that is the only node that allows writes.  In the current implementation, if a primary node is running a map/reduce job and it steps down for any reason, then the entire map/reduce job is aborted at that point, and the command returns an error condition to the caller.

2) In a map/reduce job with both sharded input and sharded output in MongoDB version 2.0, the final reduce is run by the 'mongos' process.  If any input or output shard fails, then the job is aborted at that point.  If only a portion of the documents have been processed when the output mode is 'merge' or 'reduce', then the documents which have been processed will remain, while the remaining documents will be unprocessed.  No attempt is made to recover or retry.

3) In a map/reduce job with both sharded input and sharded output in MongoDB version 2.2, the final reduce is split among all of the shards in the cluster.  If a single shard fails (that is, if the primary fails over) in the middle of the final reduce phase where the output mode is 'merge' or 'reduce' it will have the following impact:

  - The documents which have already been processed by that shard will remain; documents which have not been processed by that shard will not be processed
  - No other shard will take over for the work originally scheduled by the failing shard
  - When the secondary takes over, it will not take over for the work originally scheduled by the failing node
  - All other shards will run their jobs to completion as much as they are able
  - Recall that each shard will query every other shard for the documents that are targeted to eventually live on the destination shard.  If other shards have not yet fetched some documents from the failing node, the other shards will not re-connect to the new primary node for the shard, and those documents will not be processed by the map/reduce job. 

4) In short, if the primary node of a shard fails-over during a map/reduce job which outputs to a collection, the output collection is left in an unknown state.  In the worst case, some subset of documents has been processed and some subset hasn't, and there's no good way to tell.

If you're doing a map/reduce job with 'replace' output mode, then the data on the individual shards will either complete fully, or else fail.  However, if one shard fails and the others succeed, the overall output collection will be in an inconsistent state.  Fortunately, the overall command will not return 'success' unless all operations fully succeed on all shards. 

6) This algorithm for map/reduce processing is new in MongoDB version 2.2.  Unfortunately, this version is so new that there are no best practices yet established.  I wish I had a better answer for you.

If you're doing a map/reduce job with 'merge', 'replace', or 'inline' output mode, you can simply re-run the job, and you'll get similar output to what you would have gotten originally.

7) This isn't documented anywhere that I could find: I determined this by experiment and by examining the source code.  I'll be filing a documentation ticket in Jira covering what I've discovered.


Let me know if you have further questions.

 -William


cac...@gmail.com

Sep 5, 2012, 8:24:28 PM
to mongod...@googlegroups.com
So if the map reduce is running in 'reduce' mode, the output collection, which is also a data source, is hosed if any one of the primary servers fails? That is, if any one of up to 1000 servers fails? This would appear to make using incremental map reduce a very chancy proposition in a sharded environment.

Thinking about mitigations for this:
Is there anything resembling cheap copies of collections that could be used to ensure a more consistent state (ie copy on write for portions of the index)? Thus one could alternate between two copies of the output collection ensuring that one is clean, or would all of the data need to be copied each time?

Christian


cac...@gmail.com

Sep 5, 2012, 8:32:42 PM
to mongod...@googlegroups.com
Am I correct in thinking that even coming up with a system using version tags for the output collection will not be of assistance since some documents may have made it from the failed server?

cac...@gmail.com

Sep 5, 2012, 9:18:22 PM
to mongod...@googlegroups.com
(Sorry for the repeated messages, I wasn't fully thinking through the available options before sending).

How does this sound as a sort of tagging system to enable recovery of the old data without having to restore the entire collection from back up.

There are two collections, 'INPUT' and 'OUTPUT'. As the names suggest, 'INPUT' is the collection used as input for the map reduce, and 'OUTPUT' is the collection that contains the eventual aggregation from 'INPUT'. The map function emits a document like the following:
{_id : {idsubfield1 : "stuff1", idsubfield2 : "stuff2"},  value: { reduceTimeStamp: <timestamp>, A : <integer>, B: <integer>, C : <integer> }}
reduceTimeStamp contains a timestamp which is hard coded into the map function (ie the map function is generated by the external application) and serves as an identifier for the reduce.

The reduce function will also have this timestamp coded into it. When it processes a value with a reduceTimeStamp different from that of the current run, that value shall be placed into value.old; if one of the values has the current timestamp and already carries a value.old, that value.old shall be kept as the new value.old.

If the map reduce succeeds value.old can be ignored. If the application receives a report of failure, it will kick off some operation to replace value with value.old for all those documents where value.reduceTimeStamp is the same as the failed map reduce. Since value.old is not emitted by map (or could perhaps be emitted as null), there should only ever be one document in a reduce that has the value set which is the document that was in 'OUTPUT' at the start of the map reduce operation (or in one descended from it via reduce operations).

I am a bit unsure as to how this failure operation should be implemented. It looks like it might be an update using multi and $rename, but I'd need to experiment to confirm if $rename can work on subdocuments.
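The tagging scheme described above might look roughly like the following sketch. It runs as plain JavaScript against in-memory data rather than a real collection. `makeReduce` and `rollback` are hypothetical helper names, summing A, B and C is an assumed aggregation, and dropping documents that have no value.old during rollback is an assumption that the description above does not cover.

```javascript
// Sketch only: a reduce function generated per run, plus an in-memory rollback.
function makeReduce(runStamp) {
  return function reduce(key, values) {
    const out = { reduceTimeStamp: runStamp, A: 0, B: 0, C: 0 };
    for (const v of values) {
      out.A += v.A;
      out.B += v.B;
      out.C += v.C;
      if (v.reduceTimeStamp !== runStamp) {
        // Value from an earlier run: snapshot it so this run can be undone.
        out.old = { reduceTimeStamp: v.reduceTimeStamp, A: v.A, B: v.B, C: v.C };
      } else if (v.old !== undefined) {
        // Already-reduced value from this run: carry its snapshot forward.
        out.old = v.old;
      }
    }
    return out;
  };
}

// Undo a failed run: restore value.old wherever the failed stamp appears.
function rollback(docs, failedStamp) {
  const restored = [];
  for (const doc of docs) {
    if (doc.value.reduceTimeStamp !== failedStamp) {
      restored.push(doc); // untouched by the failed run
    } else if (doc.value.old !== undefined) {
      restored.push({ _id: doc._id, value: doc.value.old });
    }
    // else: the document was created by the failed run and is dropped (assumption)
  }
  return restored;
}

const reduce = makeReduce(200);
const existing = { reduceTimeStamp: 100, A: 1, B: 2, C: 3 }; // already in OUTPUT
const emitted = { reduceTimeStamp: 200, A: 10, B: 0, C: 0 }; // from this run's map
const merged = reduce("k", [existing, emitted]);
const mergedAgain = reduce("k", [merged, emitted]); // a re-reduce in the same run
const restored = rollback(
  [{ _id: "k", value: merged }, { _id: "new", value: emitted }],
  200
);
console.log(JSON.stringify(restored));
```

Against a real collection the rollback would be an update over the documents matching the failed timestamp; the sketch only shows the intended before and after state.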

Does this sound like a solution that should work in theory? (Obviously it'd be great to have some sort of safe incremental map reduce in Mongo itself).

Christian

William Zola

Sep 7, 2012, 5:30:36 PM
to mongod...@googlegroups.com

Hi Christian!

To address your questions:

> This would appear to make using incremental map reduce a very chancy proposition in a sharded environment.

This is correct.


> Is there anything resembling cheap copies of collections that could be used to ensure a more consistent state (ie copy on write for portions of the index)?

Nothing like this exists for MongoDB.  You'd have to copy all the data in the entire collection.

It's not clear to me exactly what business problem you're trying to solve, so it's hard to recommend solutions.  As in most issues with MongoDB, there usually isn't one overall general correct answer: the correct answer depends on your data model and what you're trying to do.

That said, I've discussed your problem with some folks in the office, and we've come up with two possible solutions.

1) Use Hadoop to do the map/reduce, and store the data back in MongoDB.  See here for a presentation on the MongoDB/Hadoop connector:

 - http://www.10gen.com/presentations/webinar/mongodb-hadoop-taming-elephant-room

2) Maintain multiple values in the collection to be re-reduced.  All calculations are based on values in set A, and the output is put into values in set B.  Once you've got a successful run of the map/reduce job, values from set B are put into set A (in a separate process). 

If the map/reduce job fails, you re-run it.  Since calculations are based on the values in set A, you can re-run as many times as you like and will still get the same results (assuming the same input data).  If the second step (putting values from set B into set A) fails, you can re-run that as many times as you like and you will still get the same results.
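A toy version of option 2, to show why both steps can be repeated safely. This is a sketch under assumptions: the field names `setA` and `setB`, the helpers `runJob` and `promote`, and the simple counter are all made up, and the real job would be a map/reduce rather than a local function.

```javascript
// Sketch only: calculations read set A and write set B; a separate step
// copies set B over set A once the job has fully succeeded.
function runJob(doc, increments) {
  // Reads only setA, writes only setB, so repeating it gives the same setB.
  const added = increments.reduce((sum, n) => sum + n, 0);
  doc.setB = { count: doc.setA.count + added };
}

function promote(doc) {
  // Copies setB over setA; repeating it gives the same setA.
  if (doc.setB !== undefined) doc.setA = { count: doc.setB.count };
}

const doc = { _id: "k", setA: { count: 5 } };
const increments = [2, 3]; // the new input for this batch

runJob(doc, increments);
runJob(doc, increments); // re-run after a failure: setB is unchanged
const afterJob = doc.setB.count;

promote(doc);
promote(doc); // re-run after a failure: setA is unchanged
console.log(afterJob, doc.setA.count);
```

What the application still has to track is which step it is in for a given batch, so that an already promoted batch is not fed through the job again.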

As a side note: are you familiar with the 'finalize' phase of map/reduce?  You might be able to get similar results by using the finalize phase to copy over results or store old results in the document.


Let me know if you have further questions.

 -William



Christian Csar

Nov 16, 2012, 9:37:19 PM
to mongod...@googlegroups.com

William, I wanted clarification on this point. You mention that if you shard on anything other than _id the algorithm will break, and as such it's really not recommended. Is the reason the algorithm breaks that if it is sharded on something other than _id, documents with the same _id may end up on different shards and never get reduced together? Does sharded output in 2.2 work correctly if the preexisting collection is sharded on a subkey of _id? The subkey of _id would be fixed for each document and known at the time of the map.

I was thinking about how to use the aggregation framework to replace some of my currently written map reduce reports that add up sometimes a few hundred thousand documents. If the sharding happens as I would like, then I would have a better shot at staying under the 16MB command size limit for the group by. (The collection the aggregation would be using as its data source is the sharded output collection of a map reduce command).

Christian
 

William Zola

Nov 23, 2012, 3:44:22 PM
to mongod...@googlegroups.com
Hi Christian!

There are several reasons that sharding the output collection of a map/reduce job by anything other than '_id' will break.  They all boil down to a deep assumption within the code that the output collection is sharded by '_id', and only by '_id'.

These assumptions include
    - the assumption that all documents that need to be reduced together can be found on the same shard
    - the assumption that the target shard can be found by examining the _id field
    - other assumptions littered throughout the code

If you try to shard the output collection by anything other than '_id', the map/reduce will break.  In the best case, you'll get an error message; in the worst case you'll get results which are silently incorrect.

In particular, if you try to shard by only a portion of the '_id' field, you'll get incorrect results or errors.

If you want to control the locality of the output collection, you can do so by changing the order of the fields in the emit() statement.

For example, if your emit() looks like this:

  emit( { user: this.user, day: this.date }, {...} )

then 'user' will be the primary sort, and 'day' will be the secondary sort.  If you run aggregation commands that group by $user, they're likely to find most of the data for a single user on a single shard, as you'd need to have more than 64MB of data for a single user for that data to be forced onto multiple chunks.  On the other hand, if you run aggregation commands that group by $day, then you're likely to find the data split over multiple shards (possibly all of them).

If you do want to group by a particular field, simply put it first in the emit() order.  For example:

  emit( { day: this.date, user: this.user }, {...} )
 
This way, the date becomes the primary sort, and documents with the same day are more likely to live on a single shard.
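The effect of field order on locality can be checked with a small simulation. It is a sketch with simplifying assumptions: `compareIds` compares values field by field in document order and assumes every _id has the same fields in the same order, and chunks are cut every `chunkSize` documents, whereas real chunks are split by data size.

```javascript
// Sketch only: sort _id sub-documents field by field, cut the sorted list
// into equal chunks, and count how many chunks each value is spread over.
function compareIds(a, b) {
  for (const k of Object.keys(a)) {
    if (a[k] < b[k]) return -1;
    if (a[k] > b[k]) return 1;
  }
  return 0;
}

function chunksTouched(ids, chunkSize, field) {
  const sorted = ids.slice().sort(compareIds);
  const touched = {}; // field value -> Set of chunk numbers
  sorted.forEach((id, i) => {
    const chunk = Math.floor(i / chunkSize);
    (touched[id[field]] = touched[id[field]] || new Set()).add(chunk);
  });
  const counts = {};
  for (const k of Object.keys(touched)) counts[k] = touched[k].size;
  return counts;
}

// Made-up keys: three users on three days, in both field orders.
const userFirst = [];
const dayFirst = [];
for (const user of ["u1", "u2", "u3"]) {
  for (const day of [0, 1, 2]) {
    userFirst.push({ user: user, day: day });
    dayFirst.push({ day: day, user: user });
  }
}

const perUserWhenUserFirst = chunksTouched(userFirst, 3, "user");
const perDayWhenUserFirst = chunksTouched(userFirst, 3, "day");
const perDayWhenDayFirst = chunksTouched(dayFirst, 3, "day");
const perUserWhenDayFirst = chunksTouched(dayFirst, 3, "user");
console.log(perUserWhenUserFirst, perDayWhenUserFirst);
console.log(perDayWhenDayFirst, perUserWhenDayFirst);
```

With 'user' first, each user's documents sit in one chunk while each day is spread over all three; with 'day' first it is the other way round.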


Let me know if you have further questions.

 -William




cac...@gmail.com

Nov 26, 2012, 3:41:25 PM
to mongod...@googlegroups.com
Thanks that's useful to know. I'm placing my preferred shard key first (as you may see from the sample data in my question on aggregation) in _id, I'm glad that is available.

Christian


Christian Csar

Jan 30, 2013, 11:18:16 PM
to mongod...@googlegroups.com
Something to keep in mind for anyone doing map reduce into sharded output collections: see https://jira.mongodb.org/browse/SERVER-4271. The shard key, and hence _id, must be less than 512 bytes, as opposed to the general limit of 1024 bytes on _id and other indexed values. Naturally it'd be great if map reduce worked with output collections sharded on subfields of _id.

Christian