Group mongo collection by attribute and count entries by timeframe group


Simon Knoll

Mar 11, 2016, 7:01:01 AM
to mongodb-user

I need to count entries by time group for every identifier. Entries which all fall within a 2-hour timeframe count as 1. The starting point of the timeframe is the first appearance of the identifier. When an entry appears which is at least 2 hours after the starting point, I increment the counter, and the date of that new entry becomes my new starting point.

For this the collection needs to be sorted by date. The created_at date of the first entry for an identifier is the reference date, and the counter is set to 1. All appearances of the same identifier within the 2-hour window after the reference date are not counted. When the created_at date of an entry for the identifier is more than 2 hours after the reference date, we increment the counter and that date becomes the new reference date.

example:

{identifier:"223b07FF", created_at:"2016-02-05T10:00:00.000Z"}
{identifier:"223b07FF", created_at:"2016-02-05T11:00:00.000Z"}
{identifier:"223b07FF", created_at:"2016-02-05T11:50:00.000Z"}
{identifier:"223b07FF", created_at:"2016-02-05T13:00:00.000Z"}
{identifier:"223b0781", created_at:"2016-02-05T10:00:00.000Z"}
{identifier:"223b0781", created_at:"2016-02-05T13:00:00.000Z"}
{identifier:"223b0781", created_at:"2016-02-05T14:30:00.000Z"}
{identifier:"223b0781", created_at:"2016-02-07T23:13:25.000Z"}
{identifier:"223b0781", created_at:"2016-02-08T23:13:25.000Z"}
  • Count for "223b07FF" is 2
  • Count for "223b0781" is 4
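
(Tracing the example: for "223b07FF", 10:00 opens a window that closes at 12:00; 11:00 and 11:50 fall inside it; 13:00 falls outside, so the count is 2. For "223b0781", 10:00 opens a window closing at 12:00; 13:00 is outside, so the count becomes 2 and the window now closes at 15:00; 14:30 is inside; the Feb 7 and Feb 8 entries are each outside the current window, taking the count to 3 and then 4.)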

I tried 3 different approaches:

Aggregate

With aggregate I cannot reference or compare against other dates in the matching group. I can only define fixed intervals, which is not correct, because the reference date starts at the first entry and moves whenever an entry is more than 2 hours past it. [SO_AG] shows an aggregate approach for a 15-minute interval, but it only creates fixed buckets rather than referencing earlier dates, and is therefore not applicable to my use case.

[SO_AG]

Map-reduce

With map-reduce I have the feeling that parallelism messes with the order of the sorted collection and therefore produces a wrong result.

Iterative

Iterating over the distinct identifiers, then querying, sorting, comparing and counting all relevant entries for each one is very slow and not feasible in a production environment.
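
To illustrate, this is a simplified sketch of the kind of per-identifier loop I mean (not my exact code):

```
// one query per distinct identifier - this N+1 query pattern is what makes it slow
db.entries.distinct("identifier").forEach(function(id) {
  var count = 0;
  var boundary = null;
  db.entries.find({identifier: id}, {_id: 0, created_at: 1})
            .sort({created_at: 1})
            .forEach(function(doc) {
              // count an entry only if it falls outside the current 2-hour window,
              // then start a new window at its date
              if (boundary === null || doc.created_at > boundary) {
                count += 1;
                boundary = new Date(doc.created_at.getTime() + 1000*60*60*2);
              }
            });
  print("ID:", id, "  Count:", count);
});
```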

Below I have posted my map-reduce approach:

map function:

function() {
  emit(this.identifier, {created_at: this.created_at.getTime(), count: 1});
}

reduce function:

function(key, values) {
  reduced_val={date:(values[0].date), count:values[0].count};
  values.forEach(function(kv){
    if(kv.date > reduced_val.date + 1000*60*60*4){
      reduced_val.date = kv.date;
      reduced_val.count += kv.count;
    }
  });
  return reduced_val;
}

finalize function:

function(key, reduced_value){
  if(reduced_value.count==undefined){
    return 1;
  }
  return reduced_value.count;
}

My question is how to solve this problem other than in the iterative way mentioned above.

Asya Kamsky

Mar 19, 2016, 3:32:40 AM
to mongodb-user
Just want to point out that your map-reduce is wrong - your map function does not emit the same shape of value as your reduce function returns (the map emits {created_at, count}, while the reduce reads and returns {date, count}).

I also reject your assertion that "Iterating over the distinct identifiers, querying all relevant entries sorting, comparing and counting them is very slow and not feasible in a productive environment" because you can do this *in aggregation framework* and then it won't be slow.

What it comes down to is how many identifiers will there be when you're aggregating this?   Or more relevant to my point - how many total "created_at" dates for each (at most)?
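
To sketch the kind of pipeline I have in mind (just an illustration, assuming created_at is stored as a real date, and assuming each identifier has a modest number of dates - hence the question above): group the sorted dates per identifier in the pipeline, then apply the 2-hour rule to each small array on the client:

```
db.entries.aggregate([
  { $sort:  { identifier: 1, created_at: 1 } },
  { $group: { _id: "$identifier", dates: { $push: "$created_at" } } }
]).forEach(function(doc) {
  var count = 0;
  var boundary = null;
  doc.dates.forEach(function(d) {
    // count a date only if it is outside the current 2-hour window,
    // then open a new window starting at that date
    if (boundary === null || d > boundary) {
      count += 1;
      boundary = new Date(d.getTime() + 1000*60*60*2);
    }
  });
  print("ID:", doc._id, "  Count:", count);
});
```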

Asya
 





William Byrne III

Apr 13, 2016, 1:41:20 AM
to mongodb-user

Simon,

I have two solutions for your problem.  One is an iterative approach based on your data being stored just as it is shown in your example block, and the other stores the data differently so that MongoDB's [Aggregation Pipeline feature](https://docs.mongodb.org/manual/core/aggregation-pipeline/) can be used to get your desired result.

```
// load sample data
im = db.entries.insert([
  {identifier:"223b07FF", created_at:new Date("2016-02-05T10:00:00.000Z")},
  {identifier:"223b07FF", created_at:new Date("2016-02-05T11:00:01.000Z")},
  {identifier:"223b07FF", created_at:new Date("2016-02-05T13:00:03.000Z")},
  {identifier:"223b07FF", created_at:new Date("2016-02-05T11:50:02.000Z")},
  {identifier:"223b0781", created_at:new Date("2016-02-05T10:00:04.000Z")},
  {identifier:"223b0781", created_at:new Date("2016-02-05T14:30:06.000Z")},
  {identifier:"223b0781", created_at:new Date("2016-02-05T13:00:05.000Z")},
  {identifier:"223b0781", created_at:new Date("2016-02-07T23:13:27.000Z")},
  {identifier:"223b0781", created_at:new Date("2016-02-08T23:13:28.000Z")}
]) ;

// find all entries sorted by identifier and then created_at  
var ent_cursor = db.entries.find({},{"_id":0,"identifier":1, "created_at":1}) ;
ent_cursor.sort({"identifier":1, "created_at":1}) ;

  // initialise loop variables
  var cur_Identifier = "";
  var cur_Created_At = new Date();
  var bnd_Created_At = new Date();
  var count_id2hEnts = 0;

  while (ent_cursor.hasNext()) {
    thisDoc = ent_cursor.next() ;
    // if this identifier different from current identifier 
    if (cur_Identifier != thisDoc.identifier) { 
      // print if not first time through loop
      if (cur_Identifier != "") { 
        print("ID:", cur_Identifier, "  Count:", count_id2hEnts) ;
      }
      // update loop vars to new identifier and datetime, reset counter to 1
      cur_Identifier = thisDoc.identifier ;
      cur_Created_At = new Date(thisDoc.created_at) ;
      bnd_Created_At.setTime(cur_Created_At.getTime() + 1000*60*60*2) ; 
      count_id2hEnts = 1 ;
    }
    else {   // if same identifier and over 2 hours later, increment  
             // counter and set boundary datetime variables
      if (thisDoc.created_at > bnd_Created_At) { 
        cur_Created_At = new Date(thisDoc.created_at) ;
        bnd_Created_At.setTime(cur_Created_At.getTime() + 1000*60*60*2) ; 
        count_id2hEnts = count_id2hEnts + 1
      }
    }
  }

  // print last one
  print("ID:", cur_Identifier, "  Count:", count_id2hEnts) ;
``` 
I know you said that querying+sorting+counting entries would be too slow in a production environment, but if you have an index over {"identifier":1, "created_at":1} then the MongoDB engine will almost certainly decide that the fastest way to return a sorted set of documents is to read them from the index and not have to do the actual work of sorting them.  If you also [project to just the indexed fields](https://docs.mongodb.org/manual/tutorial/project-fields-from-query-results/) it will run as a ["covered query"](https://docs.mongodb.org/manual/core/query-optimization/#covered-query): all the required fields are in the index, so there is no need to read the underlying documents.  If your entries documents have many more fields than just these two, then the index is also much smaller than the full collection, and thus much faster to scan.
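
For example (a sketch using the same collection and fields as above):

```
// build the compound index that the sorted read relies on
db.entries.createIndex({ "identifier": 1, "created_at": 1 }) ;

// with _id excluded and only the indexed fields projected, the query can be
// answered entirely from the index; the winning plan in the explain output
// should show an IXSCAN with no FETCH stage
db.entries.find({}, {"_id":0, "identifier":1, "created_at":1})
          .sort({"identifier":1, "created_at":1})
          .explain() ;
```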

  
  
    
My second solution stores the entries differently, with each document holding all  entries for an identifier within two hours from the first:

```
{"identifier":"223b0781", "created_at":["2016-02-05T10:00:04Z"]}
{"identifier":"223b0781", "created_at":["2016-02-05T13:00:05Z", "2016-02-05T14:30:06Z"]}
{"identifier":"223b0781", "created_at":["2016-02-07T23:13:27Z"]}
{"identifier":"223b0781", "created_at":["2016-02-08T23:13:28Z"]}
{"identifier":"223b07FF", "created_at":["2016-02-05T10:00:00Z", "2016-02-05T11:00:01Z", "2016-02-05T11:50:02Z"]}
{"identifier":"223b07FF", "created_at":["2016-02-05T13:00:03Z"]}
```

The data load process changes from a simple insert to "IF EXISTS (document for identifier with created_at within 2 hours) UPDATE it ELSE insert new document".  The benefit of storing the entries like this is that a very simple aggregation command will produce your report.  

```
  var incomingEntries = [
    {identifier:"223b07FF", created_at:new Date("2016-02-05T10:00:00.000Z")},
    {identifier:"223b07FF", created_at:new Date("2016-02-05T11:00:01.000Z")},
    {identifier:"223b07FF", created_at:new Date("2016-02-05T11:50:02.000Z")},
    {identifier:"223b07FF", created_at:new Date("2016-02-05T13:00:03.000Z")},
    {identifier:"223b0781", created_at:new Date("2016-02-05T10:00:04.000Z")},
    {identifier:"223b0781", created_at:new Date("2016-02-05T13:00:05.000Z")},
    {identifier:"223b0781", created_at:new Date("2016-02-05T14:30:06.000Z")},
    {identifier:"223b0781", created_at:new Date("2016-02-07T23:13:27.000Z")},
    {identifier:"223b0781", created_at:new Date("2016-02-08T23:13:28.000Z")}
  ] ;


  incomingEntries.forEach(function(entry) {
    // look for a document for that identifier with startDT <= created_at < endDT
    query = {"identifier" : entry.identifier,
             "startDT"    : {$lte: new Date(entry.created_at)},
             "endDT"      : {$gt:  new Date(entry.created_at)} } ;
    prjct = {"_id":1};
    foundOne = db.entries.findOne(query, prjct) ;

    // if no such document is found, create it
    if (foundOne == null)
      {
       var endDT = new Date(entry.created_at) ;
       endDT.setTime(endDT.getTime() + 1000*60*60*2) ;
       ins = {"identifier"   : entry.identifier,
              "startDT"      : new Date(entry.created_at),
              "endDT"        : new Date(endDT),
              "createds"     : [new Date(entry.created_at)],
              "countEntries" : 1} ;
       opt = {upsert:false} ;
       insRC = db.entries.insert(ins, opt)
      }

    // else document exists, so increase count and add this created_at to the array
    else
      {
       qid = foundOne;
       upd = {$inc :{"countEntries" : 1},
              $push:{"createds"     : new Date(entry.created_at)}} ;
       opt = {upsert:false} ;
       updRC = db.entries.updateOne(qid, upd, opt) ;
      }
  }) ;
```

Now a simple aggregation query produces your report:

```
   var agg_cursor = db.entries.aggregate([
     {$group: {"_id"     : {"identifier":"$identifier"},
               "entries" : {"$sum":1}} }]) ;
   while (agg_cursor.hasNext()) { printjson(agg_cursor.next())} ;

```


This second approach moves most of the work of producing your report to the data load process.  

You could extend the pre-aggregation another order of magnitude and store a single document for each identifier with an array of two-hour block documents.  Each array element would include a nested second array of all the created_at entries within that period.  You could also have counter fields alongside the arrays at each level holding the number of array elements, and the report would be a simple find of the identifier and counter field.  This makes generating the report faster at the cost of making the data load more complicated and slower.
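
For example, such a pre-aggregated document might look like this (purely a sketch - the collection name "entriesByIdentifier" and the field names "blockCount", "blocks" and "entryCount" are only illustrative, not anything defined above):

```
// one document per identifier, pre-aggregated into two-hour blocks
db.entriesByIdentifier.insert({
  "identifier" : "223b0781",
  "blockCount" : 4,                 // the report's answer for this identifier
  "blocks" : [
    { "startDT"    : ISODate("2016-02-05T10:00:04Z"),
      "endDT"      : ISODate("2016-02-05T12:00:04Z"),
      "entryCount" : 1,
      "createds"   : [ ISODate("2016-02-05T10:00:04Z") ] },
    { "startDT"    : ISODate("2016-02-05T13:00:05Z"),
      "endDT"      : ISODate("2016-02-05T15:00:05Z"),
      "entryCount" : 2,
      "createds"   : [ ISODate("2016-02-05T13:00:05Z"),
                       ISODate("2016-02-05T14:30:06Z") ] }
    // ...and so on, one array element per two-hour block...
  ]
}) ;

// the report is then a simple find on the identifier and its counter field
db.entriesByIdentifier.find({ "identifier" : "223b0781" },
                            { "_id" : 0, "identifier" : 1, "blockCount" : 1 }) ;
```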

Choosing the best approach has to be driven by all the things this data is used for.  If another report wants all entries sorted by created_at, that will be harder and slower to produce if they are stored in arrays nested in arrays.  One way to satisfy multiple conflicting requirements is to store the data in multiple forms - both the raw input stream and a pre-aggregated collection just for this report.  That would increase the space needed to store it all, but disk is cheap and time is money.

III
___________________
William Byrne III