Map/Reduce using the Java driver


Martinus Martinus

Dec 16, 2011, 4:21:04 AM
to mongod...@googlegroups.com
Hi,

How can I implement MapReduce in MongoDB using the Java driver for this kind of application:

1. First, I have a collection that contains "humans", "time", and "date" keys. This collection is called customer and has more than one million documents inside:
{
       "time" : NumberLong(1325346084),
       "date" : ISODate("2011-12-09T02:54:44.893Z"),
       "humans" : 50
}

2. I want to query the "humans" data that has a "time" value below a certain time, and I want to add all of this "humans" data and put it inside a new collection.

Thanks.

Marc

Dec 16, 2011, 3:30:08 PM
to mongodb-user
Do you simply want to copy all documents with a time value below a
certain number into a new collection?

You can do this with a simple function in Java:

import com.mongodb.*;

Mongo connection = new Mongo("localhost", 27017);
DB db = connection.getDB("test");
DBCollection myColl = db.getCollection("test");
DBCollection myNewColl = db.getCollection("NEW");
myNewColl.drop();

// Match all documents whose "time" value is below the threshold.
BasicDBObject query1 = new BasicDBObject("time", new BasicDBObject("$lt", 1324039518));
DBCursor cursor = myColl.find(query1);
System.out.println(cursor.count());

// Copy each matching document into the new collection.
while (cursor.hasNext()) {
    myNewColl.save(cursor.next());
}

In the above code, each document matching your query is being pulled
from the server and inserted into a new collection ("NEW").

This will obviously create a lot of extra network traffic. It is logical to want to do the operation on the server instead. This can be done with a very simple Map/Reduce. (In fact, because the data is not being aggregated in any way, the Reduce function is never even invoked.)

myNewColl = db.getCollection("NEW_2");
myNewColl.drop();
String m = "function() { emit(this._id, {humans: this.humans, time: this.time, date: this.date}); }";
String r = "function(key, values) { return values; }";
String output = "NEW_2";
BasicDBObject query1 = new BasicDBObject("time", new BasicDBObject("$lt", 1324039518));
myColl.mapReduce(m, r, output, query1);

Unfortunately, due to the nature of MapReduce, the output collection
must be in the form of {"_id":some_id, "value":some_value}. The above
code will create a collection with documents that look like:

{ "_id" : 51, "value" : { "humans" : 9, "time" : 1324039008, "date" :
ISODate("2011-12-16T12:36:48Z") } }

The Mongo Documentation on Map Reduce may be found here:
http://www.mongodb.org/display/DOCS/MapReduce
Additionally, here is a link to a MongoDB cookbook recipe for a Map
Reduce operation. The "Extras" section goes into more detail on how
Map Reduce works, and why the output has to be in this format.
http://cookbook.mongodb.org/patterns/finding_max_and_min
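
If the {"_id", "value"} wrapper is inconvenient, one option is a small post-processing pass in the client that unwraps each "value" sub-document into a flat document. This is only a sketch against the same 2.x Java driver API used above; the output collection name "NEW_2_FLAT" is just an example:

DBCollection mrOut = db.getCollection("NEW_2");
DBCollection flat = db.getCollection("NEW_2_FLAT");
flat.drop();
DBCursor mrCursor = mrOut.find();
while (mrCursor.hasNext()) {
    DBObject doc = mrCursor.next();
    // Pull the original fields back out of the "value" wrapper.
    DBObject value = (DBObject) doc.get("value");
    BasicDBObject flatDoc = new BasicDBObject("_id", doc.get("_id"));
    flatDoc.put("humans", value.get("humans"));
    flatDoc.put("time", value.get("time"));
    flatDoc.put("date", value.get("date"));
    flat.save(flatDoc);
}

Note that this brings the documents back over the network, so it gives up the traffic savings of doing the copy server-side.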

If your collection is not sharded, you can use the db.eval() command
to execute the JavaScript on the server:

myNewColl = db.getCollection("NEW_3");
myNewColl.drop();
db.eval("function(){db.test.find({time:{$lt:
1325346084}}).forEach(function(t){db.NEW_3.save(t);});}");

This will create an output collection of the documents that match your
query, and all of the operations will be done on the server, reducing
network traffic. The only downside is that db.eval() cannot be used
with a sharded collection.
The Mongo Documentation on db.eval() is here:
http://www.mongodb.org/display/DOCS/Server-side+Code+Execution#Server-sideCodeExecution-Using{{db.eval%28%29}}

Hopefully I understood your question correctly. Is this what you
wanted to do, or did you want to do some different operation on your
data?

Martinus Martinus

Dec 18, 2011, 9:32:27 PM
to mongod...@googlegroups.com
Hi Marc,

Thanks for your solution. Actually, I want to sum all of the "humans" values whose "time" value is below the "time" value that I specify, and put the result into another collection:

{
      "year" : 2011,
      "population" : [
                           {
                              "time" : NumberLong(1325346084),
                              "total_humans" : 150
                           },
                           {
                              "time" : NumberLong(1325346090),
                              "total_humans" : 200
                           }
                          ]
}

Would it be faster to use map/reduce than a normal while loop?

Thanks.




Marc

Dec 19, 2011, 3:22:26 PM
to mongodb-user
If I understand correctly, for each year, you want to calculate the sum of all of the humans whose "time" value is below a certain number. However, that sum would be only one value. In your example document, you have two sub-documents in the "population" array, so I am concerned that I may not be understanding correctly.

Do you wish to calculate total_humans both for documents with a "time" value below the value that you give, AND for documents at or above that value?

The following map function will emit the year, the time, and the
humans for each document in the collection.

The reduce function will sum all of the humans whose "time" value is below the specified value of time, and separately all of those at or above it. In both cases, the greatest value of "time" in the group will be returned along with the value of "total_humans".

I am not sure this is exactly what you are looking for, but hopefully
it will be able to point you in the right direction, and at least get
you one step closer to achieving your goal.

Here is the collection that I created to use as an example, stored in the collection "humans":

> db.humans.save({ "_id" : 1, "humans" : 111, "time" : 1324052258, "date" : ISODate("2011-12-16T16:17:38Z") });
> db.humans.save({ "_id" : 2, "humans" : 112, "time" : 1324052238, "date" : ISODate("2011-12-16T16:17:18Z") });
> db.humans.save({ "_id" : 3, "humans" : 9, "time" : 1324052208, "date" : ISODate("2011-12-16T16:16:48Z") });
> db.humans.save({ "_id" : 4, "humans" : 22, "time" : 1324052168, "date" : ISODate("2011-12-16T16:16:08Z") });
> db.humans.save({ "_id" : 5, "humans" : 21, "time" : 1324052258, "date" : ISODate("2010-01-23T23:43:10Z") });
> db.humans.save({ "_id" : 6, "humans" : 32, "time" : 1324052238, "date" : ISODate("2010-01-23T23:26:10Z") });
> db.humans.save({ "_id" : 7, "humans" : 61, "time" : 1324052208, "date" : ISODate("2010-01-23T23:09:00Z") });
> db.humans.save({ "_id" : 8, "humans" : 36, "time" : 1324052168, "date" : ISODate("2010-01-23T22:51:40Z") });

I have chosen time = 1324052238 as the input value.

map = function () {
    year = this.date.getFullYear();
    emit(year, {"total_humans" : this.humans, "time" : this.time});
}

reduce = function (key, values) {
    print("Reducing " + key);
    population = [];
    total_humans_before = 0;
    max_time_before = 0;
    total_humans_after = 0;
    max_time_after = 0;
    for (i in values) {
        if (values[i].time < 1324052238) {
            total_humans_before = total_humans_before + values[i].total_humans;
            if (values[i].time > max_time_before) {
                max_time_before = values[i].time;
            }
        } else {
            total_humans_after = total_humans_after + values[i].total_humans;
            if (values[i].time > max_time_after) {
                max_time_after = values[i].time;
            }
        }
    }
    population.push({"total_humans" : total_humans_before, "time" : max_time_before});
    population.push({"total_humans" : total_humans_after, "time" : max_time_after});
    return {"population" : population};
}

> result = db.runCommand({"mapreduce" : "humans","map" : map,"reduce" : reduce,"out" : "humans_output"})

> db.humans_output.find().pretty()
{
    "_id" : 2010,
    "value" : {
        "population" : [
            {
                "total_humans" : 97,
                "time" : 1324052208
            },
            {
                "total_humans" : 53,
                "time" : 1324052258
            }
        ]
    }
}
{
    "_id" : 2011,
    "value" : {
        "population" : [
            {
                "total_humans" : 31,
                "time" : 1324052208
            },
            {
                "total_humans" : 223,
                "time" : 1324052258
            }
        ]
    }
}
>
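
Since your original question was about the Java driver: the same job can be run from Java with the MapReduceCommand class from the 2.x driver. This is only a sketch; the map and reduce bodies below are a compacted version of the shell functions above (with the same hard-coded cutoff of 1324052238), and, like the shell version, the reduce assumes it sees each emitted value exactly once:

import com.mongodb.*;

Mongo connection = new Mongo("localhost", 27017);
DB db = connection.getDB("test");
DBCollection humans = db.getCollection("humans");

String map = "function() {"
        + "  emit(this.date.getFullYear(), {total_humans: this.humans, time: this.time});"
        + "}";
String reduce = "function(key, values) {"
        + "  var cutoff = 1324052238;"
        + "  var before = {total_humans: 0, time: 0};"
        + "  var after = {total_humans: 0, time: 0};"
        + "  values.forEach(function(v) {"
        + "    var bucket = (v.time < cutoff) ? before : after;"
        + "    bucket.total_humans += v.total_humans;"
        + "    if (v.time > bucket.time) bucket.time = v.time;"
        + "  });"
        + "  return {population: [before, after]};"
        + "}";

// Replace the "humans_output" collection with the new results.
MapReduceCommand cmd = new MapReduceCommand(
        humans, map, reduce, "humans_output",
        MapReduceCommand.OutputType.REPLACE, null);  // null query: all documents
MapReduceOutput out = humans.mapReduce(cmd);
for (DBObject doc : out.results()) {
    System.out.println(doc);
}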

Now on to your second question: is it faster to perform the operation on your server via Map/Reduce, or is it faster to perform the operation from your client?

In a nutshell, there is a trade-off. Doing the operation on your
server via Map Reduce will reduce the amount of traffic on your
network. However, it will slow down your Mongo server. On the other
hand, if you perform the operation in your client, the processing load
will be taken off of your Mongo server, but there will be added
network traffic because each document will have to be retrieved from
your server, sent to the client, and then each document in the output
collection will have to be sent back to the server.

Additionally, if you would like your output collection to be of a form
other than {"_id" : some_id, "value" : some_value}, then you probably
want to do the operation in your client.
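
To make the client-side option concrete, here is a minimal sketch (same 2.x driver API, field names as in your collection) that computes one sum below a given "time" entirely in the client; only the matching documents cross the network:

import com.mongodb.*;

Mongo connection = new Mongo("localhost", 27017);
DB db = connection.getDB("test");
DBCollection coll = db.getCollection("humans");

// Only documents below the chosen "time" are returned by the query.
BasicDBObject query = new BasicDBObject("time", new BasicDBObject("$lt", 1324052238));
long totalHumans = 0;
DBCursor cursor = coll.find(query);
while (cursor.hasNext()) {
    totalHumans += ((Number) cursor.next().get("humans")).longValue();
}
System.out.println("total_humans: " + totalHumans);

You could reduce the traffic further by asking only for the "humans" field, e.g. coll.find(query, new BasicDBObject("humans", 1)).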

If you have any follow-up questions concerning Map Reduce, the Mongo
Community is here to help. Good luck!

Martinus Martinus

Dec 20, 2011, 10:09:21 PM
to mongod...@googlegroups.com
Hi Marc,

Thanks for your complete explanation. What I'm doing is this: first, take all the data whose "time" value is below a certain number. This "time" value will be provided by my program, so every time the program gives a "time" value, it will collect all the "humans" values below that "time", sum them up, and put the result back into the new collection. The collection is grouped based on the "year" from the given "time".

Thanks again.

Marc

Dec 22, 2011, 3:42:15 PM
to mongodb-user
Hello. I think I understand now what you are trying to do. I may not be 100% sure, but hopefully this solution will get you close:

If I understand correctly, you have a collection that (when
simplified) looks like this:

> db.humans.find()
{ "_id" : 1, "humans" : 1, "time" : 1, "date" : 2011 }
{ "_id" : 2, "humans" : 1, "time" : 2, "date" : 2011 }
{ "_id" : 3, "humans" : 1, "time" : 3, "date" : 2011 }

You want to perform a MapReduce on the given documents, and return a
collection that looks like this:

> db.humans_output.find()
{ "_id" : 2011, "value" : { "population" : [ { "total_humans" : 3,
"time" : 3 } ] } }

with the value of total_humans equal to the sum of all humans up to
time:3

Now, three more documents are added:
> db.humans.find()
{ "_id" : 1, "humans" : 1, "time" : 1, "date" : 2011 }
{ "_id" : 2, "humans" : 1, "time" : 2, "date" : 2011 }
{ "_id" : 3, "humans" : 1, "time" : 3, "date" : 2011 }
{ "_id" : 4, "humans" : 1, "time" : 4, "date" : 2011 }
{ "_id" : 5, "humans" : 1, "time" : 5, "date" : 2011 }
{ "_id" : 6, "humans" : 1, "time" : 6, "date" : 2011 }

You want to run the same MapReduce function, and have it create a new
sub-document in the "population" array with all of the total_humans up
to time:6
> db.humans_output.find()
{
"_id" : 2011,
"value" : {
"population" : [
{
"total_humans" : 3,
"time" : 3
},
{
"total_humans" : 6,
"time" : 6
}
]
}
}

Am I on track so far?

For each document in the "humans" collection, the following map
function will emit documents in the form of:
{2011, {population:[{total_humans:1, time:3}]}}

map = function () {
    year = this.date;
    print("Emitting: " + year + ", {population:[{total_humans:" + this.humans + ", time:" + this.time + "}]}");
    emit(year, {"population" : [{"total_humans" : this.humans, "time" : this.time}]});
}

The following reduce function will sum up all of the total_humans
between time_lower and time_upper:

reduce = function (key, values) {
    print("Reducing " + key);
    total_humans = 0;
    time_lower = 0;
    time_upper = 3;
    print("  Found " + values.length + " values");
    population = [];
    for (i in values) {
        for (p in values[i].population) {
            if (values[i].population[p].time == time_lower) {
                print("Adding previous value of " + values[i].population[p].total_humans + " to total_humans");
                total_humans = total_humans + values[i].population[p].total_humans;
                print("total_humans is now " + total_humans);
                print("Copying population array...");
                population = values[i].population;
            } else if ((values[i].population[p].time > time_lower) && (values[i].population[p].time <= time_upper)) {
                print("Incrementing total_humans by " + values[i].population[p].total_humans);
                total_humans = total_humans + values[i].population[p].total_humans;
                print("New Value: " + total_humans);
            } else {
                print("Skipping value because it is out of range. values[i].population[p].time should never be less than time_lower if the query is set properly");
            }
        }
    }
    print("Pushing total_humans: " + total_humans + ", time: " + time_upper);
    population.push({"total_humans" : total_humans, "time" : time_upper});
    print("Returning...");
    return {"population" : population};
}

The query may be run like so (in the JS shell):

result = db.runCommand({"mapreduce" : "humans", "map" : map, "reduce" : reduce, "query" : {"$and" : [{"time" : {"$gt" : -1}}, {"time" : {"$lte" : 3}}]}, "out" : {"reduce" : "humans_output"}})

This will create the first document in the "humans_output" collection:
{ "_id" : 2011, "value" : { "population" : [ { "total_humans" : 3,
"time" : 3 } ] } }

When you re-run the MapReduce, make sure to change the value of time_lower to 3, and time_upper to 6. The reduce function will look in the values array for a document whose population array contains a document with time == time_lower. If time_lower and time_upper are not set correctly, it could affect your results.
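
One way to avoid editing time_lower and time_upper inside the reduce function before every run is the "scope" parameter of the mapreduce command, which injects variables that the map and reduce functions can read. A sketch from the Java driver, assuming a 2.x version whose MapReduceCommand supports setScope(); the map and reduce strings are compacted versions of the shell functions above, with the hard-coded bounds replaced by the scope variables:

import com.mongodb.*;
import java.util.HashMap;
import java.util.Map;

Mongo connection = new Mongo("localhost", 27017);
DB db = connection.getDB("test");
DBCollection humans = db.getCollection("humans");

String map = "function() {"
        + "  emit(this.date, {population: [{total_humans: this.humans, time: this.time}]});"
        + "}";
String reduce = "function(key, values) {"
        + "  var total = 0, population = [];"
        + "  values.forEach(function(v) {"
        + "    v.population.forEach(function(p) {"
        + "      if (p.time == time_lower) {"
        + "        total += p.total_humans;"     // carry over the previous checkpoint
        + "        population = v.population;"  // keep the earlier checkpoints
        + "      } else if (p.time > time_lower && p.time <= time_upper) {"
        + "        total += p.total_humans;"
        + "      }"
        + "    });"
        + "  });"
        + "  population.push({total_humans: total, time: time_upper});"
        + "  return {population: population};"
        + "}";

// Same window as the query: time in (3, 6].
DBObject window = new BasicDBObject("time", new BasicDBObject("$gt", 3).append("$lte", 6));
MapReduceCommand cmd = new MapReduceCommand(
        humans, map, reduce, "humans_output",
        MapReduceCommand.OutputType.REDUCE,  // fold new results into the existing output
        window);

// time_lower/time_upper become globals visible inside map() and reduce().
Map<String, Object> scope = new HashMap<String, Object>();
scope.put("time_lower", 3);
scope.put("time_upper", 6);
cmd.setScope(scope);

humans.mapReduce(cmd);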

Now the MapReduce function can be run a second time, with an updated
query to reflect the new documents that have been added to the
"humans" collection:
"query":{"$and":[{"time":{"$gt":3}}, {"time":{"$lte":6}}]}

> result = db.runCommand({"mapreduce" : "humans", "map" : map, "reduce" : reduce, "query":{"$and":[{"time":{"$gt":3}}, {"time":{"$lte":6}}]}, "out" : {"reduce":"humans_output"}})

> db.humans_output.find().pretty()
{
    "_id" : 2011,
    "value" : {
        "population" : [
            {
                "total_humans" : 3,
                "time" : 3
            },
            {
                "total_humans" : 6,
                "time" : 6
            }
        ]
    }
}

I hope the above will be helpful to you. I have left in a bunch of extra print statements, so if you try running this, you will get a better sense of what MapReduce is doing behind the scenes.

Martinus Martinus

Dec 25, 2011, 10:56:59 PM
to mongod...@googlegroups.com
Hi Marc,

Thank you very much for your detailed answer; I'll try it. I saw there is a hadoop-mongo plugin, so I'm thinking that should be faster, right?

Thanks and Merry Christmas.

Marc

Dec 28, 2011, 6:36:22 PM
to mongodb-user
Merry Christmas to you, too. I am happy that I was able to help!

There are cases where Hadoop has been used to perform Map/Reduce operations on data stored in a Mongo collection, but I would be apprehensive about making a sweeping generalization such as "it will be faster". A colleague of mine who is more familiar with Hadoop has expressed that it is unlikely that this setup will be outright "faster".

Additionally, my colleague cautions that, "Setting up and maintaining
a Hadoop cluster is far from an easy thing, and there's a lot of
additional knowledge that comes with writing the jobs."

If you are knowledgeable about Hadoop and already have a cluster up
and running, then you may find that the Mongo Hadoop plugin will be
ideal, but if this is not the case, then it is not advisable to set up
Hadoop just to do a Map/Reduce with Mongo data.

Marc

Dec 28, 2011, 6:49:02 PM
to mongodb-user
Here is an additional note on Hadoop from another colleague of mine:

Regular Hadoop executes code (the map and reduce functions) in Java, which generally performs much faster than the JavaScript that our implementation uses. But there is the cost of actually grabbing all the data from the db, running it through Hadoop, and writing it back. Also, the code in the functions is usually very small, so most of the time is actually spent on translations. Consequently, which one is faster depends on the case: for large, computationally intensive jobs Hadoop will be faster; otherwise, probably ours. When we switch to V8, we'll get a 3x perf boost.

Martinus Martinus

Jan 2, 2012, 2:05:12 AM
to mongod...@googlegroups.com
Hi Marc,

Thanks for your valuable information. Can MongoDB's map/reduce itself be run on a cluster, just like Mongo-Hadoop, so that one node failure will not affect the map/reduce results?

Thanks and Happy New Year 2012.

Marc

Jan 3, 2012, 6:23:44 PM
to mongodb-user
Happy New Year to you, too!

As per the Mongo Documentation on Map Reduce, "If the input collection
is sharded, MongoS will automatically dispatch the map/reduce job to
each of the shard, to be executed in parallel."
http://www.mongodb.org/display/DOCS/MapReduce#MapReduce-ShardedEnvironments

If one of the shards is unavailable, then the Map Reduce will fail
instead of returning a possibly incomplete set of results. To avoid
this failure, it is a common practice to have each shard as a replica
set for redundancy.
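
For reference, a replica set is added as a shard by running the addshard admin command against a mongos. From the Java driver, that could look like the following sketch (the host names and the replica set name "rs0" are placeholders; newer servers also accept the spelling "addShard"):

import com.mongodb.*;

Mongo mongos = new Mongo("mongos-host", 27017);
DB admin = mongos.getDB("admin");
// List the replica set name and one or more of its members.
CommandResult result = admin.command(
        new BasicDBObject("addshard", "rs0/host1:27017,host2:27017"));
System.out.println(result);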

The Sharding Introduction in the Mongo documentation contains a
diagram of a typical Mongo configuration.
http://www.mongodb.org/display/DOCS/Sharding+Introduction#ShardingIntroduction-ArchitecturalOverview

There is a more in-depth example in the "Simple Initial Sharding
Architecture" document.
http://www.mongodb.org/display/DOCS/Simple+Initial+Sharding+Architecture