Questions about aggregation query with the spark mongodb connector

Rich Bolen

Sep 22, 2016, 9:57:34 AM
to mongodb-user
We have a pre-existing, long-running aggregation query that we use against mongo to police bad data.  We then process the results.  We'd like to use Spark to run this query (and process the results).  Is it correct that the Spark mongo aggregation support only provides filtering and not grouping?  Our agg pipeline does a match, group, match.  I ran a test and it appeared it only did the first match and ignored the $group stage.

If it doesn't support grouping, can I kick off the agg query (from a spark job) using the mongodb driver API directly and then process the results as an RDD?  The results are written to a collection as part of the aggregation, so they could be easily loaded into an RDD.

I'm experienced with mongo but new to spark, so I'm feeling around for a good approach for running our data-policing queries via a spark job.

Thanks!

Rich

Wan Bachtiar

Sep 25, 2016, 10:01:26 PM
to mongodb-user

Is it correct that the spark mongo aggregation support only provides filtering and not grouping? Our agg pipeline does a match, group, match. I ran a test and it appeared it only did the first match and ignored the $group clause.

Hi Rich,

Using the MongoDB Spark Connector, you are able to utilise the aggregation pipeline. For example, in Scala Spark:

val aggRdd = rdd.withPipeline(Seq(
                Document.parse("{$sort:{timestamp:1}}"), 
                Document.parse("{$group:{_id:{'myid':'$myid'}, record:{'$first':'$$ROOT'}}}"), 
                Document.parse("{$project:{'_id':0, 'doc':'$record.doc', 'timestamp':'$record.timestamp', 'myid':'$record.myid'}}")
                )   
            )

If you are having issues running an aggregation pipeline with the connector, please provide:

  • MongoDB version.
  • Code snippet containing your aggregation operation.
  • Any error log that you are getting.
  • The output that you are getting, and what your expected outcome is.

If it doesn’t support grouping, can I kick off the agg query (from a spark job) using the mongodb driver API directly and then process the results as an RDD? The results are written to a collection as part of the aggregation, so could be easily loaded into an RDD.

Depending on your use case, you can also perform the operation in a Spark job using Spark RDD operations, and then write the output back to a MongoDB collection.
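
For example, here is a minimal sketch of that approach in Scala. It assumes sc is a SparkContext configured with spark.mongodb.input.uri and spark.mongodb.output.uri, and the field name "myid" is only illustrative:

import com.mongodb.spark.MongoSpark
import org.bson.Document

val rdd = MongoSpark.load(sc)

// Group in Spark rather than in MongoDB: count documents per "myid"
// and keep only the values that occur more than once.
val duplicates = rdd
  .map(doc => (doc.getString("myid"), 1))
  .reduceByKey(_ + _)
  .filter { case (_, count) => count > 1 }
  .map { case (myid, count) => new Document("myid", myid).append("count", Int.box(count)) }

// Write the grouped output back to the collection set in spark.mongodb.output.uri
MongoSpark.save(duplicates)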

You may also find the code examples in the MongoDB Spark Connector documentation useful.

We have a pre-existing long running aggregation query that we use against mongo to police bad data.

Depending on your definition of ‘bad data’ and how you want to police it, you may want to check out Document Validation, which is a new feature in v3.2. It provides the capability to validate documents during updates and insertions.
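
As a very rough sketch of what enabling a validator could look like from Scala using the MongoDB Java driver (the connection details, database/collection names and the validated field are illustrative placeholders only):

import com.mongodb.MongoClient
import org.bson.Document

// Illustrative placeholders: "test", "addresses" and "street" are not taken
// from your deployment.
val client = new MongoClient("localhost", 27017)
val db = client.getDatabase("test")

// Reject inserts/updates where "street" is missing.
db.runCommand(new Document("collMod", "addresses")
  .append("validator", new Document("street", new Document("$exists", Boolean.box(true))))
  .append("validationLevel", "strict")
  .append("validationAction", "error"))

client.close()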

Kind regards,

Wan.

Rich Bolen

Sep 28, 2016, 12:56:31 PM
to mongodb-user
Hi Wan,
I'm using MongoDB 3.2.8 and Spark 1.6.2 running on JVM 1.8 on a Mac, with a simple Spark cluster setup with one worker.

Here's my aggregation query.  This is looking for duplicate postal addresses by street, state, and geojson point (longitude/latitude):
    rdd.withPipeline(
      Seq(
        Document.parse("""{ $match: {"status": {$ne:"Duplicate"}}}"""),
        Document.parse(
          """{ $group: {
            |    _id: { firstField: "$street", secondField: "$region", thirdField: "$geojson.coordinates" },
            |    uniqueIds: { $push: "$_id" },
            |    count: { $sum: 1 }
            |  }}""".stripMargin),
        Document.parse("""{ $match: { count: { $gt: 1 }}}"""),
        Document.parse("{ allowDiskUse:true }")))

The issue I'm having is that the results it gives back are not the aggregation results but records directly from the collection.  I just want the aggregation results.

As far as using the built-in RDD methods for the aggregation: I'm aggregating over a collection of about 100M documents, so I'd rather not load all the documents into Spark memory if I can avoid it.  I'm also thinking that the aggregation will run more quickly in mongo than in Spark.  For MongoRDD.withPipeline, can I create an RDD from the source collection without loading all the docs in it and then use withPipeline to do the aggregation?  Will the agg results be passed to the function passed to RDD.map, flatMap, etc.?  Or will I need to write the agg results to a collection (using $out: "someCollection") and then load them as another RDD?

Wan Bachtiar

Oct 4, 2016, 8:40:45 PM
to mongodb-user

Document.parse("{ allowDiskUse:true }")))

Hi Rich,

Using the latest stable version (v1.1.0) of mongo-spark-connector, your aggregation pipeline example containing allowDiskUse should return a message similar to:

Caused by: com.mongodb.MongoCommandException: Command failed with error 16436: 'Unrecognized pipeline stage name: 'allowDiskUse'' on server mongodb:27017. The full response is { "ok" : 0.0, "errmsg" : "Unrecognized pipeline stage name: 'allowDiskUse'", "code" : 16436 }

There is currently an open ticket to support allowDiskUse:true with withPipeline - SPARK-81. Feel free to watch/upvote the ticket for updates.
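
Until then, one possible workaround is to run the pipeline (with allowDiskUse) through the MongoDB Java driver from your Spark driver program and only hand the results to Spark afterwards. A rough sketch reusing the stages from your example (the host and the test.addresses namespace are assumptions, not taken from your setup):

import com.mongodb.MongoClient
import org.bson.Document
import scala.collection.JavaConverters._

// Assumed connection details and namespace; adjust to your deployment.
val client = new MongoClient("localhost", 27017)
val coll = client.getDatabase("test").getCollection("addresses")

val pipeline = java.util.Arrays.asList(
  Document.parse("""{ $match: { status: { $ne: "Duplicate" } } }"""),
  Document.parse("""{ $group: { _id: { street: "$street", region: "$region" },
                                uniqueIds: { $push: "$_id" },
                                count: { $sum: 1 } } }"""),
  Document.parse("""{ $match: { count: { $gt: 1 } } }"""))

// Run the aggregation in MongoDB with allowDiskUse enabled.
val results = coll.aggregate(pipeline).allowDiskUse(true).iterator().asScala.toList
client.close()

// Hand the (already aggregated) results to Spark for further processing.
val resultsRdd = sc.parallelize(results)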

The issue I’m having is the results it gives back are not the aggregation results but records directly from the collection. I just want the aggregation results.

I’ve just tested withPipeline in mongo-spark-connector v1.1.0, Apache Spark v1.6.2, and MongoDB v3.2.10, using aggregation operators similar to your example:

val rdd = MongoSpark.load(sc)
val aggRdd = rdd.withPipeline(Seq(
                    Document.parse("""{$match:{"myid":{$gte:1}}}"""), 
                    Document.parse("""{$group:{_id:{firstField:"$doc"}, count:{$sum:1}}}"""), 
                    Document.parse("""{$match:{count:{$gt:3}}}""")))
aggRdd.foreach(println)

The output is the aggregation result, as expected.
If you are still seeing raw collection documents rather than the aggregation result, could you provide the details listed earlier (MongoDB version, code snippet, error log if any, and the actual vs. expected output)?

For MongoRDD.withPipeline can I create an RDD from the source collection without loading all the docs in it and then use withPipeline to do the aggregation?

Loading into an RDD is a lazy operation; nothing is computed right away, and the source collection is not read until an action is triggered.
In the three-line example above, only the third line triggers an action, which runs the aggregation pipeline inside MongoDB and passes only the resulting documents to Spark.
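
To make the lazy/eager split explicit, the same example can be annotated as follows (this is just a restatement of the snippet above, with sc assumed to be an existing SparkContext):

import com.mongodb.spark.MongoSpark
import org.bson.Document

val rdd = MongoSpark.load(sc)                        // lazy: no documents are read yet
val aggRdd = rdd.withPipeline(Seq(                   // lazy: the pipeline is only recorded
  Document.parse("""{$match:{"myid":{$gte:1}}}"""),
  Document.parse("""{$group:{_id:{firstField:"$doc"}, count:{$sum:1}}}"""),
  Document.parse("""{$match:{count:{$gt:3}}}""")))
aggRdd.foreach(println)                              // action: MongoDB runs the pipeline now and
                                                     // only the aggregation results reach Spark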

In addition to the resources mentioned previously, I would also recommend enrolling in the free online course M233: Getting Started with Spark and MongoDB to learn more about MongoDB and Spark.

Best regards,

Wan.

Nam NV

Nov 27, 2016, 8:55:02 AM
to mongodb-user
I have the same issue.
I'm using mongo-spark-connector 2.0.0, Spark 2.0, and MongoDB 3.2.
The $group stage does not return grouped data.
Source:
     val students = MongoSpark.load(sc)
     val aggRdd = students.withPipeline(Seq(
       Document.parse("""{$match:{xx_id:{$eq:"7d9c18eb3575705f2"}}}"""),
       Document.parse("""{$group:{_id:"$xx_id", Level: {$addToSet:"$Level"}}}""")))
     aggRdd.toDF().show()

Return data:
+------+-----------------+
| Level|              _id|
+------+-----------------+
|   [1]|7d9c18eb3575705f2|
|   [2]|7d9c18eb3575705f2|
|[2, 1]|7d9c18eb3575705f2|
|    []|7d9c18eb3575705f2|
|   [4]|7d9c18eb3575705f2|
|   [6]|7d9c18eb3575705f2|
+------+-----------------+

Ross Lawley

Nov 28, 2016, 4:57:43 AM
to mongod...@googlegroups.com
Hi Nam,

This is the same issue: the partitioner doesn't use the aggregation pipeline, and by default it partitions on the `_id` of the original documents.  Here, multiple workers are working on different partitions that contain the same xx_id, hence the multiple results.

Perhaps changing your partition key and partitioning on xx_id would be a better fit.
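
As a rough illustration of that suggestion (the URI is a placeholder, and MongoSamplePartitioner is just one partitioner that accepts a partitionKey option):

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

// Placeholder URI; partition the reads on xx_id instead of the default _id.
val readConfig = ReadConfig(Map(
  "uri"                             -> "mongodb://localhost/test.students",
  "partitioner"                     -> "MongoSamplePartitioner",
  "partitionerOptions.partitionKey" -> "xx_id"
), Some(ReadConfig(sc)))

val students = MongoSpark.load(sc, readConfig)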

Ross

--


{ name     : "Ross Lawley",
  title    : "Senior Software Engineer",
  location : "London, UK",
  twitter  : ["@RossC0", "@MongoDB"],
  facebook :"MongoDB"}
