Local task: java.lang.OutOfMemoryError: GC overhead limit exceeded

3,612 views
Skip to first unread message

Jaka Jancar

unread,
Aug 31, 2012, 6:27:20 PM8/31/12
to spark...@googlegroups.com
Hi,

I have a problem processing ~1000 files from S3 totalling only 150 MB on a machine with 8GB RAM.

The RSS of java gets to 2GB, then I get:

12/08/31 17:47:53 WARN io.nio: java.net.SocketException: Transport endpoint is not connected
12/08/31 17:47:53 ERROR spark.LocalScheduler: Exception in task 0
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Arrays.copyOf(Arrays.java:2894)
at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:117)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:532)
at java.lang.StringBuilder.append(StringBuilder.java:206)
at java.io.ObjectInputStream$BlockDataInputStream.readUTFSpan(ObjectInputStream.java:3115)
at java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3023)
at java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2836)
at java.io.ObjectInputStream.readString(ObjectInputStream.java:1616)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1337)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1684)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1340)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1963)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1887)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1963)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1887)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
at spark.JavaDeserializationStream.readObject(JavaSerializer.scala:18)
at spark.SimpleShuffleFetcher$$anonfun$fetch$5$$anonfun$apply$1.apply$mcVI$sp(SimpleShuffleFetcher.scala:30)
at spark.SimpleShuffleFetcher$$anonfun$fetch$5$$anonfun$apply$1.apply(SimpleShuffleFetcher.scala:21)
at spark.SimpleShuffleFetcher$$anonfun$fetch$5$$anonfun$apply$1.apply(SimpleShuffleFetcher.scala:21)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at spark.SimpleShuffleFetcher$$anonfun$fetch$5.apply(SimpleShuffleFetcher.scala:21)
at spark.SimpleShuffleFetcher$$anonfun$fetch$5.apply(SimpleShuffleFetcher.scala:20)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at spark.SimpleShuffleFetcher.fetch(SimpleShuffleFetcher.scala:20)
at spark.ShuffledRDD.compute(ShuffledRDD.scala:39)
Exception in thread "main" spark.SparkException: Task failed: ResultTask(0, 0), reason: ExceptionFailure(java.lang.OutOfMemoryError: GC overhead limit exceeded)
at spark.DAGScheduler$class.runJob(DAGScheduler.scala:312)
at spark.LocalScheduler.runJob(LocalScheduler.scala:11)
at spark.SparkContext.runJob(SparkContext.scala:284)
at spark.SparkContext.runJob(SparkContext.scala:295)
at spark.SparkContext.runJob(SparkContext.scala:306)
at spark.RDD.count(RDD.scala:215)
at com.celtra.analyzer.LogAnalyzer.analyze(LogAnalyzer.scala:81)
at com.celtra.analyzer.LogAnalyzer.analyzeSufficientS3Logs(LogAnalyzer.scala:55)
at com.celtra.analyzer.App$.main(App.scala:124)
at com.celtra.analyzer.App.main(App.scala)

I don't get how this is possible. It seems to me the 150 MB should easily fit into memory, and even if they didn't should they just spill to disk?

Matei Zaharia

unread,
Aug 31, 2012, 6:30:16 PM8/31/12
to spark...@googlegroups.com
What Spark version is this, and what kind of files are they -- are they compressed by any chance? Also, what did you set Spark's memory limit to (in conf/spark-env.sh)?

Matei

Jaka Jancar

unread,
Aug 31, 2012, 6:35:45 PM8/31/12
to spark...@googlegroups.com
You are correct, they are gzipped. Uncompressed it's 1.3 GB.

Spark version is 0.5.0 (0472cf8e0dd807466b7dbd1ff2a025c7bc927a82).

I don't have spark-env.sh... basically I assembled Spark right along with my job into a single jar (I'm only running locally so far). I'm not settings any env vars, so I'm guessing it's whatever the default is.

Thanks for the quick reply!

Jaka

Matei Zaharia

unread,
Aug 31, 2012, 6:43:22 PM8/31/12
to spark...@googlegroups.com
Ah, so I think there are a couple of issues. First, there is a bug with figuring out the total memory available to Java in that version -- if you don't specify it as the -Xms parameter to the JVM, in addition to -Xmx, it won't work. Try downloading the master branch and building that (all it has is bug fixes for 0.5). Second, the default memory of the JVM might overall be too small to deal with this data, especially if one of the files expands when uncompressed. You should pass a higher limit to java with -Xmx. Finally, this issue is happening in a reduce task, so there might be skew causing one reducer to receive a lot of data. You can use more reduce tasks by passing a number of tasks to groupByKey, reduceByKey, etc, -- it's an optional second argument to all those methods. For example, do data.reduceByKey(_ + _, 8) to use 8 tasks.

