val aggregatedRdd = rdd.withPipeline(Seq(
Document.parse("{ '$match': {'t': { '$gte' : 1448908200000, '$lt': 1448994600000 } } }"),
Document.parse("{ '$project': {'_id':0, 'b':'$a.f', 'v':'$a.m', 'q':'$r.q' } }")
))
2016-08-02T15:31:08.356+0530 I COMMAND [conn15] command db.$cmd command: aggregate { aggregate: "collection", pipeline: [ { $match: { _id: { $gte: ObjectId('55f81965994a42d827b04adc'), $lt: ObjectId('55f82c8b7acf7ea60f7e6cd2') } } }, { $match: { t: { $gte: 1448908200000, $lt: 1448994600000 } } }, { $project: { _id: 0, b: "$a.f", v: "$a.m", q: "$r.q" } } ], cursor: {} } keyUpdates:0 writeConflicts:0 numYields:4449 reslen:106 locks:{ Global: { acquireCount: { r: 8904 } }, MMAPV1Journal: { acquireCount: { r: 4452 } }, Database: { acquireCount: { r: 4452 } }, Collection: { acquireCount: { R: 4452 } } } 42703ms
2016-08-02T15:31:08.992+0530 I COMMAND [conn17] command db.$cmd command: aggregate { aggregate: "collection", pipeline: [ { $match: { _id: { $gte: ObjectId('55f841cbb84247002f591db2'), $lt: ObjectId('55f8d527ea0f545334273082') } } }, { $match: { t: { $gte: 1448908200000, $lt: 1448994600000 } } }, { $project: { _id: 0, b: "$a.f", v: "$a.m", q: "$r.q" } } ], cursor: {} } keyUpdates:0 writeConflicts:0 numYields:4099 reslen:106 locks:{ Global: { acquireCount: { r: 8204 } }, MMAPV1Journal: { acquireCount: { r: 4102 } }, Database: { acquireCount: { r: 4102 } }, Collection: { acquireCount: { R: 4102 } } } 38547ms

Hi Sachin,
Good question! Let me clarify what is happening here. For Spark to process large amounts of data effectively, the data needs to be partitioned into multiple chunks, which can then be processed in parallel across the Spark cluster.
The MongoDB Spark Connector automatically partitions the data according to the partitioner config (see the partitioner section of the input configuration). The default partitioner is the MongoSamplePartitioner. It uses the average document size and random sampling of the collection to determine suitable partitions. The partition size is configurable and defaults to approximately 64MB of data per partition. This explains the extra queries you see: each partition of the collection is assigned a min and max _id to query against.
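As a rough sketch of tuning this, the partitioner and its options can be set through the connector's `spark.mongodb.input.*` configuration keys (the URI, database, and collection names below are placeholders for illustration):

```scala
import org.apache.spark.SparkConf

// Placeholder connection string; substitute your own deployment.
val conf = new SparkConf()
  .setAppName("mongo-partitioning-example")
  .set("spark.mongodb.input.uri", "mongodb://localhost:27017/db.collection")
  // Keep the default sample-based partitioner, but raise the target
  // partition size from ~64MB to ~128MB for fewer, larger partitions.
  .set("spark.mongodb.input.partitioner", "MongoSamplePartitioner")
  .set("spark.mongodb.input.partitionerOptions.partitionSizeMB", "128")
```

Fewer partitions means fewer of those min/max `_id` range queries against the collection, at the cost of less parallelism per stage.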
It should be noted that none of the connector's partitioners use the supplied pipeline query; they simply chunk up the raw collection, and the aggregation pipeline is applied afterwards. Depending on your query this may be inefficient and may result in empty partitions, but it gives the partitioners the flexibility to cover all use cases. Should none of the defaults meet your needs, you can supply your own partitioner with custom logic. There is an example of that in the test suite with the HalfwayPartitioner, and it's used in this test case.
So, to answer your question, the number 262 comes from the partitioner dividing up the collection. You can change the partitioning strategy by configuring an alternative partitioner or even supplying your own.
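For example, a partitioner strategy can also be overridden per RDD via a `ReadConfig` rather than globally. A hedged sketch, assuming a live `SparkContext` named `sc` with the connector on the classpath; the partition count of 64 is an arbitrary illustrative value:

```scala
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

// Override the default sample-based strategy for this RDD only:
// paginate the collection by document count into a fixed number
// of partitions.
val readConfig = ReadConfig(
  Map(
    "partitioner" -> "MongoPaginateByCountPartitioner",
    "partitionerOptions.numberOfPartitions" -> "64"
  ),
  Some(ReadConfig(sc)) // fall back to sc's config for everything else
)

val rdd = MongoSpark.load(sc, readConfig)
println(rdd.getNumPartitions)
```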
I hope that helps,
Ross
--
You received this message because you are subscribed to the Google Groups "mongodb-user"
group.
For other MongoDB technical support options, see: https://docs.mongodb.com/manual/support/