This summary is the result of work done to migrate the S3 catalog away from using Kryo for record serialization, because of the issues described here: https://github.com/geotrellis/geotrellis/issues/1138
We have chosen Apache Avro as the alternative. In parallel, ongoing work has motivated a closer look at S3 catalog performance and benchmarking. The main question is: is it possible to use S3 as a datasource in a web request-response cycle? Original tests were not encouraging, with a representative query taking ~20s to complete.
Hopefully this document will be useful to people facing similar problems and use cases with GeoTrellis or Spark.
Gist: https://gist.github.com/echeipesh/b3880b5087ea184870d5
There has been some understandable confusion from the phrase “Kryo is out!” So it’s worthwhile to go over the different serializations going on in GeoTrellis and Spark:
- Closure serialization: Spark serializes the closures passed to `RDD.map`/`reduce`/`flatMap` methods and any objects they may refer to.
- Over-the-wire and disk cache serialization: configured with `spark.serializer=org.apache.spark.serializer.KryoSerializer` on the SparkContext. GeoTrellis uses Kryo here for performance benefits.
- Record serialization for catalog storage: this is the serialization that is moving from Kryo to Avro and is the subject of this document.

The main moral is that Spark Kryo serialization cannot be expected to be consistent across versions, and sometimes across instances of the application. This is not an issue when it is used for over-the-wire transfer and disk caching within a single cluster, but it makes Kryo dangerous to use for long-term storage across multiple application instances and versions.
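For reference, the over-the-wire Kryo configuration mentioned above is set on the `SparkConf`. A minimal sketch, assuming a plain SparkContext is being constructed by hand (under spark-jobserver the same settings are passed through the config shown at the end of this document):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Enable Kryo only for Spark's shuffle and disk cache serialization.
// The registrator class name matches the one in the jobserver config below.
val conf = new SparkConf()
  .setAppName("benchmarks")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "geotrellis.spark.io.hadoop.KryoRegistrator")

val sc = new SparkContext(conf)
```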
For the purpose of the benchmarks we count only the time required to fetch the data from S3 and iterate through it by calling `rdd.count()`. Any further processing time that may be required is not counted. However, previous experience shows that short-lived Spark jobs such as weighted overlays are dominated by I/O time.
```scala
object BenchmarkJob extends SparkJob {
  def runJob(sc: SparkContext, config: Config) = {
    val catalog: S3RasterCatalog = getCatalog(sc)
    val layerName: String = config.getString("layer")
    val zoom: Int = config.getInt("zoom")
    val polygon: Polygon = config.getString("polygon").parseGeoJson[Polygon].reproject(LatLng, WebMercator)
    val layer = LayerId(layerName, zoom)
    val rdd = catalog.query[SpatialKey](layer).where(Intersects(polygon.envelope)).toRDD

    Timer.timedTask(s"Count tiles for $layer") {
      rdd.count()
    }
  }

  def validate(sc: SparkContext, config: Config) = SparkJobValid
}
```
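`Timer.timedTask` above is a small helper, not part of Spark or spark-jobserver. A hypothetical sketch of what such a helper might look like, assuming it simply measures wall-clock time around a block:

```scala
// Hypothetical Timer helper: prints wall-clock time for a block and returns its result.
object Timer {
  def timedTask[T](msg: String)(block: => T): T = {
    val start = System.currentTimeMillis()
    val result = block
    println(s"$msg took ${System.currentTimeMillis() - start} ms")
    result
  }
}
```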
The old strategy of the catalog was to break down the bounding box into a sequence of index ranges, distribute the ranges evenly, and have the workers list the keys from the start of each individual range before pulling the individual files.
The old behavior was primarily useful when two tiles would have conflicting indices, as happens often with spatiotemporal tiles. In those cases the file names need a suffix to disambiguate them, and this suffix cannot be derived from the bounding box.
The new S3 catalog strategy is to write all tiles that share an SFC index to the same file, and on read to perform refinement on the client side to discard tiles outside of the query range. With this restriction, all possible indices covered by the query can be derived on the driver, and the workers are able to start requesting files without an intermediate LIST step.
It turns out that listing the bucket added a significant amount of latency: the new strategy cut the query time from ~20s to ~2s for the same query size.
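The read side of the new strategy can be summarized in a short sketch. This is purely illustrative and not the actual GeoTrellis reader code; `fetchObject` and `inQueryRange` stand in for the real S3 GET and key-filtering logic:

```scala
object RefinementSketch {
  // The driver enumerates every SFC index covered by the query, so workers can
  // GET the corresponding S3 objects directly (no LIST request) and then drop
  // any tiles that fall outside the query range on the client side.
  def readQuery[K](
    indices: Seq[Long],                          // derived on the driver from the query bounds
    fetchObject: Long => Seq[(K, Array[Byte])],  // GET + decode one S3 object (may hold several tiles)
    inQueryRange: K => Boolean                   // client-side refinement predicate
  ): Seq[(K, Array[Byte])] =
    indices.flatMap(fetchObject).filter { case (key, _) => inQueryRange(key) }
}
```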
Additionally, the ability to store multiple tiles per S3 file will be useful for implementing meta-tiles. Meta-tiles are widely used to amortize I/O overhead over larger chunks of data, since typical 256x256 tiles, especially compressed, are too small for that purpose.
How big should the meta-tiles be? Because in this use case the layers are not used for display, only for calculation, we can simulate the effect of meta-tiles by increasing the base tile size. Here are some micro-benchmarks from a single 8-core host outside of EC2:
Query Polygon:

```json
{
  "type": "Polygon",
  "coordinates": [[
    [-121.81640624999999, 43.91372326852401],
    [-121.81640624999999, 44.49650533109345],
    [-120.95947265624999, 44.49650533109345],
    [-120.95947265624999, 43.91372326852401],
    [-121.81640624999999, 43.91372326852401]
  ]]
}
```
tile size (px) | zoom | tiles | run times (s) |
---|---|---|---|
256 | 13 | 560 | 4.6, 4.5, 4.6, 4.2 |
512 | 12 | 140 | 1.9, 1.1, 1.4, 1.3 |
768 | 11 | 40 | 1.0, 0.7, 0.7, 0.7 |
1024 | 11 | 40 | 1.0, 0.6, 1.1, 0.7 |
2048 | 10 | 15 | 0.9, 0.9, 0.8, 0.8 |
This set of benchmarks was run on a single `m3.2xlarge`, which also has 8 cores.
tile size (px) | zoom | tiles | run times (s) |
---|---|---|---|
256 | 13 | 560 | 2.5, 2.6, 2.6 |
512 | 12 | 140 | 1.3, 1.2, 1.0 |
768 | 11 | 40 | 1.1, 0.7, 0.6 |
1024 | 11 | 40 | 0.8, 0.7, 0.7 |
In an attempt to bring down query times even further we consider local tile caching: each client first checks the file system (SSD) for a tile before fetching it from S3. This makes good sense if the query regions hot-spot, for instance around cities. These benchmarks represent a best-case scenario because they were done on a single client. If this caching strategy were used on a cluster, the cache hit rate would not be as good, and it would take several queries over the same region before the caches were fully populated.
tile size (px) | zoom | tiles | fresh (s) | cached (s) |
---|---|---|---|---|
256 | 13 | 560 | 4.5, 4.0 | 0.2, 0.1 |
512 | 12 | 140 | 1.6, 1.3 | 0.1, 0.1 |
768 | 11 | 40 | 0.7, 0.8 | 0.3, 0.3 |
1024 | 11 | 40 | 0.9, 0.6 | 0.1, 0.1 |
2048 | 10 | 15 | 0.7, 0.6 | 0.2, 0.2 |
Caching can be enabled by bringing an implicit instance of `RasterRDDReader` into local scope to be used by the `S3RasterCatalog`, like so:
```scala
implicit val reader = new CachingRasterRDDReader[SpatialKey](new File("/tmp/cache"))
val rdd = catalog.query[SpatialKey](layer).where(Intersects(polygon.envelope)).toRDD
```
It’s the responsibility of the caller to make sure that the directory exists and is writable.
There is no mechanism for trimming the cache; the idea is that this can be done by an external asynchronous process at some interval.
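The behavior of the caching reader is, roughly, check the local path first and fall back to S3, writing the fetched bytes through to the cache. A hedged sketch of that pattern (not the actual `CachingRasterRDDReader` code; `fetchFromS3` stands in for the real S3 GET):

```scala
import java.io.File
import java.nio.file.Files

object CacheSketch {
  // Check-cache-then-fetch pattern described above.
  def cachedGet(cacheDir: File, key: String, fetchFromS3: String => Array[Byte]): Array[Byte] = {
    val cached = new File(cacheDir, key)
    if (cached.exists()) {
      Files.readAllBytes(cached.toPath)   // cache hit: read from the local SSD
    } else {
      val bytes = fetchFromS3(key)        // cache miss: fetch from S3
      Files.write(cached.toPath, bytes)   // write through so the next query hits the cache
      bytes
    }
  }
}
```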
Note: We tested using the Hadoop FileSystem interface in the hope of using HDFS as a cache that could be shared by all the clients in the cluster. However, it suffers a ~30% performance penalty vs native `java.io`. Once you add the time of the lookup and a possible fetch, it is very likely that an HDFS cache would perform about the same as hitting S3 directly in this case.
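For comparison, the Hadoop FileSystem variant we tested looked roughly like the following sketch, using the standard `org.apache.hadoop.fs` API (not the exact test code; the example path is hypothetical):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.commons.io.IOUtils

// Reading a cached tile through the Hadoop FileSystem interface (HDFS or local fs);
// this is the path that measured ~30% slower than plain java.io in our tests.
object HdfsCacheSketch {
  def read(fs: FileSystem, path: Path): Option[Array[Byte]] =
    if (fs.exists(path)) {
      val in = fs.open(path)
      try Some(IOUtils.toByteArray(in)) finally in.close()
    } else None
}

// Usage (hypothetical cache path):
// val fs = FileSystem.get(new Configuration())
// HdfsCacheSketch.read(fs, new Path("hdfs:///tile-cache/layer/13/1234"))
```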
It has been useful to perform these tests using spark-jobserver, which brings the following advantages:

- The request body is parsed as Typesafe Config, which reduces the pain of creating a custom REST endpoint to parse the request body.
- The availability of a docker container gives three escalating options for testing: exercising the job class directly through the `SparkJob` trait, running it against a local docker container, and finally deploying to EC2. This is a clear line of testing that delays the need to push to EC2, the most costly option, as far as possible and tightens the iteration loop on testing.
- The same API can be used for both async and sync jobs. Currently all jobs are expected to complete synchronously, but it is not clear when this will change or stop being possible for a given job definition.
I used the SJS docker container from: https://github.com/azavea/docker-spark-jobserver

In order to run docker locally, the container needs to be able to find AWS credentials, which can be done with a volume mount:
```bash
docker run -p 8090:8090 --name spark-jobserver -v /Users/eugene/.aws/credentials:/root/.aws/credentials azavea/spark-jobserver
```
The AMI was created using the following cloud-config:
```yaml
#cloud-config

coreos:
  update:
    reboot-strategy: off
  units:
    - name: spark-jobserver.service
      command: start
      content: |
        [Unit]
        Description=Spark Job Server
        After=docker.service

        [Service]
        Restart=always
        ExecStartPre=-/usr/bin/docker kill spark-jobserver
        ExecStartPre=-/usr/bin/docker rm -f spark-jobserver
        ExecStartPre=/usr/bin/docker pull quay.io/azavea/spark-jobserver:0.1.0
        ExecStart=/usr/bin/docker run --name spark-jobserver -v /tmp/spark-jobserver.conf:/opt/spark-jobserver/spark-jobserver.conf:ro -p 8090:8090 quay.io/azavea/spark-jobserver:0.1.0
        ExecStop=/usr/bin/docker stop -t 2 spark-jobserver

write_files:
  - path: /tmp/spark-jobserver.conf
    permissions: 0644
    owner: root
    content: |
      spark.master = "local[4]"
      spark.job-number-cpus = 4
      spark.home = "/opt/spark"

      spark.context-settings.passthrough.spark.serializer = "org.apache.spark.serializer.KryoSerializer"
      spark.context-settings.passthrough.spark.kryo.registrator = "geotrellis.spark.io.hadoop.KryoRegistrator"

      spark.jobserver.port = 8090
      spark.jobserver.jar-store-rootdir = /opt/spark-jobserver/jars
      spark.jobserver.jobdao = spark.jobserver.io.JobFileDAO
      spark.JobFileDAO.jobdao.filedao.rootdir = /opt/spark-jobserver/filedao/data

      spray.can.server.parsing.max-content-length = 250m
```
Note: If, like me, you’re using boot2docker, you will need to mount the Users directory into the VM’s Users directory.
Once the container is up it is possible to initiate the jobs with curl:
```bash
curl --data-binary @summary/target/scala-2.10/datahub-summary-assembly-0.0.1.jar http://ec2-0-0-0-0.compute-1.amazonaws.com:8090/jars/benchmarks
curl --data-binary @examples/request-oregon.json "http://ec2-0-0-0-0.compute-1.amazonaws.com:8090/jobs?sync=true&appName=benchmarks&classPath=com.azavea.datahub.BenchmarkJob"
```