Matei

Jaka Jančar

unread,
Aug 31, 2012, 7:36:38 PM8/31/12
to spark...@googlegroups.com
The problem persists with master. While the job is running, it seems for periods the memory use is stable, then starts growing.

Btw, I have previously tried 0.5.0 with *both* -Xms and -Xmx (to be exact: -Xms4g -Xmx4g), and the problem was still there.

So it seems I have to change the code... A couple of questions:

 - I don't get how the reducer can receive too much data... does it not either stream the data or write it to disk?
 - Isn't 8 the default number of tasks? What number do you recommend?
 - If I have low-cardinality keys and there simply is one that has a lot of data, will this even help?
 - Also, should I also be using multiple threads, or will 1 be enough?

Thanks,
Jaka

Matei Zaharia

unread,
Aug 31, 2012, 7:41:48 PM8/31/12
to spark...@googlegroups.com
> - I don't get how the reducer can receive too much data... does it not either stream the data or write it to disk?

No, our reducers currently try to load their whole input in memory, so that's why they can run out. (This is something we're working on fixing, but the simplest way around it is to launch more reduce tasks.)

> - Isn't 8 the default number of tasks? What number do you recommend?

This is only true when running on Mesos; in local mode it makes one per CPU core, so it would be only 1 in your job. So just pass in an 8, or use System.setProperty("spark.default.parallelism", "8") before you create your SparkContext.

> - Also, should I also be using multiple threads, or will 1 be enough?

Doesn't matter much except for speed, although each thread will have a few hundred MB of working space too. Get it to work in one thread first with more reduce tasks -- just try 8, or even 20 or more; it should help.

Matei

Jaka Jančar

unread,
Aug 31, 2012, 9:21:40 PM8/31/12
to spark...@googlegroups.com
I have tried 8 tasks @ -Xmx2g and it failed.

Then I tried 64 tasks @ -Xmx6g, and it seems to be working, however I gave up after it took more than an hour (task overhead?) and doing re-computation of RDDs (to little memory? maybe the disk spilling cache would help).

I'm really surprised that:

 1. 1.5gb uncompressed data cannot be smoothly processed with 6gb of ram, and that
 2. with not much less data, the entire dataset could be processed in 10 minutes.

Anyways, I seem to be on a good track here. Thanks!

Matei Zaharia

unread,
Aug 31, 2012, 9:26:11 PM8/31/12
to spark...@googlegroups.com
Got it. Are you caching the data and doing multiple passes over it? In that case it might not be fitting due to the Java objects being bigger than the on-disk data. Java objects often have a lot of overhead -- for example, each String consumes at least 50 bytes of space, and something like an ArrayList of Integer will consume 30+ bytes per object even though an int is 4 bytes. This can multiply the size of your data if you have many small objects.

You can use SerializingCache to get around this: set spark.cache.class=spark.SerializingCache. Also, it's crucial for good performance to switch away from Java serialization. You can try setting spark.serializer=spark.KryoSerializer. You will need to register any custom classes with the Kryo library -- see slide 39 of http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf for an example. Switching serialization libraries will improve the reduce performance as well.

Matei

Jaka Jancar

unread,
Sep 3, 2012, 3:49:02 PM9/3/12
to spark...@googlegroups.com
I am caching the data, yes, because the computation is slow.

I will definitely try Kryo if it significantly improves reduce performance, however... Even if the SerializingCache fits 2x the data in memory, that does not get me where I want: I would like any size dataset to work.

I understand if you're running iterative algorithms it might be memory-or-nothing, but we're using Spark (over, say, Hadoop) because of it's elegant API and lean codebase/little complexity. Even if the output of every step was stored to disk, that would still be acceptable.

So I guess a better solution might be to 1) use DiskSpillingCache and 2) dynamically calculate the number of reduce steps based on the number of rows?

Btw, it seems "spark.default.parallelism" will only work for the Mesos scheduler, not local one.

Matei Zaharia

unread,
Sep 3, 2012, 5:21:39 PM9/3/12
to spark...@googlegroups.com
Yeah, DiskSpillingCache would do this. You can also try the "dev" branch of Spark, which lets you choose, for each RDD, both whether it will be stored deserialized in memory and whether it will spill to disk. You can choose this through a new RDD.persist() method which takes a StorageLevel object. The right StorageLevel will probably be DISK_AND_MEMORY for in-memory serialized storage that spills to disk. If you don't mind the overhead of small objects, you can also do DISK_AND_MEMORY_DESER. However small objects will also cause a lot more GC.

Matei

Jaka Jancar

unread,
Sep 26, 2012, 10:19:35 AM9/26/12
to spark...@googlegroups.com
Hi Matei,

The saga continues.

