using withPipeline for match and project operations in mongo-spark-connector


sjmi...@gmail.com
Jul 20, 2016, 10:19:30 AM
to mongodb-user
Hi,
So I am trying to load certain documents from mongo into Spark for processing.
I filter the documents using the $match operator, and I want to transform the fields using the $project operator.

However, I am not able to get the syntax right for creating such an expression.
In both https://github.com/mongodb/mongo-spark and https://docs.mongodb.com/spark-connector/getting-started/ I only see simple examples using just $match.

So my code is something like
    val rdd = MongoSpark.load(sc, readConfig)
    val aggregatedRdd = rdd.withPipeline(Seq(Document.parse("{\"$match\": { " +
          "\"x.t\" : { \"$gte\": 1448908200000, \"$lt\": 1448994600000 } " +
        "}, " +
        "\"$project\": { " +
          "\"_id\": 0, \"b\": \"$a.f\", \"v\": \"$a.m\", \"q\": \"$r.q\" " +
        "}}"
    )))

But when I run the assembly I get this exception

com.mongodb.MongoCommandException: Command failed with error 16435: 'exception: A pipeline stage specification object must contain exactly one field.' on server localhost:27017. The full response is { "errmsg" : "exception: A pipeline stage specification object must contain exactly one field.", "code" : 16435, "ok" : 0.0 }

Looks like some issue with the syntax of the pipeline expression.
Could someone please guide me on how to specify the pipeline expression via Spark for both match and project operations in a single expression?

Thanks
Sachin

Wan Bachtiar
Jul 20, 2016, 7:58:38 PM
to mongodb-user

Could someone please guide me on how to specify the pipeline expression via
Spark for both match and project operations in a single expression

Hi Sachin,

Similar to the MongoDB Aggregation Pipeline in the mongo shell, withPipeline accepts a sequence of pipeline stage documents as a Seq collection. Each document in the Seq must contain exactly one stage; your example put $match and $project inside a single document, which is why the server rejected it with error 16435. Applying this to your example:

val aggregatedRdd = rdd.withPipeline(Seq(
    Document.parse("{ '$match': { 'x.t': { '$gte': 1448908200000, '$lt': 1448994600000 } } }"),
    Document.parse("{ '$project': { '_id': 0, 'b': '$a.f', 'v': '$a.m', 'q': '$r.q' } }")
))
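
If the JSON strings get hard to maintain, the stages can also be built programmatically with org.bson.Document. A minimal sketch, using the same fields as above:

import org.bson.Document

// Build each stage as a Document instead of parsing a JSON string;
// this avoids quoting and brace mistakes.
val matchStage = new Document("$match",
  new Document("x.t", new Document("$gte", 1448908200000L).append("$lt", 1448994600000L)))
val projectStage = new Document("$project",
  new Document("_id", 0).append("b", "$a.f").append("v", "$a.m").append("q", "$r.q"))

val aggregatedRdd = rdd.withPipeline(Seq(matchStage, projectStage))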

See also Aggregation Pipeline Behavior for some strategies to optimise aggregation operations.

Regards,

Wan.

Siva B
Jul 21, 2016, 2:25:16 AM
to mongodb-user
How do we avoid a data type conflict, or how can we cast the value to a string?

sjmi...@gmail.com
Jul 22, 2016, 2:09:18 AM
to mongodb-user
Thanks for the code.
I was doing this
    val aggregatedRdd = rdd.withPipeline(Seq(Document.parse("{\"$match\": { " +
          "\"x.t\" : { \"$gte\": 1448908200000, \"$lt\": 1448994600000 } " +
        "}}"
    ))).withPipeline(Seq(Document.parse("{\"$project\": { " +
          "\"_id\": 0, \"b\": \"$a.f\", \"v\": \"$a.m\", \"q\": \"$r.q\" " +
        "}}"
    )))

But your approach is better.

On the other question about how to cast to a string:
http://stackoverflow.com/questions/22698265/in-mongodb-project-clause-how-can-i-convert-date-from-milliseconds-to-isodate

I think you cannot do that using the aggregation framework; map-reduce may be the way to do it.

However, mongo-spark only provides the pipeline operation.
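
A possible workaround (a sketch, not from the thread; it assumes the unfiltered rdd and the x.t millisecond field from the earlier example) is to do the conversion on the Spark side after loading:

import java.util.Date
import org.bson.Document

// Convert the epoch-millis field x.t to a java.util.Date in Spark,
// since the aggregation pipeline cannot cast it here.
val withDates = rdd.map { doc =>
  val millis: Long = doc.get("x", classOf[Document]).getLong("t")
  doc.append("tAsDate", new Date(millis))
}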

Thanks
Sachin

Sampat Singh
Sep 4, 2016, 8:27:45 PM
to mongodb-user
Hi 

Is there any way to write the code in Java, as Seq is Scala? I used java.util.Collections. My example code is:

String aggregateQuery = "{ $match: { $and: [ {gatewaytime : { $gte : " + startTime
        + ", $lte : " + endTime + " }}, {mac : {$in: " + deviceMacs + "} } ] } }";

JavaMongoRDD<Document> rdd = MongoSpark.load(getJSC());
JavaMongoRDD<Document> aggregatedRdd = rdd
        .withPipeline(Collections.singletonList(Document.parse(aggregateQuery)));

This works fine. Now I need to add a $sort stage to the pipeline; how can I achieve this in Java?
I tried:

String aggregateQuery = "{ $match: { $and: [ {gatewaytime : { $gte : " + startTime
        + ", $lte : " + endTime + " }}, {mac : {$in: " + deviceMacs + "} } ] } }";

JavaMongoRDD<Document> rdd = MongoSpark.load(getJSC());
JavaMongoRDD<Document> aggregatedRdd = rdd
        .withPipeline(Collections.singletonList(Document.parse(aggregateQuery)))
        .withPipeline(Collections.singletonList(Document.parse("{$sort: {gatewaytime : -1}}")));

And passing two parsed documents as:

.withPipeline(Collections.singletonList(Collections.singletonList(Document.parse(aggregateQuery),
        Document.parse("{$sort: {gatewaytime : -1}}"))));

Ross Lawley
Sep 5, 2016, 5:09:54 AM
to mongod...@googlegroups.com

The JavaMongoRDD#withPipeline method takes a Java List, so it can be used in the same way as the Scala version:

JavaMongoRDD<Document> aggregatedRdd = rdd
    .withPipeline(Arrays.asList(Document.parse(aggregateQuery),
                                Document.parse("{$sort: {gatewaytime : -1}}")));

Note: the withPipeline method will overwrite any existing pipeline, so you must pass the complete pipeline to the method.
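
To illustrate the overwrite behaviour with the Scala API (a sketch with hypothetical matchStage and sortStage documents):

import org.bson.Document

val matchStage = Document.parse("{ $match: { gatewaytime: { $gte: 0 } } }")
val sortStage  = Document.parse("{ $sort: { gatewaytime: -1 } }")

// The second call replaces the first pipeline entirely, so the $match is lost:
val onlySorted = rdd.withPipeline(Seq(matchStage)).withPipeline(Seq(sortStage))
// Pass the complete pipeline in a single call instead:
val matchThenSort = rdd.withPipeline(Seq(matchStage, sortStage))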





--
{ name     : "Ross Lawley",
  title    : "Senior Software Engineer",
  location : "London, UK",
  twitter  : ["@RossC0", "@MongoDB"],
  facebook : "MongoDB" }

Sampat Singh
Sep 29, 2016, 11:45:37 AM
to mongodb-user

Hi,
Is there any way to pass allowDiskUse: true to withPipeline in the mongo-spark-connector?

Thanks in advance.

Regards,
Sampat


Ross Lawley
Sep 29, 2016, 11:59:19 AM
to mongod...@googlegroups.com
Hi,

Currently there isn't an option to do this: aggregations requiring disk usage aren't supported, so the aggregation would have to be done in Spark.

This will be fixed as part of SPARK-81. Could you confirm which version of Spark you are using, so I can make sure the fix version is correct?
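
In the meantime, a possible workaround (a sketch, assuming the gatewaytime field from earlier in this thread and placeholder time bounds) is to push only the $match down to MongoDB and do the memory-heavy stage in Spark:

import org.bson.Document

// Push the selective $match down to MongoDB...
val matched = rdd.withPipeline(Seq(
  Document.parse("{ $match: { gatewaytime: { $gte: 1448908200000, $lte: 1448994600000 } } }")))
// ...then sort in Spark, which can spill to disk on its own, instead of
// using a server-side $sort that would need allowDiskUse.
val sorted = matched.sortBy(_.getLong("gatewaytime").longValue, ascending = false)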

Ross
