OutOfMemoryError on Mesos when collecting a RDD


Qiang Cao

Dec 22, 2012, 7:25:12 PM
to spark...@googlegroups.com
I encountered an OOM error when collecting an RDD on a Mesos cluster. The RDD is of type RDD[(Int, Double)] and has ~40M items. It is distributed over 10 slaves, each of which has ~5GB of memory available when I call collect(). The master also has sufficient memory. Below is the error log. I increased the Spark default parallelism to 80, but it still failed at the same point. Does anyone have ideas for fixing it? Thanks!

BTW, the code was tested against smaller datasets. It worked fine.

Exception in thread "Thread-6835" java.lang.OutOfMemoryError: Java heap space
    at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3437)
    at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3243)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1761)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1963)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1887)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1684)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1340)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
    at spark.scheduler.TaskResult.readExternal(TaskResult.scala:24)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1809)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1768)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
    at spark.JavaDeserializationStream.readObject(JavaSerializer.scala:23)
    at spark.JavaSerializerInstance.deserialize(JavaSerializer.scala:45)
    at spark.scheduler.cluster.TaskSetManager.taskFinished(TaskSetManager.scala:259)
    at spark.scheduler.cluster.TaskSetManager.statusUpdate(TaskSetManager.scala:234)
    at spark.scheduler.cluster.ClusterScheduler.statusUpdate(ClusterScheduler.scala:195)
    at spark.scheduler.mesos.MesosSchedulerBackend.statusUpdate(MesosSchedulerBackend.scala:250)
Exception in thread "Thread-6837" java.lang.OutOfMemoryError: Java heap space
    at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3437)
    at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3243)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1761)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1963)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1887)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1684)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1340)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
    at spark.scheduler.TaskResult.readExternal(TaskResult.scala:24)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1809)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1768)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
    at spark.JavaDeserializationStream.readObject(JavaSerializer.scala:23)
    at spark.JavaSerializerInstance.deserialize(JavaSerializer.scala:45)
    at spark.scheduler.cluster.TaskSetManager.taskFinished(TaskSetManager.scala:259)
    at spark.scheduler.cluster.TaskSetManager.statusUpdate(TaskSetManager.scala:234)
    at spark.scheduler.cluster.ClusterScheduler.statusUpdate(ClusterScheduler.scala:195)
    at spark.scheduler.mesos.MesosSchedulerBackend.statusUpdate(MesosSchedulerBackend.scala:250)
Exception in thread "Thread-6838" java.lang.OutOfMemoryError: Java heap space
...............................................................................................


Patrick Wendell

Dec 22, 2012, 7:41:09 PM
to spark...@googlegroups.com
Hi Qiang,

What collect() does is aggregate the content of the entire RDD in
memory on one machine (the machine where collect() was called).
Most likely your driver machine does not have enough memory to fit all
40M rows.

Why are you calling collect()?

- Patrick

Qiang Cao

Dec 22, 2012, 9:27:38 PM
to spark...@googlegroups.com
Thanks Patrick! I collect the RDD in order to get statistics like median on the intermediate
results.

The master has a lot of memory (> 5GB) available when I call collect(). But not sure if that's
enough. Below is the spark INFO. From all the log I posted, can you tell which machine ran
out of memory? It is surprising that collect() takes so much memory.
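
As a rough illustration of why collecting ~40M (Int, Double) pairs can need far more heap than the raw 12 bytes per pair suggests, here is a back-of-the-envelope sketch. Every per-object size in it is an assumption (typical figures for a 64-bit HotSpot JVM with compressed pointers), and the object name CollectFootprint is hypothetical; jmap on the real process is the only reliable measurement.

```scala
object CollectFootprint {
  // ASSUMED object sizes in bytes; real values depend on the JVM and its flags.
  val tupleBytes: Long = 24L       // assumed: Tuple2 header plus two references
  val boxedIntBytes: Long = 16L    // assumed: java.lang.Integer
  val boxedDoubleBytes: Long = 24L // assumed: java.lang.Double
  val arraySlotBytes: Long = 4L    // assumed: one reference in the result array

  // Heap needed if every pair is a tuple of boxed values.
  def boxedBytes(items: Long): Long =
    items * (tupleBytes + boxedIntBytes + boxedDoubleBytes + arraySlotBytes)

  // Heap needed if the same data were held as primitive Int and Double arrays.
  def primitiveBytes(items: Long): Long = items * (4L + 8L)

  def toGiB(bytes: Long): Double = bytes.toDouble / (1L << 30)
}
```

For example, CollectFootprint.toGiB(CollectFootprint.boxedBytes(40000000L)) gives the boxed estimate for the RDD discussed here. Under these assumptions the boxed form is several times larger than the primitive form, and the driver also has to hold the serialized task results while it deserializes them.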


12/12/22 23:41:52 INFO ClusterScheduler: Ignoring update from TID 2286 because its task set is gone
12/12/22 23:41:54 INFO ClusterScheduler: Ignoring update from TID 2284 because its task set is gone
12/12/22 23:41:54 INFO ClusterScheduler: Ignoring update from TID 2285 because its task set is gone
12/12/22 23:41:56 INFO ClusterScheduler: Ignoring update from TID 2285 because its task set is gone
12/12/22 23:41:58 INFO ClusterScheduler: Ignoring update from TID 2278 because its task set is gone
12/12/22 23:41:58 INFO ClusterScheduler: Ignoring update from TID 2286 because its task set is gone
12/12/22 23:42:00 INFO ClusterScheduler: Ignoring update from TID 2273 because its task set is gone
12/12/22 23:42:03 INFO ClusterScheduler: Ignoring update from TID 2283 because its task set is gone
12/12/22 23:42:05 INFO ClusterScheduler: Ignoring update from TID 2286 because its task set is gone
12/12/22 23:42:05 INFO MesosSchedulerBackend: driver.run() returned with code DRIVER_ABORTED

Mark Hamstra

Dec 22, 2012, 11:02:11 PM
to spark...@googlegroups.com
You don't need to do that. On RDD[Double] there is a stats() action available that returns a spark.util.StatCounter object, from which you can easily get mean, variance, standard deviation, sum, etc. There are also individual actions such as stdev() if you are only interested in one statistical metric.
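
To show why statistics like these do not require pulling every row to the driver, here is a minimal plain-Scala sketch of a mergeable running-statistics accumulator. It is not Spark's own class: RunningStats and its methods are hypothetical names, and the sketch only illustrates the general idea that each partition can be reduced to a small summary and the summaries merged afterwards.

```scala
// Hypothetical accumulator: count, mean, and sum of squared deviations (m2).
final case class RunningStats(n: Long, mean: Double, m2: Double) {
  // Fold one value into the summary (Welford's online update).
  def add(x: Double): RunningStats = {
    val n1 = n + 1
    val delta = x - mean
    val mean1 = mean + delta / n1
    RunningStats(n1, mean1, m2 + delta * (x - mean1))
  }

  // Combine two summaries computed independently, e.g. one per partition.
  def merge(o: RunningStats): RunningStats =
    if (o.n == 0) this
    else if (n == 0) o
    else {
      val total = n + o.n
      val delta = o.mean - mean
      RunningStats(
        total,
        mean + delta * o.n / total,
        m2 + o.m2 + delta * delta * n * o.n / total)
    }

  // Population variance; NaN when no values have been seen.
  def variance: Double = if (n == 0) Double.NaN else m2 / n
  def stdev: Double = math.sqrt(variance)
}

object RunningStats {
  val empty: RunningStats = RunningStats(0L, 0.0, 0.0)
  def of(xs: Iterable[Double]): RunningStats = xs.foldLeft(empty)(_ add _)
}
```

Usage: RunningStats.of(partitionA).merge(RunningStats.of(partitionB)) yields the same count, mean, and variance as a single pass over both partitions, while only three numbers per partition travel back to the driver. A median cannot be summarized this way, so it would need a different approach.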

Matei Zaharia

Dec 22, 2012, 11:10:23 PM
to spark...@googlegroups.com
One problem might be that even if the total memory is small enough to fit on the master, the Mesos scheduler doesn't handle large task results well. You can either try to have more tasks (leading to a smaller result from each one), or set spark.mesos.coarse=true, which causes Spark to launch only long-lived daemon tasks on Mesos (one task per node) and do its own scheduling of tasks within those, and its own communication of results back to the master.
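
A minimal sketch of turning on coarse-grained mode, assuming the property is passed as a JVM system property in the same way spark.akka.frameSize is set elsewhere in this thread. The object and method names are hypothetical, and the SparkContext constructor arguments are placeholders that are deliberately left unfilled.

```scala
object CoarseModeSetup {
  // Set the property before the SparkContext is constructed; the thread
  // notes that it has to be in place before the context is created.
  def enableCoarseMode(): Unit = {
    System.setProperty("spark.mesos.coarse", "true")
  }
}

CoarseModeSetup.enableCoarseMode()
// val sc = new SparkContext(<Mesos master URL>, <job name>)  // placeholders, not real values
```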

It would also help to do a jmap on the process before it runs out of memory to see what objects are taking that much space; for example, maybe the doubles are represented as java.lang.Double instead of primitives or something. (Run jmap -histo <processID>).

Matei

Mark Hamstra

Dec 23, 2012, 12:15:50 AM
to spark...@googlegroups.com
Just to follow up on myself in case I wasn't clear...

You don't say exactly what your RDD[(Int, Double)] represents, but I'm going to assume that these are key-value pairs and that you want basic stats grouped by key. That's not hard and requires little more memory on the driver node than is required to hold the unique keys and a (key, StatCounter) tuple per key:

import spark.RDD
import spark.SparkContext._  // implicit conversions that provide stats() on RDD[Double]

val r: RDD[(Int, Double)] = <whatever you are doing to produce your rdd>.cache()
val keys = r.map(_._1).distinct().collect
// one stats() job per distinct key, each over the cached RDD
val statsByKey = (for (key <- keys)
  yield (key, r.filter(_._1 == key).map(_._2).stats)).toMap
val stdevOfFirstKeyGroup = statsByKey(keys(0)).stdev  // similarly for variance, mean, count, etc.

Qiang Cao

Dec 23, 2012, 1:24:34 AM
to spark...@googlegroups.com
Thanks Mark! The analysis I'm going to perform on RDD[(Int, Double)] is a bit diverse, of course including variance and mean, and something more than that. Since there are only ~40M rows in this RDD, I prefer to pull the entire RDD to the master so that I can save network I/O during the analysis. I still believe collecting a RDD can be done if we have sufficient memory on the master. I'll try what Matei suggested first. Thanks.

Qiang Cao

Dec 25, 2012, 11:27:28 PM
to spark...@googlegroups.com
Hi Matei,

I'm trying to fix this by moving to larger-memory machines and setting spark.mesos.coarse. The master has 65GB memory. Now the OOM error disappears. However, a NullPointerException shows up when I do collect(). With this cluster, again the code works fine on smaller datasets. Any suggestions? Thanks!

12/12/26 03:57:25 INFO CacheTrackerActor: Memory cache lost on ip-10-143-139-141.ec2.internal
12/12/26 03:57:25 INFO CacheTracker: CacheTracker successfully removed entries on ip-10-143-139-141.ec2.internal
12/12/26 03:57:25 INFO TaskSetManager: Serialized task 4.0:1 as 3386 bytes in 1 ms
12/12/26 03:57:25 INFO CacheTrackerActor: Asked for current cache locations
12/12/26 03:57:25 INFO TaskSetManager: Starting task 4.0:0 as TID 46 on slave 201212260206-1334480650-5050-2495-0: ip-10-142-133-46.ec2.internal (preferred)
12/12/26 03:57:25 INFO TaskSetManager: Serialized task 4.0:0 as 3386 bytes in 1 ms
12/12/26 03:57:25 INFO MapOutputTrackerActor: Asked to send map output locations for shuffle 0 to ip-10-142-133-46.ec2.internal
12/12/26 03:57:25 INFO MapOutputTracker: Size of output statuses for shuffle 0 is 165 bytes
12/12/26 03:57:25 INFO TaskSetManager: Lost TID 45 (task 4.0:1)
12/12/26 03:57:25 INFO TaskSetManager: Loss was due to java.lang.NullPointerException
    at spark.MapOutputTracker$$anonfun$getServerStatuses$5.apply(MapOutputTracker.scala:160)
    at spark.MapOutputTracker$$anonfun$getServerStatuses$5.apply(MapOutputTracker.scala:159)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:38)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
    at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:38)
    at spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:159)
    at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:20)
    at spark.rdd.RepartitionShuffledRDD.compute(ShuffledRDD.scala:58)
    at spark.RDD.iterator(RDD.scala:161)
    at spark.rdd.FilteredRDD.compute(FilteredRDD.scala:11)
    at spark.RDD.iterator(RDD.scala:161)
    at spark.rdd.MappedRDD.compute(MappedRDD.scala:15)
    at spark.RDD.iterator(RDD.scala:161)
    at spark.scheduler.ResultTask.run(ResultTask.scala:18)
    at spark.executor.Executor$TaskRunner.run(Executor.scala:76)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:679)
12/12/26 03:57:25 INFO TaskSetManager: Starting task 4.0:1 as TID 47 on slave 201212260206-1334480650-5050-2495-0: ip-10-142-133-46.ec2.internal (preferred)
12/12/26 03:57:25 INFO TaskSetManager: Serialized task 4.0:1 as 3386 bytes in 1 ms

Qiang Cao

Dec 26, 2012, 5:42:31 PM
to spark...@googlegroups.com
Hi,

I encountered this issue on Mesos EC2 clusters. I found that I always get the NullPointerException whenever I call collect() on a 40M-item RDD. Below is a simple piece of code I used for testing. It builds an RDD from an HDFS file and then collects the items in that RDD. Even with this simple code, I get a NullPointerException when I try to process a 40M-item RDD.

                  var rdd = sc.textFile("hdfs://...").map(...)  // rdd is a 40M-item RDD[Int]
                  rdd.collect()

On the other hand, collect() works fine if I feed it smaller RDDs, such as a 10K-item RDD. So I suspect this may be due to some default configuration on my Mesos EC2 cluster. I tried to increase the Java heap size by adding SPARK_JAVA_OPTS+=" -D-Xmx=6154m" to /root/spark/conf/spark-env.sh. Unfortunately, it still doesn't work. Could someone share ideas for fixing it? Thanks!

-Qiang

12/12/26 22:38:24 INFO CacheTrackerActor: Memory cache lost on ip-10-111-66-182.ec2.internal
12/12/26 22:38:24 INFO CacheTracker: CacheTracker successfully removed entries on ip-10-111-66-182.ec2.internal
12/12/26 22:38:24 INFO TaskSetManager: Serialized task 3.0:0 as 2986 bytes in 2 ms
12/12/26 22:38:24 INFO CacheTrackerActor: Asked for current cache locations
12/12/26 22:38:24 INFO MapOutputTrackerActor: Asked to send map output locations for shuffle 0 to ip-10-110-185-109.ec2.internal
12/12/26 22:38:24 INFO MapOutputTracker: Size of output statuses for shuffle 0 is 170 bytes
12/12/26 22:38:24 INFO TaskSetManager: Lost TID 44 (task 3.0:0)
12/12/26 22:38:24 INFO TaskSetManager: Loss was due to java.lang.NullPointerException

    at spark.MapOutputTracker$$anonfun$getServerStatuses$5.apply(MapOutputTracker.scala:160)
    at spark.MapOutputTracker$$anonfun$getServerStatuses$5.apply(MapOutputTracker.scala:159)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:38)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
    at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:38)
    at spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:159)
    at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:20)
    at spark.rdd.RepartitionShuffledRDD.compute(ShuffledRDD.scala:58)
    at spark.RDD.iterator(RDD.scala:161)
    at spark.rdd.MappedRDD.compute(MappedRDD.scala:15)
    at spark.RDD.iterator(RDD.scala:161)
    at spark.scheduler.ResultTask.run(ResultTask.scala:18)
    at spark.executor.Executor$TaskRunner.run(Executor.scala:76)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:679)
12/12/26 22:38:24 INFO TaskSetManager: Starting task 3.0:0 as TID 45 on slave 201212261918-3704209418-5050-2322-0: ip-10-110-185-109.ec2.internal (preferred)

Matei Zaharia

Dec 26, 2012, 5:50:22 PM
to spark...@googlegroups.com
Hi Qiang,

Is this with "spark.mesos.coarse" set to true? (And did you set it before creating the SparkContext object?)

From your log, it looks like some other error happened first, and this is the result of some state not being cleaned up correctly (which is a problem we had). See the first two lines:

12/12/26 03:57:25 INFO CacheTrackerActor: Memory cache lost on ip-10-143-139-141.ec2.internal
12/12/26 03:57:25 INFO CacheTracker: CacheTracker successfully removed entries on ip-10-143-139-141.ec2.internal

It sounds like host ip-10-143-139-141.ec2.internal failed. Do you see any error messages earlier in the log? What is the first one? If it's just this "lost a node" message, then SSH into that node and look in /mnt/mesos-work/.../<your job ID> to find the stdout and stderr log files from your job. Those will contain whatever error happened.

Matei

Qiang Cao

Dec 26, 2012, 6:37:50 PM
to spark...@googlegroups.com
Thanks for your quick response, Matei! spark.mesos.coarse was set to true before creating sc. Below I've pasted more of the log lines leading up to the exception. The test.scala file contains only a few lines of code, which do what I described in my last post.

12/12/26 23:17:59 INFO DAGScheduler: Completed ResultTask(1, 0)
12/12/26 23:17:59 INFO SparkContext: Job finished: count at test.scala:1128, took 87.189440487 s
12/12/26 23:17:59 INFO SparkContext: Starting job: collect at test.scala:1133
12/12/26 23:17:59 INFO DAGScheduler: Registering RDD 7 (map at test.scala:1133)
12/12/26 23:17:59 INFO CacheTracker: Registering RDD ID 7 with cache
12/12/26 23:17:59 INFO CacheTrackerActor: Registering RDD 7 with 2 partitions
12/12/26 23:17:59 INFO DAGScheduler: Registering parent RDD 7 (map at test.scala:1133)
12/12/26 23:17:59 INFO DAGScheduler: Registering parent RDD 6 (partitionBy at test.scala:1119)
12/12/26 23:17:59 INFO CacheTrackerActor: Asked for current cache locations
12/12/26 23:17:59 INFO DAGScheduler: Got job 2 (collect at test.scala:1133) with 2 output partitions
12/12/26 23:17:59 INFO DAGScheduler: Final stage: Stage 3 (map at test.scala:1133)
12/12/26 23:17:59 INFO DAGScheduler: Parents of final stage: List(Stage 2)
12/12/26 23:17:59 INFO DAGScheduler: Missing parents: List()
12/12/26 23:17:59 INFO DAGScheduler: Submitting Stage 3 (map at test.scala:1133), which has no missing parents
12/12/26 23:17:59 INFO DAGScheduler: Submitting 2 missing tasks from Stage 3
12/12/26 23:17:59 INFO ClusterScheduler: Adding task set 3.0 with 2 tasks
12/12/26 23:17:59 INFO TaskSetManager: Starting task 3.0:0 as TID 40 on slave 201212262301-638519306-5050-2445-1: ip-10-191-41-16.ec2.internal (preferred)
12/12/26 23:17:59 INFO TaskSetManager: Serialized task 3.0:0 as 2982 bytes in 2 ms
12/12/26 23:17:59 INFO TaskSetManager: Starting task 3.0:1 as TID 41 on slave 201212262301-638519306-5050-2445-0: ip-10-8-85-124.ec2.internal (preferred)
12/12/26 23:17:59 INFO TaskSetManager: Serialized task 3.0:1 as 2982 bytes in 2 ms
12/12/26 23:18:39 INFO CoarseMesosSchedulerBackend: Slave 201212262301-638519306-5050-2445-0 disconnected, so removing it
12/12/26 23:18:39 INFO TaskSetManager: Re-queueing tasks for ip-10-8-85-124.ec2.internal from TaskSet 3.0
12/12/26 23:18:39 INFO TaskSetManager: Lost TID 41 (task 3.0:1)
12/12/26 23:18:39 INFO TaskSetManager: Starting task 3.0:1 as TID 42 on slave 201212262301-638519306-5050-2445-1: ip-10-191-41-16.ec2.internal (preferred)
12/12/26 23:18:39 INFO DAGScheduler: Host lost: ip-10-8-85-124.ec2.internal
12/12/26 23:18:39 INFO TaskSetManager: Serialized task 3.0:1 as 2982 bytes in 2 ms
12/12/26 23:18:39 INFO BlockManagerMasterActor: Trying to remove the host: ip-10-8-85-124.ec2.internal:10902 from BlockManagerMaster.
12/12/26 23:18:39 INFO BlockManagerMasterActor: Previous hosts: ArrayBuffer(BlockManagerId(ip-10-8-85-124.ec2.internal, 48468), BlockManagerId(ip-10-191-41-16.ec2.internal, 54751), BlockManagerId(ip-10-68-199-106.ec2.internal, 41809))
12/12/26 23:18:39 INFO BlockManagerMasterActor: Current hosts: ArrayBuffer(BlockManagerId(ip-10-8-85-124.ec2.internal, 48468), BlockManagerId(ip-10-191-41-16.ec2.internal, 54751), BlockManagerId(ip-10-68-199-106.ec2.internal, 41809))
12/12/26 23:18:39 INFO BlockManagerMaster: Removed ip-10-8-85-124.ec2.internal successfully in notifyADeadHost
12/12/26 23:18:39 INFO Stage: Stage 2 is now unavailable on ip-10-8-85-124.ec2.internal (24/36, false)
12/12/26 23:18:39 INFO CacheTrackerActor: Memory cache lost on ip-10-8-85-124.ec2.internal
12/12/26 23:18:39 INFO CacheTracker: CacheTracker successfully removed entries on ip-10-8-85-124.ec2.internal
12/12/26 23:18:39 INFO CacheTrackerActor: Asked for current cache locations
12/12/26 23:18:39 INFO CoarseMesosSchedulerBackend: Slave 201212262301-638519306-5050-2445-1 disconnected, so removing it
12/12/26 23:18:39 INFO TaskSetManager: Re-queueing tasks for ip-10-191-41-16.ec2.internal from TaskSet 3.0
12/12/26 23:18:39 INFO TaskSetManager: Lost TID 40 (task 3.0:0)
12/12/26 23:18:39 INFO TaskSetManager: Lost TID 42 (task 3.0:1)
12/12/26 23:18:39 INFO DAGScheduler: Host lost: ip-10-191-41-16.ec2.internal
12/12/26 23:18:39 INFO BlockManagerMasterActor: Trying to remove the host: ip-10-191-41-16.ec2.internal:10902 from BlockManagerMaster.
12/12/26 23:18:39 INFO TaskSetManager: Starting task 3.0:1 as TID 43 on slave 201212262301-638519306-5050-2445-2: ip-10-68-199-106.ec2.internal (preferred)
12/12/26 23:18:39 INFO BlockManagerMasterActor: Previous hosts: ArrayBuffer(BlockManagerId(ip-10-8-85-124.ec2.internal, 48468), BlockManagerId(ip-10-191-41-16.ec2.internal, 54751), BlockManagerId(ip-10-68-199-106.ec2.internal, 41809))
12/12/26 23:18:39 INFO BlockManagerMasterActor: Current hosts: ArrayBuffer(BlockManagerId(ip-10-8-85-124.ec2.internal, 48468), BlockManagerId(ip-10-191-41-16.ec2.internal, 54751), BlockManagerId(ip-10-68-199-106.ec2.internal, 41809))
12/12/26 23:18:39 INFO BlockManagerMaster: Removed ip-10-191-41-16.ec2.internal successfully in notifyADeadHost
12/12/26 23:18:39 INFO Stage: Stage 2 is now unavailable on ip-10-191-41-16.ec2.internal (12/36, false)
12/12/26 23:18:39 INFO CacheTrackerActor: Memory cache lost on ip-10-191-41-16.ec2.internal
12/12/26 23:18:39 INFO TaskSetManager: Serialized task 3.0:1 as 2982 bytes in 1 ms
12/12/26 23:18:39 INFO TaskSetManager: Starting task 3.0:0 as TID 44 on slave 201212262301-638519306-5050-2445-2: ip-10-68-199-106.ec2.internal (preferred)
12/12/26 23:18:39 INFO CacheTracker: CacheTracker successfully removed entries on ip-10-191-41-16.ec2.internal
12/12/26 23:18:39 INFO CacheTrackerActor: Asked for current cache locations
12/12/26 23:18:39 INFO TaskSetManager: Serialized task 3.0:0 as 2982 bytes in 1 ms
12/12/26 23:18:40 INFO MapOutputTrackerActor: Asked to send map output locations for shuffle 0 to ip-10-68-199-106.ec2.internal
12/12/26 23:18:40 INFO MapOutputTracker: Size of output statuses for shuffle 0 is 174 bytes
12/12/26 23:18:40 INFO TaskSetManager: Lost TID 44 (task 3.0:0)
12/12/26 23:18:40 INFO TaskSetManager: Loss was due to java.lang.NullPointerException

Matei Zaharia

Dec 26, 2012, 6:57:39 PM
to spark...@googlegroups.com
Okay, so it looks like tasks on your nodes are failing. See these messages?

12/12/26 23:18:39 INFO TaskSetManager: Lost TID 41 (task 3.0:1)
12/12/26 23:18:39 INFO DAGScheduler: Host lost: ip-10-8-85-124.ec2.internal

To see why, log into that machine and look at the logs as I described. (Look for the files called stderr and stdout.)

Matei

Qiang Cao

Dec 26, 2012, 8:18:59 PM
to spark...@googlegroups.com
The stdout on ip-10-8-85-124.ec2.internal looks fine:

Registered executor on ip-10-8-85-124.ec2.internal
Starting task 2

=================================================================
Below is the stderr on this machine. It seems that this machine finished task 41, but the
TaskSetManager reports that TID 41 is lost.

12/12/26 23:17:28 INFO network.SendingConnection: Connected to [ip-10-191-41-16.ec2.internal/10.191.41.16:54751], 1 messages pending
12/12/26 23:17:28 INFO network.ConnectionManager: Accepted connection from [ip-10-191-41-16.ec2.internal/10.191.41.16]
12/12/26 23:17:37 INFO network.SendingConnection: Initiating connection to [ip-10-68-199-106.ec2.internal/10.68.199.106:41809]
12/12/26 23:17:37 INFO network.SendingConnection: Connected to [ip-10-68-199-106.ec2.internal/10.68.199.106:41809], 1 messages pending
12/12/26 23:17:37 INFO network.ConnectionManager: Accepted connection from [ip-10-68-199-106.ec2.internal/10.68.199.106]
12/12/26 23:17:56 INFO executor.Executor: Serialized size of result for 39 is 132
12/12/26 23:17:56 INFO executor.Executor: Finished task ID 39
12/12/26 23:17:59 INFO executor.StandaloneExecutorBackend: Got assigned task 41
12/12/26 23:17:59 INFO executor.Executor: Running task ID 41
12/12/26 23:17:59 INFO executor.Executor: Its generation is 0
12/12/26 23:17:59 INFO storage.BlockManager: maxBytesInFlight: 50331648, minRequest: 10066329
12/12/26 23:17:59 INFO storage.BlockManager: maxBytesInFlight: 50331648, minRequest: 10066329
12/12/26 23:17:59 INFO storage.BlockManager: Started 1 remote gets in  2 ms
12/12/26 23:18:37 INFO executor.Executor: Serialized size of result for 41 is 83304537
12/12/26 23:18:37 INFO executor.Executor: Finished task ID 41
(END)

Matei Zaharia

Dec 26, 2012, 10:32:18 PM
to spark...@googlegroups.com
Hmm, are you sure that this is the output from the most recent run? It seems weird that it would just crash without printing anything. Maybe it's from an old run that actually worked fine.

Matei

Matei Zaharia

Dec 26, 2012, 10:34:30 PM
to spark...@googlegroups.com
Actually, one problem here is definitely that your message for the task result is pretty big (83 MB). Try adding

System.setProperty("spark.akka.frameSize", "200")

before creating your SparkContext. Maybe Akka just gives up when trying to send this big a message and for some reason doesn't log anything.

Matei

Qiang Cao

Dec 26, 2012, 11:37:44 PM
to spark...@googlegroups.com
Hi Matei,

Setting akka.frameSize did it! collect() works fine now. Thanks a lot!

BTW, I'm curious about why akka refuses to send a big message. How can I choose the correct frameSize value according to the input? Thanks!

-Qiang  

Matei Zaharia

Dec 26, 2012, 11:59:42 PM
to spark...@googlegroups.com
Akka is really not designed for large messages. I think that the frameSize affects some internal buffers, so making it too high will cause a lot of memory to be allocated to Akka. The problem is really just that we used it in collect() and other methods that return results from tasks; we should use a different library to transfer those results. Anyway, it's really weird that it didn't print anything -- thanks for letting us know about this. It's something we'll watch out for in the future.
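
One hedged way to pick a value rather than guessing: read the largest "Serialized size of result for ... is ..." figure from the executor stderr logs, as pasted earlier in this thread, and size the frame just above it. The sketch below assumes the frame size is expressed in units of 1024 * 1024 bytes and uses a hypothetical headroom factor; FrameSizeHint and its method names are illustrative, not part of Spark.

```scala
object FrameSizeHint {
  // Matches executor log lines such as:
  //   "... executor.Executor: Serialized size of result for 41 is 83304537"
  private val ResultSize = """Serialized size of result for (\d+) is (\d+)""".r

  // Largest serialized task result (in bytes) found in the given log lines.
  def largestResultBytes(logLines: Seq[String]): Option[Long] = {
    val sizes = logLines.flatMap { line =>
      ResultSize.findFirstMatchIn(line).map(_.group(2).toLong)
    }
    if (sizes.isEmpty) None else Some(sizes.max)
  }

  // Smallest whole number of MB that covers the result with some headroom.
  // ASSUMPTIONS: 1 MB = 1024 * 1024 bytes; headroom of 2.0 is arbitrary.
  def suggestedFrameSizeMb(bytes: Long, headroom: Double = 2.0): Int =
    math.ceil(bytes * headroom / (1024.0 * 1024.0)).toInt
}
```

Since a larger frame size may cause more memory to be allocated to Akka, keeping the headroom modest, or shrinking each result by using more tasks, is likely the safer direction.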

Matei

Qiang Cao

Dec 27, 2012, 12:03:21 AM
to spark...@googlegroups.com
Makes sense. Thanks for your help!

Qiang

greedyrouter

Jan 5, 2013, 12:20:07 AM
to spark...@googlegroups.com
I also encountered similar problems. The answer is helpful. Thanks a lot!