import com.mongodb.BasicDBObject; import com.mongodb.CommandResult; import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.DBObject; import com.mongodb.Mongo; import com.mongodb.MongoException; import com.mongodb.MongoOptions; import com.mongodb.WriteConcern; import com.mongodb.WriteResult; import org.bson.types.ObjectId; import java.io.Serializable; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @author jnovotny@machenergy.com * @version $Id$ */ public class MongoAggregationTest { private static Mongo mongo; public static class WeatherHourly implements Serializable { private ObjectId mongoId; private Float temperature; private static String location = "KSFO"; public static final String OBJECT_ID = "_id"; public static final String TEMPERATURE = "temperature"; public static final String LOCATION = "location"; public WeatherHourly() {} public ObjectId getMongoId() { return mongoId; } public void setMongoId(ObjectId mongoId) { this.mongoId = mongoId; } public Float getTemperature() { return temperature; } public void setTemperature(Float temperature) { this.temperature = temperature; } public DBObject getDBObject() { DBObject dbo = new BasicDBObject(); if (this.mongoId != null) { dbo.put(OBJECT_ID, this.mongoId); } dbo.put(TEMPERATURE, this.temperature); dbo.put(LOCATION, location); return dbo; } } public static void main(String[] args) { try { MongoOptions options = new MongoOptions(); options.autoConnectRetry = true; mongo = new Mongo("localhost", options); } catch (UnknownHostException uhe) { throw new MongoException("Failed to create Mongo client", uhe); } DB db = mongo.getDB("weatherdb"); DBCollection collection = getCollection(db); for (int i = 0; i < 500; i++) { WeatherHourly wh = new WeatherHourly(); wh.setTemperature(Integer.valueOf(i).floatValue()); saveWeatherHourlyDataAndReturnShardKeyQuery(collection, wh); } // do a query BasicDBObject cmdBody = new BasicDBObject("aggregate", "weather_hourly"); List pipeline = new ArrayList(); Map map = new HashMap(); map.put("_id", "$location"); map.put("avgTemp", new BasicDBObject("$avg", "$temperature")); pipeline.add(new BasicDBObject("$group", map)); pipeline.add(new BasicDBObject("$match", new BasicDBObject("location", "KSFO"))); cmdBody.put("pipeline", pipeline); System.out.println(cmdBody.toString()); CommandResult result = db.command(cmdBody); System.out.println("result: " + result); } private static DBCollection getCollection(DB db) { DBCollection collection = db.getCollection("weather_hourly"); collection.ensureIndex(new BasicDBObject("location", 1)); collection.ensureIndex(new BasicDBObject("temperature", 1)); return collection; } private static DBObject saveWeatherHourlyDataAndReturnShardKeyQuery(DBCollection collection, WeatherHourly weatherHourly) { DBObject interval = weatherHourly.getDBObject(); BasicDBObject query = new BasicDBObject(); query.put(WeatherHourly.LOCATION, WeatherHourly.location); query.put(WeatherHourly.TEMPERATURE, weatherHourly.getTemperature()); // upsert, wait for success WriteResult result = collection.update(query, interval, true, false, WriteConcern.SAFE); //MongoDB.handleWriteResult(false, result, "upsert", interval); return query; } }