java + aggregation framework


jason

Mar 23, 2012, 5:17:45 PM
to mongodb-user
Hi,

Using the aggregation framework, I finally got something working from
the mongo command line:

db.weather_hourly.aggregate(
    { $match : { location : "KSFO" } },
    { $group : { _id : "$location", avgTemp : { $avg : "$temperature" } } }
)

However, how would I express this in my code so that it can be
run using

CommandResult result = DB.command(....)

No matter how I construct my BasicDBObject, it doesn't seem to work...

Thanks, Jason

Marc

Mar 23, 2012, 6:35:57 PM
to mongodb-user
DB.command takes a BasicDBObject as its input. This BasicDBObject
should be in the form of:
{ "aggregate" : <collection name> , "pipeline" : [ <list of pipeline
operator documents for the aggregate command> ]}

Here is a similar question that was asked on Google Groups, with an
example of how to perform an aggregation command with the Java driver:
http://groups.google.com/group/mongodb-user/browse_thread/thread/ee7ac58efefd6569?fwc=1

Here is another example of how to run the aggregation command with the
Java driver. This uses the "article" collection from the example in
the "Invocation" section of the "Aggregation Framework" documentation:
http://www.mongodb.org/display/DOCS/Aggregation+Framework#AggregationFramework-Invocation

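import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.Mongo;
import java.util.ArrayList;

public class AggregateArticles {
    public static void main(String[] args) throws Exception {
        Mongo m = new Mongo("localhost", 27017);
        DB db = m.getDB("test");
        // command document: { "aggregate" : "article", "pipeline" : [ ... ] }
        BasicDBObject cmdBody = new BasicDBObject("aggregate", "article");
        ArrayList<BasicDBObject> pipeline = new ArrayList<BasicDBObject>();
        // $project: keep only the author and tags fields
        BasicDBObject project = new BasicDBObject();
        project.put("author", 1);
        project.put("tags", 1);
        pipeline.add(new BasicDBObject("$project", project));
        // $unwind: emit one document per element of the tags array
        pipeline.add(new BasicDBObject("$unwind", "$tags"));
        // $group: one result per tag, collecting the distinct authors
        BasicDBObject $group = new BasicDBObject("_id", "$tags");
        $group.put("authors", new BasicDBObject("$addToSet", "$author"));
        pipeline.add(new BasicDBObject("$group", $group));
        cmdBody.put("pipeline", pipeline);
        System.out.println(cmdBody.toString());
        System.out.println("result: " + db.command(cmdBody));
    }
}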

For your example, you should add the { $match : { location : "KSFO" } }
and { $group : { _id : "$location", avgTemp : { $avg : "$temperature" } } }
documents to your "pipeline" array.
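To make the target shape concrete, here is a minimal sketch that models the command document with plain JDK collections (a LinkedHashMap for each document, an ArrayList for the pipeline array) and renders it as JSON. It does not use the MongoDB driver; the class and method names (CommandShapeSketch, doc, buildCommand, toJson) are hypothetical, and the renderer only handles strings, numbers, maps, and lists. With the Java driver, BasicDBObject plays the role of the Map and a List<BasicDBObject> the role of the array.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch: models the aggregate command document with JDK
// collections only (no MongoDB driver) to show how it nests.
final class CommandShapeSketch {

    // Builds a one-key document such as { "$match" : { ... } }.
    static Map<String, Object> doc(String key, Object value) {
        Map<String, Object> d = new LinkedHashMap<>();
        d.put(key, value);
        return d;
    }

    // Builds { "aggregate" : "weather_hourly", "pipeline" : [ $match, $group ] }.
    static Map<String, Object> buildCommand() {
        Map<String, Object> group = new LinkedHashMap<>();
        group.put("_id", "$location");
        group.put("avgTemp", doc("$avg", "$temperature"));

        List<Object> pipeline = new ArrayList<>();           // renders as [ ... ]
        pipeline.add(doc("$match", doc("location", "KSFO")));
        pipeline.add(doc("$group", group));

        Map<String, Object> cmd = new LinkedHashMap<>();      // renders as { ... }
        cmd.put("aggregate", "weather_hourly");
        cmd.put("pipeline", pipeline);
        return cmd;
    }

    // Minimal JSON renderer: maps become {...}, lists become [...].
    // Strings are quoted without escaping, which is enough for this example.
    static String toJson(Object value) {
        if (value instanceof Map) {
            StringJoiner fields = new StringJoiner(", ", "{", "}");
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                fields.add("\"" + e.getKey() + "\": " + toJson(e.getValue()));
            }
            return fields.toString();
        }
        if (value instanceof List) {
            StringJoiner items = new StringJoiner(", ", "[", "]");
            for (Object item : (List<?>) value) {
                items.add(toJson(item));
            }
            return items.toString();
        }
        if (value instanceof String) {
            return "\"" + value + "\"";
        }
        return String.valueOf(value);   // numbers, booleans
    }

    public static void main(String[] args) {
        System.out.println(toJson(buildCommand()));
    }
}
```

The pipeline is a list because its stages run in sequence; each stage is itself a small document keyed by its operator.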

Hopefully the above examples will allow you to perform the desired
aggregation operation.

jason

Mar 26, 2012, 2:52:09 PM
to mongodb-user
Thanks a lot for your help!

From the command line, I now get:

mongos> db.weather_hourly.aggregate({ $match : { location : "KSFO" } }, { $group : { _id : "$location", avgTemp : { $avg : "$temperature" } } })
{
"result" : [
{
"_id" : "KSFO",
"avgTemp" : 13.948871492707815
}
],
"ok" : 1
}


However, when I try to accomplish the same thing in Java:

BasicDBObject cmdBody = new BasicDBObject("aggregate", "location");
List<BasicDBObject> pipeline = new ArrayList<BasicDBObject>();
Map map = new HashMap();
map.put("_id", "$location");
map.put("avgTemp", new BasicDBObject("$avg", "$temperature"));
pipeline.add(new BasicDBObject("$group", map));
pipeline.add(new BasicDBObject("$match", new BasicDBObject("location", "KSFO")));
cmdBody.put("pipeline", pipeline);
System.out.println(cmdBody.toString());
CommandResult result = MongoDB.getWeatherDB().command(cmdBody);
System.out.println("result: " + result);

I get no results, although the query looks the same as the one from the command line:

[STDOUT] { "aggregate" : "location" , "pipeline" : [ { "$group" :
{ "_id" : "$location" , "avgTemp" : { "$avg" : "$temperature"}}} ,
{ "$match" : { "location" : "KSFO"}}]}
[STDOUT] result: { "result" : [ ] , "ok" : 1.0}

Any ideas are greatly appreciated.

Thanks, Jason


Marc

Mar 27, 2012, 4:45:33 PM
to mongodb-user
In the JS shell, the command is being run on the "weather_hourly"
collection, but in the Java code, the "aggregate" command is being
called on the "location" collection:

> db.weather_hourly.aggregate...
versus
{ "aggregate" : "location"...

Perhaps this is the source of the issue?

Also, perhaps try using an ArrayList instead of a HashMap?

Unfortunately, I do not have any sample documents from your
collection, so I cannot try this out for myself, but something like
the following should hopefully work:
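Mongo m = new Mongo("localhost", 27017);
DB db = m.getDB("test");
BasicDBObject cmdBody = new BasicDBObject("aggregate", "weather_hourly");
ArrayList<BasicDBObject> pipeline = new ArrayList<BasicDBObject>();
// { $match : { location : "KSFO" } }
BasicDBObject match = new BasicDBObject();
match.put("location", "KSFO");
// { $group : { _id : "$location", avgTemp : { $avg : "$temperature" } } }
BasicDBObject $group = new BasicDBObject("_id", "$location");
$group.put("avgTemp", new BasicDBObject("$avg", "$temperature"));
pipeline.add(new BasicDBObject("$match", match));
pipeline.add(new BasicDBObject("$group", $group));
cmdBody.put("pipeline", pipeline);
CommandResult result = db.command(cmdBody);
System.out.println("result: " + result);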

jason

Mar 27, 2012, 6:20:38 PM
to mongodb-user
Hi Marc,

Thanks for the suggestions. I changed my "aggregate" from
"location" to "weather_hourly", but that didn't seem to change anything. I
found that using an ArrayList didn't work, since it produces [ ] when I only want
{ }, and it caused a syntax error. Here's my latest code, which is still
not returning any results.

BasicDBObject cmdBody = new BasicDBObject("aggregate", "weather_hourly");
List<BasicDBObject> pipeline = new ArrayList<BasicDBObject>();
Map<String, Object> map = new HashMap<String, Object>();
map.put("_id", "$location");
map.put("avgTemp", new BasicDBObject("$avg", "$temperature"));
pipeline.add(new BasicDBObject("$group", map));
pipeline.add(new BasicDBObject("$match", new BasicDBObject("location", "KSFO")));
cmdBody.put("pipeline", pipeline);
System.out.println(cmdBody.toString());
CommandResult result = MongoDB.getWeatherDB().command(cmdBody);
System.out.println("result: " + result);

It would be fantastic if the quickstart app included examples of the
new aggregate functionality.

Thanks again, Jason

Marc

Mar 28, 2012, 11:03:29 AM
to mongodb-user
Good morning. Can you please include some sample documents from your
weather_hourly collection, so I may attempt to reproduce? I believe
this is the collection you are attempting to run the aggregation on,
correct? Please change any sensitive data that may be in the sample
documents.

I am fairly certain that an ArrayList of BasicDBObjects is the correct
data type. The "pipeline" key should be an array, denoted with square
brackets [], and not curly braces {}. If you can send me some example
documents, I will attempt to reproduce the syntax error that you
experienced using ArrayList.
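One way to see why "pipeline" has to be an array: a document, like a Java Map, can hold each key only once, so a document keyed by stage name cannot express a pipeline that uses the same operator twice (for example $match, $group, $match), and it no longer records which stage runs first. An array keeps every stage in order. The sketch below illustrates this with plain JDK collections only; it is not driver code, and the class and constant names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration with JDK collections only (no MongoDB driver):
// compares a pipeline stored as an array with one stored as a document.
final class PipelineShapeSketch {

    static final String MATCH_LOCATION = "{ location : \"KSFO\" }";
    static final String GROUP_AVG = "{ _id : \"$location\", avgTemp : { $avg : \"$temperature\" } }";
    static final String MATCH_AVG = "{ avgTemp : 249.5 }";

    // One stage = a document with a single operator key, e.g. { $match : ... }.
    static Map<String, String> stage(String operator, String body) {
        Map<String, String> s = new LinkedHashMap<>();
        s.put(operator, body);
        return s;
    }

    // Pipeline as an array: all three stages survive, in the order added.
    static List<Map<String, String>> asArray() {
        List<Map<String, String>> pipeline = new ArrayList<>();
        pipeline.add(stage("$match", MATCH_LOCATION));
        pipeline.add(stage("$group", GROUP_AVG));
        pipeline.add(stage("$match", MATCH_AVG));
        return pipeline;
    }

    // Pipeline as a document keyed by operator: keys must be unique, so the
    // second $match replaces the first. LinkedHashMap keeps the original
    // position of a re-inserted key, so the avgTemp filter now sits before $group.
    static Map<String, String> asDocument() {
        Map<String, String> pipeline = new LinkedHashMap<>();
        pipeline.put("$match", MATCH_LOCATION);
        pipeline.put("$group", GROUP_AVG);
        pipeline.put("$match", MATCH_AVG);
        return pipeline;
    }

    public static void main(String[] args) {
        System.out.println("array:    " + asArray());
        System.out.println("document: " + asDocument());
    }
}
```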

Re-reading the Aggregation Documentation, I noticed that "The pipeline
cannot operate on collections with documents that contain one of the
following 'special fields': MinKey, MaxKey, EOO, Undefined, DBRef,
Code." Do your documents contain any of the above? If not, I am
confident that we can get this to work!

Finally, to which 'quickstart app' are you referring? If it is
something produced by 10gen, I can speak with our Documentation
Engineer. We are currently working on an updated version of our
documentation.

Jason Novotny

Mar 28, 2012, 6:11:30 PM
to mongod...@googlegroups.com, Marc

Hi Marc,

I really appreciate your help! I've created a simple test class you
can run with the mongo Java driver; it upserts 500 "weather" objects
and then runs an aggregate query to compute the average temperature.
As for the docs, I was thinking of
http://www.mongodb.org/display/DOCS/Java+Tutorial, which mentions:

"Using the Java driver is very simple. First, be sure to include the
driver jar mongo.jar in your classpath. The following code snippets come
from the examples/QuickTour.java example code found in the driver."


Thanks, Jason

MongoAggregationTest.java

何健

Mar 28, 2012, 9:27:26 PM
to mongod...@googlegroups.com, Marc
Hi Jason,

After checking your code, I found that the $match stage comes after the $group stage, so it filters the output of $group. That's why you get nothing.
You just need to move $match to the front of the pipeline:

// $match first, so it filters the original documents, which have a location field
pipeline.add(new BasicDBObject("$match", new BasicDBObject("location", "KSFO")));
map.put("_id", "$location");
map.put("avgTemp", new BasicDBObject("$avg", "$temperature"));
pipeline.add(new BasicDBObject("$group", map));
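To see why the position of $match matters, here is a small in-memory simulation written with plain JDK collections. It is not how the server implements the pipeline; the helpers match and groupAvg are hypothetical stand-ins that mimic $match (keep documents whose field equals a value) and this particular $group (group by one field and average "temperature"). The sample data mirrors Jason's test class: 500 documents at KSFO with temperatures 0 through 499.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical in-memory stand-ins for two pipeline stages; documents are Maps.
final class StageOrderSketch {

    // Mimics { $match : { field : value } }: keeps documents whose field equals value.
    static List<Map<String, Object>> match(List<Map<String, Object>> docs, String field, Object value) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> d : docs) {
            if (Objects.equals(d.get(field), value)) {
                out.add(d);
            }
        }
        return out;
    }

    // Mimics { $group : { _id : "$<key>", avgTemp : { $avg : "$temperature" } } }.
    // Each output document has exactly two fields: _id and avgTemp.
    static List<Map<String, Object>> groupAvg(List<Map<String, Object>> docs, String key) {
        Map<Object, double[]> acc = new LinkedHashMap<>();   // group key -> {sum, count}
        for (Map<String, Object> d : docs) {
            double[] sumCount = acc.computeIfAbsent(d.get(key), k -> new double[2]);
            sumCount[0] += ((Number) d.get("temperature")).doubleValue();
            sumCount[1] += 1;
        }
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map.Entry<Object, double[]> e : acc.entrySet()) {
            Map<String, Object> g = new LinkedHashMap<>();
            g.put("_id", e.getKey());
            g.put("avgTemp", e.getValue()[0] / e.getValue()[1]);
            out.add(g);
        }
        return out;
    }

    // 500 documents like those upserted by the test class: KSFO, temperature 0..499.
    static List<Map<String, Object>> sampleDocs() {
        List<Map<String, Object>> docs = new ArrayList<>();
        for (int i = 0; i < 500; i++) {
            Map<String, Object> d = new LinkedHashMap<>();
            d.put("location", "KSFO");
            d.put("temperature", (float) i);
            docs.add(d);
        }
        return docs;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> docs = sampleDocs();
        // $match first: filters on "location", which the input documents have.
        List<Map<String, Object>> grouped = groupAvg(match(docs, "location", "KSFO"), "location");
        System.out.println("match, group:        " + grouped);
        // $group first: its output has only _id and avgTemp, so "location" never matches.
        System.out.println("group, match:        " + match(groupAvg(docs, "location"), "location", "KSFO"));
        // A $match after $group can only test fields $group produced, such as avgTemp.
        System.out.println("... then avgTemp:    " + match(grouped, "avgTemp", 249.5));
    }
}
```

Swapping the two calls in main reproduces the empty result from the thread: the location filter runs against documents that no longer carry a location field.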

Thanks,
He Jian

On Thursday, March 29, 2012, 6:11:30 AM UTC+8, jason wrote:

import com.mongodb.BasicDBObject;
import com.mongodb.CommandResult;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoException;
import com.mongodb.MongoOptions;
import com.mongodb.WriteConcern;
import com.mongodb.WriteResult;
import org.bson.types.ObjectId;

import java.io.Serializable;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author jnov...@machenergy.com
 * @version $Id$
 */
public class MongoAggregationTest {

    private static Mongo mongo;

    public static class WeatherHourly implements Serializable {

        private ObjectId mongoId;
        private Float temperature;
        private static String location = "KSFO";
        public static final String OBJECT_ID = "_id";
        public static final String TEMPERATURE = "temperature";
        public static final String LOCATION = "location";

        public WeatherHourly() {}

        public ObjectId getMongoId() {
            return mongoId;
        }

        public void setMongoId(ObjectId mongoId) {
            this.mongoId = mongoId;
        }

        public Float getTemperature() {
            return temperature;
        }

        public void setTemperature(Float temperature) {
            this.temperature = temperature;
        }

       
        public DBObject getDBObject() {
            DBObject dbo = new BasicDBObject();
            if (this.mongoId != null) {
                dbo.put(OBJECT_ID, this.mongoId);
            }
            dbo.put(TEMPERATURE, this.temperature);
            dbo.put(LOCATION, location);
            return dbo;
        }

    }

    public static void main(String[] args) {

        try {
            MongoOptions options = new MongoOptions();
            options.autoConnectRetry = true;
            mongo = new Mongo("localhost", options);
        } catch (UnknownHostException uhe) {
            throw new MongoException("Failed to create Mongo client", uhe);
        }

        DB db = mongo.getDB("weatherdb");
        DBCollection collection = getCollection(db);

        for (int i = 0; i < 500; i++) {
            WeatherHourly wh = new WeatherHourly();
            wh.setTemperature(Integer.valueOf(i).floatValue());
            saveWeatherHourlyDataAndReturnShardKeyQuery(collection, wh);
        }


        // do a query


        BasicDBObject cmdBody = new BasicDBObject("aggregate", "weather_hourly");
        List<BasicDBObject> pipeline = new ArrayList<BasicDBObject>();
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("_id", "$location");
        map.put("avgTemp", new BasicDBObject("$avg", "$temperature"));

        pipeline.add(new BasicDBObject("$group", map));
        pipeline.add(new BasicDBObject("$match", new BasicDBObject("location", "KSFO")));

        cmdBody.put("pipeline", pipeline);
        System.out.println(cmdBody.toString());

        CommandResult result = db.command(cmdBody);
        System.out.println("result: " + result);

    }

    private static DBCollection getCollection(DB db) {
        DBCollection collection = db.getCollection("weather_hourly");
        collection.ensureIndex(new BasicDBObject("location", 1));
        collection.ensureIndex(new BasicDBObject("temperature", 1));
        return collection;
    }

    private static DBObject saveWeatherHourlyDataAndReturnShardKeyQuery(DBCollection collection, WeatherHourly weatherHourly) {


        DBObject interval = weatherHourly.getDBObject();

        BasicDBObject query = new BasicDBObject();
        query.put(WeatherHourly.LOCATION, WeatherHourly.location);
        query.put(WeatherHourly.TEMPERATURE, weatherHourly.getTemperature());

        // upsert, wait for success
        WriteResult result = collection.update(query, interval, true, false, WriteConcern.SAFE);

        //MongoDB.handleWriteResult(false, result, "upsert", interval);
        return query;
    }
}


何健

Mar 28, 2012, 9:38:26 PM
to mongod...@googlegroups.com, Marc
After the $group stage, only two fields, "_id" and "avgTemp", are left, so a $match on {location: "KSFO"} added after it matches nothing.
You can test this by adding another $match after the $group; it helps to understand the behavior better.

pipeline.add(new BasicDBObject("$match", new BasicDBObject("location", "KSFO")));
map.put("_id", "$location");
map.put("avgTemp", new BasicDBObject("$avg", "$temperature"));
pipeline.add(new BasicDBObject("$group", map));
// after $group only _id and avgTemp exist, so this second $match tests avgTemp
pipeline.add(new BasicDBObject("$match", new BasicDBObject("avgTemp", 249.5)));

On Thursday, March 29, 2012, 9:27:26 AM UTC+8, 何健 wrote:

Marc

Mar 29, 2012, 11:16:12 AM
to mongodb-user
Hello, Jason. Thank you for including your code. I have run it, and
I have confirmed He Jian's observation that $match should precede
$group in the pipeline. If you reverse the order they are added, the
aggregation command should work correctly.

pipeline.add(new BasicDBObject("$match", new BasicDBObject("location", "KSFO")));
pipeline.add(new BasicDBObject("$group", map));

The output should be as follows:
{ "aggregate" : "weather_hourly" , "pipeline" : [ { "$match" :
{ "location" : "KSFO"}} , { "$group" : { "_id" : "$location" ,
"avgTemp" : { "$avg" : "$temperature"}}}]}
result: { "serverUsed" : "localhost:27017" , "result" : [ { "_id" :
"KSFO" , "avgTemp" : 249.5}] , "ok" : 1.0}
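The 249.5 in that result can be checked by hand: the test class upserts one KSFO document for each temperature from 0 through 499, so the average is

```latex
\bar{T} = \frac{1}{500}\sum_{i=0}^{499} i
        = \frac{1}{500}\cdot\frac{499 \cdot 500}{2}
        = \frac{499}{2} = 249.5
```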

Can you confirm that this works for you?

Thank you very much, He Jian, for your recommendation!

Jason Novotny

Mar 29, 2012, 2:00:37 PM
to mongod...@googlegroups.com, Marc

Thanks Marc and He Jian for helping me out with this code example!

Since ordering is so important, I'd recommend updating the documentation
to reflect this, so that hopefully others don't run into the same problem.

Thanks again, Jason

Marc

Mar 29, 2012, 3:18:32 PM
to mongodb-user
Happy to help! I am glad that everything is working as intended
now.

Your point about improving the documentation is well taken. We are
currently working on a new version of our documentation, which will
hopefully be made public in the near future. I have contacted our
Documentation Engineer and asked him to make the relevant update,
stressing the importance of order of operations in the "pipeline"
section of the aggregation framework documentation.

Sincerely,
Marc