S3 Catalog Refactor Notes and Benchmarks


Eugene Cheipesh

Aug 21, 2015, 2:42:32 PM
to geotrellis-user

This summary is the result of work done to migrate the S3 catalog away from using Kryo for record serialization because of the issues described here: https://github.com/geotrellis/geotrellis/issues/1138

We have chosen Apache Avro as the alternative. In parallel, ongoing work has motivated a closer look at S3 catalog performance and benchmarking. The main question is: is it possible to use S3 as a data source in a web request-response cycle? The original tests were not encouraging, with a representative query taking ~20s to complete.

Hopefully this document will be useful to people facing similar problems and use cases with GeoTrellis or Spark. 

Gist: https://gist.github.com/echeipesh/b3880b5087ea184870d5

Serialization Story

There has been some understandable confusion from the phrase “Kryo is out!” So it’s worthwhile to go over the different serializations going on in GeoTrellis and Spark:

  1. Closures in Spark are always serialized with Java Serialization. These are the arguments to RDD.map/reduce/flatMap methods and any objects they may refer to.
  2. RDD record and broadcast variable serialization in Spark uses Java Serialization by default, but can be switched to Kryo by setting spark.serializer=org.apache.spark.serializer.KryoSerializer in the SparkConf used to create the SparkContext. GeoTrellis uses Kryo here for performance benefits.
  3. RDD records when written to GeoTrellis catalogs (S3, Accumulo, HDFS). GeoTrellis used to use Kryo here as an expedient choice, and has now been switched over to Avro.

The main moral is that Spark Kryo serialization cannot be expected to be consistent across versions, and sometimes not even across instances of the application. This is not an issue when it is used over the wire and for the disk cache within a single cluster, but it makes Kryo dangerous for long-term storage shared across multiple application instances and versions.

Benchmarking Code

For the purpose of these benchmarks we count only the time required to fetch the data from S3 and iterate through it by calling rdd.count(). Any further processing time that may be required is not counted. However, previous experience shows that short-lived Spark jobs such as weighted overlays are dominated by I/O time.

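import com.typesafe.config.Config
import org.apache.spark.SparkContext
import spark.jobserver.{SparkJob, SparkJobValid}
// GeoTrellis imports are omitted; getCatalog and Timer are helpers not shown in this post.

object BenchmarkJob extends SparkJob {
  def runJob(sc: SparkContext, config: Config) = {
    val catalog: S3RasterCatalog = getCatalog(sc)
    // Job parameters arrive in the request body, parsed as Typesafe Config.
    val layerName: String = config.getString("layer")
    val zoom: Int = config.getInt("zoom")
    // The query polygon is sent as GeoJSON in LatLng and reprojected to WebMercator.
    val polygon: Polygon = config.getString("polygon").parseGeoJson[Polygon].reproject(LatLng, WebMercator)
    val layer = LayerId(layerName, zoom)

    // Building the query is lazy; nothing is fetched from S3 yet.
    val rdd = catalog.query[SpatialKey](layer).where(Intersects(polygon.envelope)).toRDD

    // count() is the action that forces the fetch, so this is what gets timed.
    Timer.timedTask(s"Count tiles for $layer"){
      rdd.count()
    }
  }

  // No validation of the request is performed for the benchmark.
  def validate(sc: SparkContext, config: Config) = SparkJobValid
}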
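
Timer.timedTask is not defined in this post. A minimal sketch of such a helper, assuming it only wraps a block and reports wall-clock time, might look like the following. The name SimpleTimer is hypothetical and not part of GeoTrellis.

```scala
object SimpleTimer {
  /** Runs `body`, prints the elapsed wall-clock time, and returns the result of `body`. */
  def timedTask[T](label: String)(body: => T): T = {
    val start = System.nanoTime()
    val result = body // the by-name block is evaluated here
    val elapsedMs = (System.nanoTime() - start) / 1e6
    println(f"$label: $elapsedMs%.1f ms")
    result
  }
}
```

Because Spark RDDs are evaluated lazily, the query does no I/O until rdd.count() runs, so timing that one call captures the whole fetch.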

S3 Catalog Re-design

The old strategy of the catalog was to break down the bounding box into a sequence of index ranges, distribute the ranges evenly, and have the drivers list the keys from the start of each individual range before pulling the individual files.
The old behavior was primarily useful when two tiles would have conflicting indices, as often happens with spatiotemporal tiles. In those cases the file names would need a suffix to disambiguate them, and this suffix could not be derived from the bounding box.

The new S3 catalog strategy is to write all tiles that share an SFC (space-filling curve) index to the same file and, on read, to perform refinement on the client side to discard tiles outside of the query range. With this restriction all possible indices covered by the query can be derived on the driver, and the workers are able to start requesting files without an intermediate LIST step.

It turns out that listing the bucket added a significant amount of latency: the new strategy cut query time from ~20s to ~2s for the same query size.
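
The new read path described above can be sketched in plain Scala. This is an illustration under stated assumptions, not the GeoTrellis implementation: the Key and Query types, the Z-order index, and the choice to leave time out of the index are all hypothetical, picked so that several records share one index and refinement has something to discard.

```scala
object SfcQuerySketch {
  // Hypothetical key: a tile position plus a timestamp.
  case class Key(col: Int, row: Int, time: Long)

  // Hypothetical query: inclusive bounds in tile space and in time.
  case class Query(colMin: Int, colMax: Int, rowMin: Int, rowMax: Int,
                   timeMin: Long, timeMax: Long) {
    def contains(k: Key): Boolean =
      k.col >= colMin && k.col <= colMax &&
      k.row >= rowMin && k.row <= rowMax &&
      k.time >= timeMin && k.time <= timeMax
  }

  /** Interleaves the low 16 bits of col and row into a Z-order (Morton) index. */
  def zIndex(col: Int, row: Int): Long = {
    var z = 0L
    var i = 0
    while (i < 16) {
      z |= ((col >> i) & 1).toLong << (2 * i)
      z |= ((row >> i) & 1).toLong << (2 * i + 1)
      i += 1
    }
    z
  }

  /** Assumption: the index ignores time, so all instants of one position share a file. */
  def index(k: Key): Long = zIndex(k.col, k.row)

  /** Every index the query can touch, derived from the bounds alone (no LIST call). */
  def indicesFor(q: Query): Seq[Long] =
    (for (c <- q.colMin to q.colMax; r <- q.rowMin to q.rowMax) yield zIndex(c, r)).sorted

  /** Client-side refinement: drop records in a fetched file that fall outside the query. */
  def refine[V](records: Seq[(Key, V)], q: Query): Seq[(Key, V)] =
    records.filter { case (k, _) => q.contains(k) }
}
```

In this sketch the driver would call indicesFor once, hand the indices to the workers, and each worker would fetch the file for its index and pass the decoded records through refine.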

Additionally, the ability to store multiple tiles per S3 file will be useful in implementing meta-tiles. Meta-tiles are widely used to amortize I/O overhead over larger chunks of data, because typical 256x256 tiles, especially when compressed, are too small to do so.
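
As an illustration of the meta-tile idea, here is a minimal sketch that groups tiles into square meta-tiles of n x n tiles by integer division of the tile coordinates. The names metaKey and groupIntoMetaTiles are hypothetical, and this grouping rule is an assumption rather than anything GeoTrellis defines.

```scala
object MetaTileSketch {
  /** Position of the meta-tile containing tile (col, row), with n tiles per side. */
  def metaKey(col: Int, row: Int, n: Int): (Int, Int) =
    (Math.floorDiv(col, n), Math.floorDiv(row, n))

  /** Groups keyed tiles so that each group could be written to a single file. */
  def groupIntoMetaTiles[V](tiles: Seq[((Int, Int), V)], n: Int): Map[(Int, Int), Seq[((Int, Int), V)]] =
    tiles.groupBy { case ((col, row), _) => metaKey(col, row, n) }
}
```

With n = 4, sixteen neighbouring 256x256 tiles would land in one group and could be fetched with a single request.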

Optimal Tile Size

How big should the meta-tiles be? Because in this use case the layers are used only for calculation, not for display, we can simulate the effect of meta-tiles by increasing the base tile size. Here are some micro-benchmarks from a single 8-core host outside of EC2:

Query Polygon

{
    "type": "Polygon",
    "coordinates": [[
        [-121.81640624999999,43.91372326852401],
        [-121.81640624999999,44.49650533109345],
        [-120.95947265624999,44.49650533109345],
        [-120.95947265624999,43.91372326852401],
        [-121.81640624999999,43.91372326852401]
    ]]
}

size  zoom  tiles  time (s)
256   13    560    4.6, 4.5, 4.6, 4.2
512   12    140    1.9, 1.1, 1.4, 1.3
768   11    40     1.0, 0.7, 0.7, 0.7
1024  11    40     1.0, 0.6, 1.1, 0.7
2048  10    15     0.9, 0.9, 0.8, 0.8
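
To compare the rows of the table above on an equal footing, one can compute the mean time per run and the mean time per tile. The sketch below only does arithmetic on the numbers already listed; the object name TileSizeStats is hypothetical.

```scala
object TileSizeStats {
  // (tile size, zoom, tile count, run times in seconds), copied from the table above.
  val runs: Seq[(Int, Int, Int, Seq[Double])] = Seq(
    (256, 13, 560, Seq(4.6, 4.5, 4.6, 4.2)),
    (512, 12, 140, Seq(1.9, 1.1, 1.4, 1.3)),
    (768, 11, 40, Seq(1.0, 0.7, 0.7, 0.7)),
    (1024, 11, 40, Seq(1.0, 0.6, 1.1, 0.7)),
    (2048, 10, 15, Seq(0.9, 0.9, 0.8, 0.8))
  )

  def mean(xs: Seq[Double]): Double = xs.sum / xs.size

  def main(args: Array[String]): Unit =
    for ((size, zoom, tiles, times) <- runs) {
      val m = mean(times)
      // Per-tile time in milliseconds: mean seconds divided by tile count.
      println(f"size=$size%4d zoom=$zoom%2d tiles=$tiles%3d mean=$m%.2f s perTile=${m / tiles * 1000}%.1f ms")
    }
}
```

The per-tile time works out to roughly 8 ms for 256 tiles and roughly 57 ms for 2048 tiles, even though a 2048 tile holds 64 times as many pixels, which is consistent with a fixed per-request cost dominating at small tile sizes.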

Benchmarks on EC2

This set of benchmarks was run on a single m3.2xlarge, which also has 8 cores.

size  zoom  tiles  time (s)
256   13    560    2.5, 2.6, 2.6
512   12    140    1.3, 1.2, 1.0
768   11    40     1.1, 0.7, 0.6
1024  11    40     0.8, 0.7, 0.7

Cached S3 Reader

In an attempt to bring down query times even further we consider local tile caching. Each client first checks the file system (SSD) for a tile before fetching it from S3. This makes good sense if the query regions are going to hot-spot, for instance around cities. These benchmarks represent a best-case scenario because they are done on a single client. If this caching strategy were used on a cluster the cache hit rate would not be as good, and it would take several queries for the same region before the caches are fully populated.

size  zoom  tiles  fresh (s)  cached (s)
256   13    560    4.5, 4.0   0.2, 0.1
512   12    140    1.6, 1.3   0.1, 0.1
768   11    40     0.7, 0.8   0.3, 0.3
1024  11    40     0.9, 0.6   0.1, 0.1
2048  10    15     0.7, 0.6   0.2, 0.2

Caching can be enabled by bringing an implicit instance of RasterRDDReader into local scope, where it will be picked up by the S3RasterCatalog, like so:

implicit val reader = new CachingRasterRDDReader[SpatialKey](new File("/tmp/cache"))
val rdd = catalog.query[SpatialKey](layer).where(Intersects(polygon.envelope)).toRDD

It’s the responsibility of the caller to make sure that the directory exists and is writable.
There is no mechanism for trimming the cache, with the idea that it can be done by an external async process at some interval.
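
The check-the-file-system-first behavior described above can be sketched as a small read-through cache. This is not the CachingRasterRDDReader implementation; ReadThroughCache, its read method, and the way keys are mapped to file names are assumptions made for illustration.

```scala
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

object ReadThroughCache {
  /** Returns the bytes for `key` from `cacheDir` if present; otherwise evaluates
    * `fetch` (for example an S3 GET), stores the result, and returns it. */
  def read(cacheDir: File, key: String)(fetch: => Array[Byte]): Array[Byte] = {
    // Hypothetical naming scheme: flatten the S3 key into a single file name.
    val file = new File(cacheDir, key.replace('/', '_'))
    if (file.exists()) {
      Files.readAllBytes(file.toPath)
    } else {
      val bytes = fetch
      // Write to a temp file in the same directory and then rename it, so that a
      // concurrent reader is unlikely to observe a partially written file.
      val tmp = File.createTempFile("tile", ".tmp", cacheDir)
      Files.write(tmp.toPath, bytes)
      Files.move(tmp.toPath, file.toPath, StandardCopyOption.REPLACE_EXISTING)
      bytes
    }
  }
}
```

As in the post, the sketch expects the cache directory to exist and be writable, and it never evicts anything.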

Note: We tested using the Hadoop FileSystem interface in hopes of using HDFS as a cache that could be shared by all the clients in the cluster. However, it suffers a ~30% performance penalty vs native java.io. Once you add the time for the lookup and a possible fetch, it is very likely that an HDFS cache would have similar performance to hitting S3 directly in this case.

Deployment and Testing

It has been useful to perform these tests using spark-jobserver (SJS), which brings the following advantages:

  • Relying on Typesafe Config reduces the pain of creating a custom REST endpoint to parse the request body.
  • The availability of a Docker container gives three escalating options for testing:

    • Sbt test context, using SparkJob trait
    • Local instance of a docker container
    • EC2 instance of a docker container

    This is a clear line of testing that delays the need to push to EC2, the most costly option, as far as possible and tightens the iteration loop on testing.

  • Ability to use the same API for both async and sync jobs.
    Currently all jobs are expected to complete synchronously, but it is not clear when this will change or will not be possible given the job definition.

  • Sharing an interface for different types of clusters/contexts (local, spark cluster, mesos).
    Again it is not clear when it will be required to shift from using spark local context to a cluster. SJS provides an interface that hides that change.

Docker

Local

I used the SJS Docker container from: https://github.com/azavea/docker-spark-jobserver

In order to run docker locally the container needs to be able to find AWS credentials which can be done with a volume mount:

docker run -p 8090:8090 --name spark-jobserver -v /Users/eugene/.aws/credentials:/root/.aws/credentials azavea/spark-jobserver

EC2

The AMI was created using the following CloudConfig

#cloud-config

coreos:
  update:
    reboot-strategy: off
  units:
    - name: spark-jobserver.service
      command: start
      content: |
        [Unit]
        Description=Spark Job Server
        After=docker.service
        [Service]
        Restart=always
        ExecStartPre=-/usr/bin/docker kill spark-jobserver
        ExecStartPre=-/usr/bin/docker rm -f spark-jobserver
        ExecStartPre=/usr/bin/docker pull quay.io/azavea/spark-jobserver:0.1.0
        ExecStart=/usr/bin/docker run --name spark-jobserver -v /tmp/spark-jobserver.conf:/opt/spark-jobserver/spark-jobserver.conf:ro -p 8090:8090 quay.io/azavea/spark-jobserver:0.1.0
        ExecStop=/usr/bin/docker stop -t 2 spark-jobserver
write_files:
  - path: /tmp/spark-jobserver.conf
    permissions: 0644
    owner: root
    content: |
      spark.master = "local[4]"
      spark.job-number-cpus = 4
      spark.home = "/opt/spark"
      spark.context-settings.passthrough.spark.serializer = "org.apache.spark.serializer.KryoSerializer"
      spark.context-settings.passthrough.spark.kryo.registrator = "geotrellis.spark.io.hadoop.KryoRegistrator"
      spark.jobserver.port = 8090
      spark.jobserver.jar-store-rootdir = /opt/spark-jobserver/jars
      spark.jobserver.jobdao = spark.jobserver.io.JobFileDAO
      spark.JobFileDAO.jobdao.filedao.rootdir = /opt/spark-jobserver/filedao/data
      spray.can.server.parsing.max-content-length = 250m

Note: If, like me, you’re using boot2docker, you will need to mount the Users directory to the VM’s Users directory.

Jobs API

Once the container is up it is possible to initiate the jobs with curl:

curl --data-binary @summary/target/scala-2.10/datahub-summary-assembly-0.0.1.jar http://ec2-0-0-0-0.compute-1.amazonaws.com:8090/jars/benchmarks
curl --data-binary @examples/request-oregon.json "http://ec2-0-0-0-0.compute-1.amazonaws.com:8090/jobs?sync=true&appName=benchmarks&classPath=com.azavea.datahub.BenchmarkJob"
