spark connector, filters predicate pushdown


rbru...@ippon.fr

Jan 4, 2017, 3:44:11 PM
to mongodb-user
Hi,

I am experimenting with the Spark-MongoDB connector.
More specifically, I want to understand which predicates are pushed down from Spark to the MongoDB nodes.

The documentation says that Spark's filters are translated into an aggregation framework pipeline (https://docs.mongodb.com/spark-connector/v1.1/spark-sql/#filters).
I verified this by putting a breakpoint in the code that performs the translation.
The problem is that whether the application stops on the breakpoint depends on how I load the collection into a DataFrame.
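
To make the idea of "filters translated to an aggregation pipeline" concrete, here is a toy model of such a translation. It is not the connector's actual code: the `Filter` types, the `FilterToPipeline` object, and the choice of `$ne: null` for the not-null check are all assumptions made for illustration only.

```scala
// Toy model of a filter-to-pipeline translation; NOT the connector's code.
// These case classes only mimic the shape of Spark's data source filters.
sealed trait Filter
final case class GreaterThan(field: String, value: Int) extends Filter
final case class IsNotNull(field: String) extends Filter

object FilterToPipeline {
  // Render one filter as the body of a $match stage (JSON text).
  // The $ne: null form for IsNotNull is an assumption for this sketch.
  def toMatch(f: Filter): String = f match {
    case GreaterThan(field, v) => s"""{ "$field": { "$$gt": $v } }"""
    case IsNotNull(field)      => s"""{ "$field": { "$$ne": null } }"""
  }

  // One $match stage per filter, in the order the filters were given.
  def toPipeline(filters: Seq[Filter]): Seq[String] =
    filters.map(f => s"""{ "$$match": ${toMatch(f)} }""")

  def main(args: Array[String]): Unit =
    toPipeline(Seq(IsNotNull("pop"), GreaterThan("pop", 0))).foreach(println)
}
```

When the translation happens, a filter such as `$"pop" > 0` is evaluated by MongoDB rather than by Spark after all documents have been loaded.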

Here is the code I used to isolate my case:

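import com.mongodb.spark._                  // adds loadFromMongoDB to SparkContext
import com.mongodb.spark.config.ReadConfig
import com.mongodb.spark.sql._              // adds mongo() to DataFrameReader
import com.typesafe.scalalogging.LazyLogging // scala-logging 3.x package; older releases differ
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object ReproduceMain extends App with LazyLogging {

  // Case 1: load an RDD with an explicit ReadConfig, then convert it with toDF()
  println("Does not convert to an aggregation pipeline: ")
  val conf = new SparkConf()
    .setAppName("mongozips")
    .setMaster("local[*]")

  val sc = new SparkContext(conf)
  val readConfig = ReadConfig(Map("uri" -> "mongodb://127.0.0.1/", "database" -> "test", "collection" -> "zips"))
  val zipDf = sc.loadFromMongoDB(readConfig).toDF()
  import zipDf.sqlContext.implicits._ // enables the $"..." column syntax

  val count = zipDf
    .filter($"pop" > 0)
    .count()
  println(s"count = $count")
  sc.stop()

  // Case 2: configure the input URI on the SparkConf and load through the DataFrame reader
  println("Converts to an aggregation pipeline: ")
  val conf2 = new SparkConf()
    .setAppName("mongozips")
    .setMaster("local[*]")
    .set("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.zips")

  val sc2 = new SparkContext(conf2)
  val sqlContext2 = new SQLContext(sc2)
  val zipDf2 = sqlContext2.read.mongo()
  zipDf2
    .filter($"pop" > 0)
    .show()
}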

The first call does not stop on the breakpoint but the second does.

Am I missing something when loading the data in the first case?
If not, I will report this as a bug.

Thanks

Ross Lawley

Feb 1, 2017, 5:13:15 AM
to mongodb-user
Hi,

The conversion to a DataFrame changed in the 2.x version of the Spark Connector, and the difference you are seeing no longer occurs there.

In Mongo Spark 1.x, converting a MongoRDD to a DataFrame used the `sqlContext.createDataFrame` method. A side effect was that, while it did create a DataFrame, that DataFrame didn't have access to the Mongo-specific code that converts filters. In Mongo Spark 2.x this code path was refactored so that `toDF` uses `com.mongodb.spark.sql.DefaultSource` and any subsequent filters are pushed down.
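
For anyone who has to stay on 1.x, one possible approach is to load the DataFrame by data source name, so that it is built by `com.mongodb.spark.sql.DefaultSource` rather than by `createDataFrame`. The following is only a sketch: it assumes the connector is on the classpath and a MongoDB instance at 127.0.0.1 holding `test.zips` as in the question, and the `PushdownViaDataSource` object and `mongoOptions` helper are hypothetical names. It has not been checked against every 1.x release.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object PushdownViaDataSource {
  // Connection options for the connector's data source (values from the question).
  def mongoOptions(uri: String, database: String, collection: String): Map[String, String] =
    Map("uri" -> uri, "database" -> database, "collection" -> collection)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("mongozips").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Loading by format name resolves to com.mongodb.spark.sql.DefaultSource,
    // so the DataFrame is backed by the connector's own relation.
    val zipDf = sqlContext.read
      .format("com.mongodb.spark.sql")
      .options(mongoOptions("mongodb://127.0.0.1/", "test", "zips"))
      .load()

    // Print the plans to check whether the filter reaches the scan.
    zipDf.filter($"pop" > 0).explain(true)
    sc.stop()
  }
}
```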

I hope that helps,

Ross

rbru...@ippon.fr

Feb 2, 2017, 11:52:42 AM
to mongodb-user
Hello,

Thank you,

Updating the connector to 2.x solved the problem!

For anyone reading this who wants to dive deeper into Spark pushdown optimizations, an easier way to verify which filters are pushed down is to print Spark's query plan with explain.

  zipDf
    .filter($"pop" > 0)
    .select("state")
//    .show()
    .explain(true)


This should produce something similar to:

== Physical Plan ==
*Project [state#4]
+- *Filter (isnotnull(pop#3) && (pop#3 > 0))
   +- *Scan MongoRelation(MongoRDD[7] at RDD at MongoRDD.scala:52,Some(StructType(StructField(_id,StringType,true), StructField(city,StringType,true), StructField(loc,ArrayType(DoubleType,true),true), StructField(pop,IntegerType,true), StructField(state,StringType,true)))) [state#4,pop#3] PushedFilters: [IsNotNull(pop), GreaterThan(pop,0)], ReadSchema: struct<state:string>
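
The same check can be automated by pulling the `PushedFilters` list out of the plan text. This is a minimal sketch, assuming the plan text keeps the `PushedFilters: [...]` layout shown above (Spark does not guarantee that layout across versions); `PushedFiltersCheck` and `pushedFilters` are hypothetical names, and the sample line is an abbreviated copy of the scan line from the plan.

```scala
object PushedFiltersCheck {
  // Matches the "PushedFilters: [...]" fragment of a physical plan line.
  private val Pushed = """PushedFilters: \[(.*?)\]""".r

  /** Returns the pushed filters listed in the plan text, or Nil if none are reported. */
  def pushedFilters(planText: String): List[String] =
    Pushed.findFirstMatchIn(planText) match {
      case Some(m) if m.group(1).trim.nonEmpty => m.group(1).split(", ").toList
      case _                                   => Nil
    }

  def main(args: Array[String]): Unit = {
    // Abbreviated scan line from the plan above.
    val sample =
      "+- *Scan MongoRelation [state#4,pop#3] PushedFilters: [IsNotNull(pop), GreaterThan(pop,0)], ReadSchema: struct<state:string>"
    println(pushedFilters(sample)) // prints List(IsNotNull(pop), GreaterThan(pop,0))
  }
}
```

With a live DataFrame, the plan text could be taken from `df.queryExecution.executedPlan.toString`. An empty result suggests that Spark is applying the filters itself after loading the documents.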



Raphael.