import com.mongodb.spark._
import com.mongodb.spark.config.ReadConfig
import com.mongodb.spark.sql._
import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object ReproduceMain extends App with LazyLogging {

  println("Does not convert to an aggregation pipeline: ")
  val conf = new SparkConf().setAppName("mongozips").setMaster("local[*]")
  val sc = new SparkContext(conf)
  val readConfig = ReadConfig(Map("uri" -> "mongodb://127.0.0.1/", "database" -> "test", "collection" -> "zips"))
  // Load through the RDD API, then convert to a DataFrame
  val zipDf = sc.loadFromMongoDB(readConfig).toDF()
  import zipDf.sqlContext.implicits._ // 1)
  val count = zipDf.filter($"pop" > 0).count()
  println(s"count = $count")
  sc.stop()

  println("Converts to an aggregation pipeline: ")
  val conf2 = new SparkConf()
    .setAppName("mongozips")
    .setMaster("local[*]")
    .set("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.zips")
  val sc2 = new SparkContext(conf2)
  val sqlContext2 = new SQLContext(sc2)
  // Load through the DataFrameReader
  val zipDf2 = sqlContext2.read.mongo()
  zipDf2.filter($"pop" > 0).show()
}
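When the DataFrame comes from the RDD path, one workaround is to push the filter to MongoDB by hand with the connector's withPipeline helper on the MongoRDD, before any conversion to a DataFrame. A minimal sketch, assuming the same test.zips collection; the object name WithPipelineSketch and the hand-written $match document are for illustration only and are not generated by the connector:

import com.mongodb.spark._
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.Document

object WithPipelineSketch extends App {
  val conf = new SparkConf().setAppName("mongozips-pipeline").setMaster("local[*]")
  val sc = new SparkContext(conf)
  val readConfig = ReadConfig(Map("uri" -> "mongodb://127.0.0.1/", "database" -> "test", "collection" -> "zips"))

  // The $match stage runs server-side in MongoDB, so only matching documents reach Spark.
  val filteredRdd = sc.loadFromMongoDB(readConfig)
    .withPipeline(Seq(Document.parse("{ $match: { pop: { $gt: 0 } } }")))

  println(s"count = ${filteredRdd.count()}")
  sc.stop()
}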
zipDf.filter($"pop" > 0).select("state")// .show().explain(true)
== Physical Plan ==
*Project [state#4]
+- *Filter (isnotnull(pop#3) && (pop#3 > 0))
   +- *Scan MongoRelation(MongoRDD[7] at RDD at MongoRDD.scala:52,Some(StructType(StructField(_id,StringType,true), StructField(city,StringType,true), StructField(loc,ArrayType(DoubleType,true),true), StructField(pop,IntegerType,true), StructField(state,StringType,true)))) [state#4,pop#3] PushedFilters: [IsNotNull(pop), GreaterThan(pop,0)], ReadSchema: struct<state:string>
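For reference, the PushedFilters and ReadSchema in this plan correspond roughly to the following server-side aggregation stages. This is a hand-written approximation for the Spark shell, not output captured from the connector:

import org.bson.Document

// Approximation of PushedFilters: [IsNotNull(pop), GreaterThan(pop,0)]
// and ReadSchema: struct<state:string> as a MongoDB pipeline.
val approximatePipeline = Seq(
  Document.parse("{ $match: { pop: { $exists: true, $ne: null, $gt: 0 } } }"),
  Document.parse("{ $project: { state: 1 } }")
)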