Spark: Memory Consumption (OutOfMemoryError)

Michael Mattig

Mar 12, 2015, 11:16:44 AM
to geotrel...@googlegroups.com
Hi,

I'm puzzled that I'm running into memory problems with the following use case. I've ingested a 113MB raster (elevation of Europe) into HDFS as a single file. Then I'm running this simple program via the spark-shell, which I gave 1.5G of memory:

import geotrellis.spark._
import geotrellis.spark.op.local._

//get hadoop catalog
val hc = geotrellis.spark.io.hadoop.HadoopCatalog(sc, new org.apache.hadoop.fs.Path("hdfs://namenode.service.geotrellis-spark.internal:8020/europe") )

//load rasters from hdfs
val alt = hc.load[SpatialKey](LayerId("alt", 8) )

//"classify" rasters
val altClassified = alt.mapTiles(_.map(alt => if (alt < 2250) 1 else 0))


altClassified.collect()

I'm getting the following error:

2015-03-12 17:08:24,021 WARN  [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logWarning(71)) - Lost task 1.1 in stage 1.0 (TID 13, follower01): java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1852)
    at java.io.ObjectOutputStream.write(ObjectOutputStream.java:708)
    at org.apache.spark.util.Utils$.writeByteBuffer(Utils.scala:160)
    at org.apache.spark.scheduler.DirectTaskResult$$anonfun$writeExternal$1.apply$mcV$sp(TaskResult.scala:48)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:993)
    at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:45)
    at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:219)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

Why can't my raster be processed? The memory should suffice for this raster. My final goal is to run an analysis on a global scale with multiple rasters, each about 1GB in size. How can I achieve this with reasonable memory?

Thanks in advance!

Best,
Michael

Rob Emanuele

Mar 12, 2015, 8:26:37 PM
to geotrel...@googlegroups.com
Hi Michael,

The problem I see is that you are calling altClassified.collect().

The problem with calling "collect" on an RDD is that it materializes every element of the RDD as a collection in the driver JVM. This is fine if a reduce or filter step has left you with a small-enough-for-memory element set, but if you take an RDD of tiles and call collect (thereby asking the driver to hold all of the uncompressed tiles in JVM heap space), you'll run out of memory. In your case, given the sizes you mention, I wonder whether the JVM actually has that much memory, or whether the sizes you give are for compressed data, because 1.5G ought to be enough (as long as you've actually set the JVM up with an increased heap size). In any case, code that "collects" large RDDs isn't going to be scalable.

Instead of collecting the tiles, which would probably not be so useful, you could save them back to HDFS:

hc.save(LayerId("classified", 8), altClassified)

And then you could pull individual tiles out to paint onto a web map, or derive statistics:

altClassified.histogram

These are "actions", which, unlike "transformations", actually bring data back to the client. More on that distinction here: http://spark.apache.org/docs/1.2.0/programming-guide.html#rdd-operations
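The lazy/eager split can be illustrated without Spark at all: Scala's Iterator behaves analogously, with map playing the role of a lazy "transformation" and toList standing in for an "action" like collect. (This is a plain-Scala analogy, not GeoTrellis or Spark code; all names below are made up for illustration.)

```scala
// Sketch: an Iterator pipeline as a stand-in for an RDD pipeline.
// map is lazy (a "transformation"); toList forces the work (an "action").
object LazyVsEager {
  var tilesEvaluated = 0

  // "Transformation": builds a lazy pipeline, computes nothing yet.
  def classify(elevations: Iterator[Int]): Iterator[Int] =
    elevations.map { e =>
      tilesEvaluated += 1
      if (e < 3) 1 else 0
    }

  def main(args: Array[String]): Unit = {
    val pipeline = classify(Iterator(1, 2, 3, 4))
    println(s"after map: $tilesEvaluated tiles evaluated")      // still 0
    val result = pipeline.toList                                 // the "action" runs the pipeline
    println(s"after action: $tilesEvaluated tiles evaluated, result = $result")
  }
}
```

In Spark the same thing happens, except the "action" also ships the materialized elements to one JVM, which is where collect on a large tile RDD runs out of heap.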

Let me know if that helps, or if you have additional questions.

Cheers!

Rob

--
Rob Emanuele, Tech Lead, GeoTrellis

Azavea |  340 N 12th St, Ste 402, Philadelphia, PA
rema...@azavea.com  | T 215.701.7692  | F 215.925.2663
Web azavea.com  |  Blog azavea.com/blogs  | Twitter @azavea

Michael Mattig

Mar 13, 2015, 5:50:07 AM
to geotrel...@googlegroups.com
Hi Rob,

Thanks for your quick answer. The raster is not compressed, so I think it should fit in the available RAM. Could the problem be overhead from the Scala implementation (e.g. autoboxing of values)?

In general you are right that it doesn't make sense to collect the results in a single place. However, I need to look at the whole output raster to verify the correctness of my computation. I could save the result to HDFS first. (This worked at least for my small example; for the complete one I get this error: java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions. I guess the raster dimensions do not yet fit together completely when I use combinePairs.)

Now I need to get the resulting raster back out of HDFS as a GeoTiff or PNG. Is there an inverse procedure to your ingestHadoop method? I could load the raster tile by tile (how?) and then render it to local disk using the Tile's renderPng method.

Best regards,
Michael

Chris Brown

Mar 13, 2015, 8:12:41 AM
to geotrel...@googlegroups.com
Is this running in local mode?

In addition to setting "spark.driver.memory" at the command line for spark-shell, you may need to reduce the default "spark.storage.memoryFraction" (in local mode) if your driver is running out of memory on collect, since 60 percent of the allocated memory is reserved for caching RDDs. I'm also not sure the full 1.5G is available to the collect, since by default "spark.driver.maxResultSize" is set to 1G -- although I think Spark would have aborted the collect before the JVM had a chance to throw an OOM error. And because the executor in local mode runs inside the driver (I believe), the amount of memory actually available to the driver might be even less.

I'm still trying to get a handle on all the various settings (and how they differ between local and cluster mode), but tweaking those things at the command line might help.
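For concreteness, the settings above could be passed when launching the shell roughly like this (the values are illustrative guesses, not recommendations -- tune them to your data and machine):

```shell
# Illustrative only: more driver heap, a smaller RDD cache fraction,
# and a raised result-size cap so collect() has room to materialize.
spark-shell \
  --driver-memory 4g \
  --conf spark.storage.memoryFraction=0.3 \
  --conf spark.driver.maxResultSize=2g
```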
Chris Brown, GIS Software Developer

Azavea |  340 N 12th St, Ste 402, Philadelphia, PA
