Hi all,
I am running an application on a five-node cluster (64 GB RAM, 24 cores each). The cluster runs Spark 0.7.2 with Scala 2.9.3. Here are my other configuration settings:
System.setProperty("spark.cores.max", "128")
System.setProperty("spark.default.parallelism", "64")
System.setProperty("spark.akka.timeout", "240")
System.setProperty("spark.storage.memoryFraction", "0.5")
System.setProperty("spark.storage.blockManagerTimeoutIntervalMs", "120000")
System.setProperty("spark.storage.blockManagerHeartBeatMs", "60000")
System.setProperty("spark.serializer", "spark.KryoSerializer")
System.setProperty("spark.kryo.registrator", "mypackage.MyKryoRegistrator")
System.setProperty("spark.kryoserializer.buffer.mb", "64")
JAVA_OPTS="-Xms32g -Xmx32g -verbose:gc -XX:+PrintGCDetails -XX:MaxPermSize=128m -XX:ReservedCodeCacheSize=128m -XX:NewRatio=2"
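For context, the registrator named by spark.kryo.registrator follows the usual Spark 0.7 pattern. This is a simplified sketch, not my actual mypackage.MyKryoRegistrator; DenseVector here is a placeholder for the application classes I really register:

```scala
package mypackage

import com.esotericsoftware.kryo.Kryo
import spark.KryoRegistrator

// Placeholder for the application classes my job actually serializes.
case class DenseVector(values: Array[Double])

// Simplified sketch of the registrator referenced by spark.kryo.registrator.
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[DenseVector])
    kryo.register(classOf[Array[Double]])
  }
}
```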
However, during the "reduceByKey" stage, tasks keep failing with "java.lang.IllegalArgumentException: Negative size", and the job eventually aborts. Here are the details:
13/07/10 22:21:53 INFO DAGScheduler: Completed ShuffleMapTask(7, 54)
13/07/10 22:21:53 INFO DAGScheduler: Stage 7 (apply at TraversableLike.scala:233) finished in 31.059 s
13/07/10 22:21:53 INFO DAGScheduler: looking for newly runnable stages
13/07/10 22:21:53 INFO DAGScheduler: running: Set(Stage 4)
13/07/10 22:21:53 INFO DAGScheduler: waiting: Set(Stage 2, Stage 3)
13/07/10 22:21:53 INFO DAGScheduler: failed: Set()
13/07/10 22:21:53 INFO DAGScheduler: Missing parents for Stage 2: List(Stage 3)
13/07/10 22:21:53 INFO DAGScheduler: Missing parents for Stage 3: List(Stage 4)
13/07/10 22:24:31 INFO TaskSetManager: Finished TID 448 in 162645 ms (progress: 253/256)
13/07/10 22:24:31 INFO DAGScheduler: Completed ShuffleMapTask(4, 192)
13/07/10 22:34:32 INFO TaskSetManager: Finished TID 384 in 763638 ms (progress: 254/256)
13/07/10 22:34:32 INFO DAGScheduler: Completed ShuffleMapTask(4, 128)
13/07/10 22:45:03 INFO TaskSetManager: Finished TID 320 in 1398093 ms (progress: 255/256)
13/07/10 22:45:03 INFO DAGScheduler: Completed ShuffleMapTask(4, 64)
13/07/10 23:17:18 INFO TaskSetManager: Finished TID 256 in 3332957 ms (progress: 256/256)
13/07/10 23:17:18 INFO DAGScheduler: Completed ShuffleMapTask(4, 0)
13/07/10 23:17:18 INFO DAGScheduler: Stage 4 (apply at TraversableLike.scala:233) finished in 3332.958 s
13/07/10 23:17:18 INFO DAGScheduler: looking for newly runnable stages
13/07/10 23:17:18 INFO DAGScheduler: running: Set()
13/07/10 23:17:18 INFO DAGScheduler: waiting: Set(Stage 2, Stage 3)
13/07/10 23:17:18 INFO DAGScheduler: failed: Set()
13/07/10 23:17:18 INFO DAGScheduler: Missing parents for Stage 2: List(Stage 3)
13/07/10 23:17:18 INFO DAGScheduler: Missing parents for Stage 3: List()
13/07/10 23:17:18 INFO DAGScheduler: Submitting Stage 3 (MapPartitionsRDD[25] at reduceByKey at nmf.scala:241), which is now runnable
13/07/10 23:17:18 INFO DAGScheduler: Submitting 256 missing tasks from Stage 3 (MapPartitionsRDD[25] at reduceByKey at nmf.scala:241)
13/07/10 23:17:18 INFO ClusterScheduler: Adding task set 3.0 with 256 tasks
13/07/10 23:17:18 INFO TaskSetManager: Starting task 3.0:0 as TID 512 on executor 0: a.b.c1.com (preferred)
13/07/10 23:17:18 INFO TaskSetManager: Serialized task 3.0:0 as 2806 bytes in 11 ms
13/07/10 23:17:18 INFO TaskSetManager: Starting task 3.0:1 as TID 513 on executor 1: a.b.c2.com (preferred)
13/07/10 23:17:18 INFO TaskSetManager: Serialized task 3.0:1 as 2806 bytes in 0 ms
13/07/10 23:17:18 INFO TaskSetManager: Starting task 3.0:2 as TID 514 on executor 3: a.b.c3.com (preferred)
13/07/10 23:17:18 INFO TaskSetManager: Serialized task 3.0:2 as 2806 bytes in 0 ms
....
13/07/10 23:17:18 INFO TaskSetManager: Serialized task 3.0:119 as 2806 bytes in 0 ms
13/07/10 23:17:18 INFO MapOutputTrackerActor: Asked to send map output locations for shuffle 3 to a.b.c1.com
13/07/10 23:17:18 INFO MapOutputTracker: Size of output statuses for shuffle 3 is 1039 bytes
13/07/10 23:17:18 INFO MapOutputTrackerActor: Asked to send map output locations for shuffle 3 to a.b.c2.com
13/07/10 23:17:18 INFO MapOutputTrackerActor: Asked to send map output locations for shuffle 3 to a.b.c3.com
13/07/10 23:17:18 INFO MapOutputTrackerActor: Asked to send map output locations for shuffle 3 to a.b.c4.com
13/07/10 23:17:18 INFO MapOutputTrackerActor: Asked to send map output locations for shuffle 3 to a.b.c5.com
13/07/10 23:17:18 INFO TaskSetManager: Lost TID 592 (task 3.0:80)
13/07/10 23:17:18 INFO TaskSetManager: Loss was due to java.lang.IllegalArgumentException: Negative size
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:700)
at spark.storage.DiskStore.getBytes(DiskStore.scala:86)
at spark.storage.DiskStore.getValues(DiskStore.scala:92)
at spark.storage.BlockManager.getLocal(BlockManager.scala:284)
at spark.storage.BlockFetcherIterator$$anonfun$13.apply(BlockManager.scala:1027)
at spark.storage.BlockFetcherIterator$$anonfun$13.apply(BlockManager.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at spark.storage.BlockFetcherIterator.<init>(BlockManager.scala:1026)
at spark.storage.BlockManager.getMultiple(BlockManager.scala:478)
at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:51)
at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:10)
at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127)
at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:115)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:38)
at spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:115)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.MappedValuesRDD.compute(PairRDDFunctions.scala:695)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.FlatMappedValuesRDD.compute(PairRDDFunctions.scala:705)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:19)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:127)
at spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:75)
at spark.executor.Executor$TaskRunner.run(Executor.scala:98)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)
...
13/07/10 23:17:21 INFO TaskSetManager: Lost TID 638 (task 3.0:80)
13/07/10 23:17:21 INFO TaskSetManager: Loss was due to java.lang.IllegalArgumentException: Negative size [duplicate 4]
13/07/10 23:17:21 ERROR TaskSetManager: Task 3.0:80 failed more than 4 times; aborting job
13/07/10 23:17:21 INFO ClusterScheduler: Ignoring update from TID 641 because its task set is gone
13/07/10 23:17:21 INFO ClusterScheduler: Ignoring update from TID 621 because its task set is gone
13/07/10 23:17:21 INFO DAGScheduler: Failed to run reduce at nmf.scala:241
Exception in thread "main" spark.SparkException: Job failed: Task 3.0:80 failed more than 4 times
at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:642)
at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:640)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:640)
at spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:303)
at spark.scheduler.DAGScheduler.spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:364)
at spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:107)
Heap
PSYoungGen total 6116672K, used 4085091K [0x00007f5799560000, 0x00007f5944000000, 0x00007f5944000000)
eden space 5242880K, 75% used [0x00007f5799560000,0x00007f588a242470,0x00007f58d9560000)
from space 873792K, 15% used [0x00007f58d9560000,0x00007f58e1dd6a90,0x00007f590eab0000)
to space 873792K, 0% used [0x00007f590eab0000,0x00007f590eab0000,0x00007f5944000000)
PSOldGen total 13981056K, used 0K [0x00007f5444000000, 0x00007f5799560000, 0x00007f5799560000)
object space 13981056K, 0% used [0x00007f5444000000,0x00007f5444000000,0x00007f5799560000)
PSPermGen total 42368K, used 42287K [0x00007f543c000000, 0x00007f543e960000, 0x00007f5444000000)
object space 42368K, 99% used [0x00007f543c000000,0x00007f543e94be70,0x00007f543e960000)
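For reference, the reduceByKey at nmf.scala:241 is of roughly this shape. This is a simplified sketch with placeholder keys and values, not my actual NMF code; sc is the SparkContext:

```scala
// Simplified sketch of the failing pattern; Int keys and Double values are
// placeholders for the real types in nmf.scala.
val pairs = sc.parallelize(Seq((0, 1.0), (1, 2.0), (0, 3.0)))
// 256 reduce partitions, matching the 256 tasks shown in the log above.
val sums = pairs.reduceByKey(_ + _, 256)
sums.collect()
```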
Any help would be appreciated. Thanks.
Best,
Haokun