I want to join two tables with mongo-spark-connector. I was using withPipeline(Seq(Document.parse("{$lookup:….. but it isn't working.
Hi Nam,
I've just tested the $lookup aggregation operator with mongo-spark-connector v1.1.0, MongoDB v3.2, and Apache Spark 1.6.2 successfully, as below:
val orders = MongoSpark.load(sc, ReadConfig(Map("collection" -> "orders", "database"->"databaseName"), Some(ReadConfig(sc))))
val aggResult = orders.withPipeline(Seq(Document.parse("{$lookup:{from:'inventory', localField:'item', foreignField:'sku', as:'inventory_docs'}}") ))
aggResult.foreach(println)
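For reference, each document printed by the snippet above embeds the matched inventory documents in the inventory_docs array named by the as field. A sketch of one result, using the kind of sample data from the manual's $lookup examples (field values here are illustrative, not actual output):

```
{ "_id" : 1, "item" : "abc", "price" : 12, "quantity" : 2,
  "inventory_docs" : [
    { "_id" : 1, "sku" : "abc", "description" : "product 1", "instock" : 120 }
  ] }
```

If no inventory document matches, inventory_docs is simply an empty array.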
The example collections and documents above are based on the $lookup examples in the manual.
Note that $lookup is a new aggregation operator in MongoDB v3.2+.
If you are still having issues performing $lookup through withPipeline, could you provide more details?
Regards,
Wan.
I'm using Spark version 2.0.0, mongo-spark-connector version 2.0.0-rc0, and MongoDB version 3.2.
Hi Nam,
How do you execute your code?
Using your example collections and the environment above, below are examples of performing a $lookup aggregation from Apache Spark with mongo-spark-connector.
If you’re running the code via spark-submit, then you could construct SparkContext as example below:
val conf = new SparkConf()
.setMaster("local")
.setAppName("lookupExample")
.set("spark.mongodb.input.uri", "mongodb://host:port/TEMP.Student")
val sc = new SparkContext(conf)
val students = MongoSpark.load(sc)
val aggResult = students.withPipeline(Seq(Document.parse("{$lookup:{from:'Address', localField:'std_id', foreignField:'id', as:'studentAddress'}}") ))
aggResult.foreach(println)
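If you are building the pipeline string by hand, a small helper can reduce quoting mistakes. The helper below (lookupStage is a hypothetical name, not part of the connector API) is plain Scala and just produces the JSON string that gets passed to Document.parse in the examples above:

```scala
// Hypothetical helper: builds a $lookup stage as a JSON string,
// suitable for passing to Document.parse(...) in the examples above.
// Note: $$ in an s-interpolated string emits a literal $.
def lookupStage(from: String, localField: String, foreignField: String, as: String): String =
  s"""{"$$lookup": {"from": "$from", "localField": "$localField", "foreignField": "$foreignField", "as": "$as"}}"""

val stage = lookupStage("Address", "std_id", "id", "studentAddress")
// stage can then be wrapped as: students.withPipeline(Seq(Document.parse(stage)))
println(stage)
```

This keeps the field names in one place if you run several lookups against different collections.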
If you’re running the code via spark-shell, you could specify conf params as below example:
spark-shell --conf "spark.mongodb.input.uri=mongodb://host:port/TEMP.students" --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0-rc0
Execute your lookup aggregation within the spark-shell, as below:
val students = MongoSpark.load(sc)
val aggResult = students.withPipeline(Seq(Document.parse("{$lookup:{from:'Address', localField:'std_id', foreignField:'id', as:'studentAddress'}}") ))
aggResult.foreach(println)
See Initialising SparkContext for more information.
Note that the examples above assume that your $lookup aggregation is valid for your collections, i.e. the localField value joins to the foreignField value. See $lookup (aggregation) for information on the operator itself.
Regards,
Wan.