How to use withPipeline for $lookup in mongo-spark-connector


Nam NV

Nov 6, 2016, 5:15:24 PM
to mongodb-user
Please help me,
I want to join two collections with mongo-spark-connector. I was using withPipeline(Seq(Document.parse("{$lookup:..... but it isn't working.
So, can I join collections with mongo-spark-connector?

Thanks
Nam 

Wan Bachtiar

Nov 7, 2016, 2:05:14 AM
to mongodb-user

I want to join two collections with mongo-spark-connector. I was using withPipeline(Seq(Document.parse("{$lookup:..... but it isn't working.

Hi Nam,

I've just successfully tested the aggregation operator $lookup with mongo-spark-connector v1.1.0, MongoDB v3.2, and Apache Spark 1.6.2, as below:

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.bson.Document

// Load the 'orders' collection, inheriting defaults from the SparkContext.
val orders = MongoSpark.load(sc, ReadConfig(Map("collection" -> "orders", "database" -> "databaseName"), Some(ReadConfig(sc))))
// Join each order's 'item' field to the 'sku' field in 'inventory'.
val aggResult = orders.withPipeline(Seq(Document.parse("{$lookup:{from:'inventory', localField:'item', foreignField:'sku', as:'inventory_docs'}}")))
aggResult.foreach(println)

The example collections and documents above are based on the $lookup examples in the manual.

Note that $lookup is a new aggregation operator in MongoDB v3.2+.
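
For reference, here is a minimal sketch of documents in the shape that example assumes (the field values are illustrative only, not copied from the manual):

import org.bson.Document

// Illustrative shapes only: orders.item joins against inventory.sku,
// and each matched order gains an 'inventory_docs' array field.
val orderDoc = Document.parse("{_id: 1, item: 'abc', price: 12, quantity: 2}")
val inventoryDoc = Document.parse("{_id: 1, sku: 'abc', description: 'product 1', instock: 120}")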

If you are still having issues performing $lookup through withPipeline, could you provide:

  • The specific MongoDB version
  • The specific Apache Spark version
  • The specific mongo-spark-connector version
  • The error that you are getting
  • A complete example snippet of your aggregation code
  • Example documents from both collections, local and foreign

Regards,

Wan.

Nam NV

Nov 23, 2016, 7:00:46 AM
to mongodb-user
Thanks Wan, but I tried this solution and it isn't working.
I'm using Spark version 2.0.0, mongo-spark-connector version 2.0.0-rc0, and MongoDB version 3.2.
My source code:
Solution 1:
val readConfig: ReadConfig = ReadConfig(Map("uri" -> "mongodb://test:1234...@FFF.XX.XXX.YY:27017/TEMP.Student"),
    Some(ReadConfig(Map("uri" -> "mongodb://test:1234...@FFF.XX.XXX.YY:27017/TEMP.Address"))))
val student = MongoSpark.load(sc, readConfig)
val aggResult = student.withPipeline(Seq(Document.parse("{$lookup:{from:'Address', localField:'std_id', foreignField:'id', as:'studentAddress'}}")))

OR

Solution 2:

val readConfig1: ReadConfig = ReadConfig(Map("uri" -> "mongodb://test:1234...@FFF.XX.XXX.YY:27017/TEMP.Student"))
val student = MongoSpark.load(sc, readConfig1)

val readConfig2: ReadConfig = ReadConfig(Map("uri" -> "mongodb://test:1234...@FFF.XX.XXX.YY:27017/TEMP.Address"))
val address = MongoSpark.load(sc, readConfig2)
val aggResult = student.withPipeline(Seq(Document.parse("{$lookup:{from:'Address', localField:'std_id', foreignField:'id', as:'studentAddress'}}")))

I get the error "org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class scala.runtime.Nothing".

Wan Bachtiar

Nov 24, 2016, 7:05:04 PM
to mongodb-user

I'm using Spark version 2.0.0, mongo-spark-connector version 2.0.0-rc0, and MongoDB version 3.2.

Hi Nam,

How do you execute your code?

Using your example collections and the environment above, below are examples of performing a $lookup aggregation from Apache Spark with mongo-spark-connector.
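
As an aside, I can't be certain without seeing how the code is executed, but a CodecConfigurationException mentioning scala.runtime.Nothing usually means the compiler inferred Nothing as the RDD's document type. One possible workaround (a sketch, assuming type inference is the culprit in your case) is to pass the type parameter explicitly:

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.bson.Document

// Passing [Document] explicitly prevents Scala from inferring
// scala.runtime.Nothing, which has no registered BSON codec.
val readConfig1: ReadConfig = ReadConfig(Map("uri" -> "mongodb://test:1234...@FFF.XX.XXX.YY:27017/TEMP.Student"))
val student = MongoSpark.load[Document](sc, readConfig1)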

If you're running the code via spark-submit, you could construct the SparkContext as in the example below:

import org.apache.spark.{SparkConf, SparkContext}
import com.mongodb.spark.MongoSpark
import org.bson.Document

// Point the connector's default input at the Student collection.
val conf = new SparkConf()
                .setMaster("local")
                .setAppName("lookupExample")
                .set("spark.mongodb.input.uri", "mongodb://host:port/TEMP.Student")
val sc = new SparkContext(conf)
val students = MongoSpark.load(sc)
// Join Student.std_id to Address.id within the same database.
val aggResult = students.withPipeline(Seq(Document.parse("{$lookup:{from:'Address', localField:'std_id', foreignField:'id', as:'studentAddress'}}")))
aggResult.foreach(println)
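
For completeness, a hypothetical spark-submit invocation for the snippet above (the class and jar names here are placeholders, not from this thread):

spark-submit --class LookupExample \
  --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0-rc0 \
  lookup-example.jar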

If you're running the code via spark-shell, you could specify the conf params as in the example below:

spark-shell --conf "spark.mongodb.input.uri=mongodb://host:port/TEMP.Student" --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0-rc0

Then execute your lookup aggregation within the spark-shell, as below:

val students = MongoSpark.load(sc)
val aggResult = students.withPipeline(Seq(Document.parse("{$lookup:{from:'Address', localField:'std_id', foreignField:'id', as:'studentAddress'}}")))
aggResult.foreach(println)

See Initialising SparkContext for more information.

Note that the examples above assume that your $lookup aggregation is valid for your collections, i.e. the localField value matches the foreignField value. See $lookup (aggregation) for information on the operator itself.
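
For example, a pair of hypothetical documents that would satisfy that join (the field values are made up for illustration):

import org.bson.Document

// Student.std_id (localField) must equal Address.id (foreignField)
// for $lookup to populate the 'studentAddress' array.
val studentDoc = Document.parse("{std_id: 1, name: 'student one'}")
val addressDoc = Document.parse("{id: 1, city: 'city one'}")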

Regards,

Wan.

Nam NV

Nov 25, 2016, 2:39:41 AM
to mongodb-user
I updated mongo-spark-connector to the release version 2.0.0.
The above solution works now.
Wan, thanks for your support.