So in order to have enough memory available to process a large dataset, do you recommend that I dynamically calculate number of reduce tasks:

num_tasks = (size_of_data * some factor) / (num_slaves * ram_per_slave)

?

It seems like this is something I shouldn't have to be doing myself :/

Matei Zaharia

unread,
Sep 26, 2012, 2:57:54 PM9/26/12
to spark...@googlegroups.com
Yeah, this is currently what you have to do, although in general it's okay to have a high number of reduce tasks even if the dataset is small. I would create 2-3 reduce tasks per CPU core in your cluster. The latency of launching a new task in Spark is quite small because we reuse the same JVM and we send it the message to start a task directly.

We'd like to do this automatically in the future, but it will require some serious re-engineering of the shuffle pipeline, because in general you don't know how much map output there will be until you run the map tasks (e.g. they might filter a lot of the data).

Matei

Jaka Jancar

unread,
Sep 26, 2012, 4:26:01 PM9/26/12
to spark...@googlegroups.com
I think I'm finally making progress in understanding this:

It's about the size of the map output / reduce input, not cache size.

Which is of course what you have said immediately :) I just wasn't aware that these two are completely separate and that changing caching settings, e.g. disk spilling, makes no difference, since it never even gets to that.

--

I'm processing 20 GB uncompressed data on 4x m1.large (2 cpus, 8gb ram), so 32 GB total, and with 64 tasks (8/cpu) and standard Java serialization, it fails. So I'm trying now with 256 and also playing with Kryo. Like you said, it's a bit hard to estimate and quite annoying. Are there plans to stream the data from mappers to reducers?

--

Also, can you explain, if I set SPARK_MEM to 6gb:

 1. Is it true that -Xmx be set to 3gb for each of the 2 JVMs on each node?
 2. If so, why does RSS peak at only 2.1gb before I get "GC overhead limit exceeded"?
 3. Why is Java even running GC if it still has 0.9 gb available to use?

Thanks!
Jaka

Matei Zaharia

unread,
Sep 26, 2012, 5:38:41 PM9/26/12
to spark...@googlegroups.com
 1. Is it true that -Xmx be set to 3gb for each of the 2 JVMs on each node?

Actually Spark just runs one JVM on each node, and uses two threads in it. So that one JVM should have -Xmx set to 6 GB. By the way you might want to make sure that you copy conf/spark-env.sh to all the nodes. Otherwise they will have an older version that might be setting a lower value of SPARK_MEM.

 2. If so, why does RSS peak at only 2.1gb before I get "GC overhead limit exceeded"?
 3. Why is Java even running GC if it still has 0.9 gb available to use?

Reading these my guess is that the -Xmx wasn't actually set to 6g as mentioned above -- try looking at the command line of the process directly. This seems like a pretty confusing aspect of the current EC2 scripts; I'll try to fix it so that if you set SPARK_MEM on the master and not on the workers, it will propagate it through.

Matei

Jaka Jancar

unread,
Sep 27, 2012, 5:06:46 AM9/27/12
to spark...@googlegroups.com
That was indeed the case: I did not copy the config to the slaves :/

I was under the impression this was not needed since SPARK_MEM was in ENV_VARS_TO_SEND_TO_EXECUTORS.

Jaka Jancar

unread,
Sep 27, 2012, 1:28:21 PM9/27/12
to spark...@googlegroups.com
Matei,

I'm now getting "Buffer limit exceeded" errors. From Kryo.

I see I can increase the limit with spark.kryoserializer.buffer.mb, but what is getting serialized in a single call: a single element of the RDD, a partition, something else... ?

It would really surprise me if a single record was larger than 2 MB.

Reynold Xin

unread,
Sep 27, 2012, 1:56:25 PM9/27/12
to spark...@googlegroups.com
Which branch are you on? If master, which cache?

For 0.5, I was under the impression that serialization happens one tuple at a time, but I just checked the code for DiskSpillingCache and SerializingCache, they both appear to serialize the entire block at once:

e.g.



  override def put(datasetId: Any, partition: Int, value: Any): CachePutResponse = {
    val ser = SparkEnv.get.serializer.newInstance()
    bmc.put(datasetId, partition, ser.serialize(value))
  }

Jaka Jancar

unread,
Sep 27, 2012, 2:00:19 PM9/27/12
to spark...@googlegroups.com
I'm on latest master, using SerializingCache.

Surely it's not unexpected that a partition is larger than 2 MB ?

Matei Zaharia

unread,
Sep 27, 2012, 2:44:28 PM9/27/12
to spark...@googlegroups.com
Yeah, I think you do need to increase this value. This is fixed in the dev branch, where we don't serialize the whole array at once.

Matei
Reply all
Reply to author
Forward
0 new messages