Understanding how a query is broken up by the mongo spark connector when using the withPipeline method


sjmi...@gmail.com

Aug 2, 2016, 6:18:29 AM
to mongodb-user
Hi,
I am trying to understand how a data pipeline is broken up when using withPipeline of the mongo spark connector.
The original pipeline to fetch data from mongo is:

    val aggregatedRdd = rdd.withPipeline(Seq(
      Document.parse("{ '$match': {'t': { '$gte' : 1448908200000, '$lt': 1448994600000 } } }"),
      Document.parse("{ '$project': {'_id':0, 'b':'$a.f', 'v':'$a.m', 'q':'$r.q' } }")
    ))



What I see is that this is broken into 262 tasks. I am using an 8-core CPU,
and I run the job using spark-submit --class "TestMain" --master local[*]

So the first question is: where does this number 262 come from? Is it decided by Spark, or does the mongo spark connector decide to query in 262 different tasks?

Further, at my mongodb console I see something like this:

2016-08-02T15:31:08.356+0530 I COMMAND  [conn15] command db.$cmd command: aggregate { aggregate: "collection", pipeline: [ { $match: { _id: { $gte: ObjectId('55f81965994a42d827b04adc'), $lt: ObjectId('55f82c8b7acf7ea60f7e6cd2') } } }, { $match: { t: { $gte: 1448908200000, $lt: 1448994600000 } } }, { $project: { _id: 0, b: "$a.f", v: "$a.m", q: "$r.q" } } ], cursor: {} } keyUpdates:0 writeConflicts:0 numYields:4449 reslen:106 locks:{ Global: { acquireCount: { r: 8904 } }, MMAPV1Journal: { acquireCount: { r: 4452 } }, Database: { acquireCount: { r: 4452 } }, Collection: { acquireCount: { R: 4452 } } } 42703ms
2016-08-02T15:31:08.992+0530 I COMMAND  [conn17] command db.$cmd command: aggregate { aggregate: "collection", pipeline: [ { $match: { _id: { $gte: ObjectId('55f841cbb84247002f591db2'), $lt: ObjectId('55f8d527ea0f545334273082') } } }, { $match: { t: { $gte: 1448908200000, $lt: 1448994600000 } } }, { $project: { _id: 0, b: "$a.f", v: "$a.m", q: "$r.q" } } ], cursor: {} } keyUpdates:0 writeConflicts:0 numYields:4099 reslen:106 locks:{ Global: { acquireCount: { r: 8204 } }, MMAPV1Journal: { acquireCount: { r: 4102 } }, Database: { acquireCount: { r: 4102 } }, Collection: { acquireCount: { R: 4102 } } } 38547ms

These lines appear for each of the 262 tasks; I have shown just 2 to compare.
The question here is: why is it doing a further match on the _id field, and why are the inner match and project repeated for each of these 262 lines?

So it gives me the impression that it is querying the collection by _id, breaking it internally into a range for each of the 262 tasks, and then applying my pipeline to each subset of results obtained. Can you please confirm this?

I don't think it should do this, because my pipeline itself requests a subset of the data based on a timestamp range.

So ideally I would expect a single query to mongodb to get that data into Spark, or, if multiple queries are needed, for them to be broken up over that timestamp range into multiple sub-ranges.

So please let me know how I can control what is fetched from Mongo, and how, using the spark connector.

Thanks
Sachin

Ross Lawley

Aug 3, 2016, 5:45:54 AM
to mongod...@googlegroups.com

Hi Sachin,

Good question!  Let me clarify what is happening here. To get Spark to process large amounts of data effectively, the data needs to be partitioned into multiple chunks. These partitions can then be processed efficiently in parallel across the Spark cluster.

The MongoDB Spark Connector automatically partitions the data according to the partitioner config (see the partitioner section of the input configuration).  The default partitioner is the MongoSamplePartitioner.  It uses the average document size and random sampling of the collection to determine suitable partitions for the collection. The partition size is configurable and defaults to chunks of approximately 64MB of data. This is what causes the extra queries: each partition of the collection gets a min and max _id to query against.
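
For example, if you simply want fewer, larger partitions (and therefore fewer aggregation queries), you can raise the target partition size. The option names below are the ones documented for the connector's input configuration, so please double-check them against the connector version you are running:

    import org.apache.spark.{SparkConf, SparkContext}
    import com.mongodb.spark.MongoSpark

    // Option names taken from the connector's input configuration docs;
    // verify them against the version you are using.
    val conf = new SparkConf()
      .setAppName("TestMain")
      .set("spark.mongodb.input.uri", "mongodb://127.0.0.1/db.collection")
      .set("spark.mongodb.input.partitioner", "MongoSamplePartitioner")
      // Target ~256MB per partition instead of the default ~64MB,
      // which should roughly quarter the number of partitions/tasks.
      .set("spark.mongodb.input.partitionerOptions.partitionSizeMB", "256")

    val sc = new SparkContext(conf)
    val rdd = MongoSpark.load(sc)   // then rdd.withPipeline(...) as before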

It should be noted that none of the connector partitioners use the supplied pipeline query; they just chunk up the raw collection, and the aggregation pipeline is applied afterwards.  This may be inefficient depending on your needs and may result in empty partitions. However, given the flexibility required of the partitioners, this approach fits all use cases.  You can supply your own partitioner with custom logic should any of the defaults not meet your needs.  There is an example of that in the test suite with the HalfwayPartitioner, and it is used in this test case.
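
As a rough sketch of that idea, here is what a partitioner that splits on your t timestamp field rather than _id might look like. The trait and case class signatures below are assumptions based on the 1.x connector API, and I am assuming queryBounds is prepended to the pipeline as a $match just like the _id bounds in your mongod log, so treat the HalfwayPartitioner source as the authoritative reference:

    import com.mongodb.spark.MongoConnector
    import com.mongodb.spark.config.ReadConfig
    import com.mongodb.spark.rdd.partitioner.{MongoPartition, MongoPartitioner}
    import org.bson.BsonDocument

    // Hypothetical partitioner that splits the collection into fixed-width
    // windows on the 't' timestamp field instead of _id ranges.
    case class TimestampRangePartitioner(start: Long, end: Long, step: Long) extends MongoPartitioner {

      override def partitions(connector: MongoConnector, readConfig: ReadConfig): Array[MongoPartition] = {
        start.until(end, step).zipWithIndex.map { case (lower, index) =>
          val upper = math.min(lower + step, end)
          // queryBounds becomes the per-partition filter; the connector
          // applies it before the user-supplied pipeline stages.
          MongoPartition(
            index,
            BsonDocument.parse(s"""{ "t": { "$$gte": $lower, "$$lt": $upper } }"""),
            Nil
          )
        }.toArray
      }
    }

How a custom partitioner is registered with the connector varies by version, so the test case mentioned above is the best place to copy from.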

So to answer your question: the number 262 comes from the partitioner dividing up the collection. You can change the partitioning strategy by configuring an alternative partitioner, or even supply your own.

I hope that helps,

Ross





--


{ name     : "Ross Lawley",
  title    : "Senior Software Engineer",
  location : "London, UK",
  twitter  : ["@RossC0", "@MongoDB"],
  facebook :"MongoDB"}
