geospark spatial join performance

Radan Šuba

Jan 14, 2019, 5:21:54 AM
to GeoSpark Discussion Board
Dear all,

I have approx. 350 million GPS locations (points) across Europe, and I want to know in which countries they are located.
I used 40 simplified, simple polygons representing the countries. I ran SpatialJoinQuery, with rather unfavourable performance.

The main part of the code looks as follows:

//CONVERT FROM DataFrame TO SpatialRDD
var polyRDD = new SpatialRDD[Geometry]
polyRDD.rawSpatialRDD = Adapter.toRdd(polyDf)

var pointRDD = new SpatialRDD[Geometry]
pointRDD.rawSpatialRDD = Adapter.toRdd(point).repartition(200)

//ADD PARTITIONING
polyRDD.analyze()
pointRDD.analyze()
pointRDD.spatialPartitioning(GridType.RTREE)

//ADD INDEX
val buildOnSpatialPartitionedRDD = true // Set to TRUE only when running a join query
pointRDD.buildIndex(IndexType.RTREE, buildOnSpatialPartitionedRDD)
polyRDD.spatialPartitioning(pointRDD.getPartitioner)

//SPATIAL JOIN
val considerBoundaryIntersection = false // Only return geometries fully covered by each query window in queryWindowRDD
val usingIndex = true
val result = JoinQuery.SpatialJoinQueryFlat(pointRDD, polyRDD, usingIndex, considerBoundaryIntersection)

//convert to DF
var joinResultDf = Adapter.toDf(result, hc.sparkSession)
joinResultDf.createOrReplaceTempView("resultdf")

val out = hc.sql("""SELECT  _c3 AS id, 
                            _c4 AS timestamp,
                            _c8 AS latitude, 
                            _c9 AS longitude,
                            _c1 AS poly_id
                    FROM resultdf
                    """.stripMargin)
//STORE
out.write.saveAsTable("result")


I used GeoSpark core version 1.1.3 and ran it on a small Hadoop cluster with Hive storage.
It took several days (!!!). Any idea how I could speed it up?


Jia Yu

Jan 15, 2019, 2:51:13 AM
to Radan Šuba, GeoSpark Discussion Board
Hi Radan,

Your code has several critical issues:

1. pointRDD.rawSpatialRDD = Adapter.toRdd(point).repartition(200): 200 partitions is too small for 350 million GPS locations (points). Normally, for this data scale, the number should be > 1000.
2. pointRDD.spatialPartitioning(GridType.RTREE) -> please use GridType.KDBTREE instead.
3. IndexType.RTREE -> use IndexType.QUADTREE instead.

Based on my experience, if you tune it well and your cluster is not too weak, this join should finish in 10 to 30 minutes.
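
For reference, here is a minimal sketch of the same pipeline with those three changes applied. It reuses the polyDf and point DataFrames from your post and assumes the GeoSpark 1.1.x package names; the 2000 partitions are only an example value to tune for your cluster.

import com.vividsolutions.jts.geom.Geometry
import org.datasyslab.geospark.enums.{GridType, IndexType}
import org.datasyslab.geospark.spatialOperator.JoinQuery
import org.datasyslab.geospark.spatialRDD.SpatialRDD
import org.datasyslab.geosparksql.utils.Adapter

//CONVERT FROM DataFrame TO SpatialRDD (polyDf and point come from the original post)
var polyRDD = new SpatialRDD[Geometry]
polyRDD.rawSpatialRDD = Adapter.toRdd(polyDf)

var pointRDD = new SpatialRDD[Geometry]
//Use well over 1000 partitions for ~350 million points (2000 is an example value)
pointRDD.rawSpatialRDD = Adapter.toRdd(point).repartition(2000)

//ADD PARTITIONING with a KDB-tree instead of an R-tree grid
polyRDD.analyze()
pointRDD.analyze()
pointRDD.spatialPartitioning(GridType.KDBTREE)
polyRDD.spatialPartitioning(pointRDD.getPartitioner)

//ADD INDEX: build a quad-tree on the spatially partitioned point RDD
pointRDD.buildIndex(IndexType.QUADTREE, true)

//SPATIAL JOIN exactly as before
val usingIndex = true
val considerBoundaryIntersection = false
val result = JoinQuery.SpatialJoinQueryFlat(pointRDD, polyRDD, usingIndex, considerBoundaryIntersection)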

Thanks,
Jia

------------------------------------

Jia Yu,

Ph.D. Student in Computer Science




Jia Yu

Jan 15, 2019, 2:55:43 AM
to Radan Šuba, GeoSpark Discussion Board
In addition, since you only have 40 polygons, if your memory is large enough to cache the 350 million GPS points, an alternative approach is to run 40 spatial range queries against a cached spatial index or cached spatial RDD.

A single spatial range query on a fully cached spatial RDD takes only a few milliseconds to finish.
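
Roughly, that could look like the following sketch. Here countries is a hypothetical Seq[(String, Geometry)] holding the 40 country polygons, pointRDD is the point SpatialRDD from your code, and depending on your GeoSpark version the query window passed to RangeQuery.SpatialRangeQuery may need to be a Geometry or an Envelope.

import org.apache.spark.storage.StorageLevel
import org.datasyslab.geospark.enums.IndexType
import org.datasyslab.geospark.spatialOperator.RangeQuery

//INDEX AND CACHE the raw (non-partitioned) point RDD once
pointRDD.buildIndex(IndexType.QUADTREE, false)
pointRDD.indexedRawRDD.persist(StorageLevel.MEMORY_ONLY)

val usingIndex = true
val considerBoundaryIntersection = false

//ONE RANGE QUERY PER COUNTRY polygon; each query returns the points covered by that polygon
val pointsPerCountry = countries.map { case (name, polygon) =>
  val matches = RangeQuery.SpatialRangeQuery(pointRDD, polygon, considerBoundaryIntersection, usingIndex)
  (name, matches)
}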

------------------------------------

Jia Yu,

Ph.D. Student in Computer Science


Radan Šuba

Jan 16, 2019, 9:49:18 AM
to GeoSpark Discussion Board
What do you consider a cluster that is "...not too weak..."? So far, no improvement on our side. Do you have any recommendations concerning the SparkSession configuration?

R.