Lost Task due to "java.lang.IllegalArgumentException: Negative size"


Haokun Luo

Jul 10, 2013, 8:45:28 PM
to spark...@googlegroups.com
Hi All,

I am running an application on a five-node cluster (64 GB RAM, 24 cores each). The cluster runs Spark 0.7.2 and Scala 2.9.3. Here are my other configuration settings:

System.setProperty("spark.cores.max", "128")
System.setProperty("spark.default.parallelism", "64")
System.setProperty("spark.akka.timeout", "240")
System.setProperty("spark.storage.memoryFraction", "0.5")
System.setProperty("spark.storage.blockManagerTimeoutIntervalMs", "120000")
System.setProperty("spark.storage.blockManagerHeartBeatMs", "60000")
System.setProperty("spark.serializer", "spark.KryoSerializer")
System.setProperty("spark.kryo.registrator", "mypackage.MyKryoRegistrator")
System.setProperty("spark.kryoserializer.buffer.mb", "64")
JAVA_OPTS="-Xms32g -Xmx32g -verbose:gc -XX:+PrintGCDetails -XX:MaxPermSize=128m -XX:ReservedCodeCacheSize=128m -XX:NewRatio=2"

However, during the "reduceByKey" stage, the master lost a task due to "java.lang.IllegalArgumentException: Negative size". Here are the details:

13/07/10 22:21:53 INFO DAGScheduler: Completed ShuffleMapTask(7, 54)
13/07/10 22:21:53 INFO DAGScheduler: Stage 7 (apply at TraversableLike.scala:233) finished in 31.059 s
13/07/10 22:21:53 INFO DAGScheduler: looking for newly runnable stages
13/07/10 22:21:53 INFO DAGScheduler: running: Set(Stage 4)
13/07/10 22:21:53 INFO DAGScheduler: waiting: Set(Stage 2, Stage 3)
13/07/10 22:21:53 INFO DAGScheduler: failed: Set()
13/07/10 22:21:53 INFO DAGScheduler: Missing parents for Stage 2: List(Stage 3)
13/07/10 22:21:53 INFO DAGScheduler: Missing parents for Stage 3: List(Stage 4)
13/07/10 22:24:31 INFO TaskSetManager: Finished TID 448 in 162645 ms (progress: 253/256)
13/07/10 22:24:31 INFO DAGScheduler: Completed ShuffleMapTask(4, 192)
13/07/10 22:34:32 INFO TaskSetManager: Finished TID 384 in 763638 ms (progress: 254/256)
13/07/10 22:34:32 INFO DAGScheduler: Completed ShuffleMapTask(4, 128)
13/07/10 22:45:03 INFO TaskSetManager: Finished TID 320 in 1398093 ms (progress: 255/256)
13/07/10 22:45:03 INFO DAGScheduler: Completed ShuffleMapTask(4, 64)
13/07/10 23:17:18 INFO TaskSetManager: Finished TID 256 in 3332957 ms (progress: 256/256)
13/07/10 23:17:18 INFO DAGScheduler: Completed ShuffleMapTask(4, 0)
13/07/10 23:17:18 INFO DAGScheduler: Stage 4 (apply at TraversableLike.scala:233) finished in 3332.958 s
13/07/10 23:17:18 INFO DAGScheduler: looking for newly runnable stages
13/07/10 23:17:18 INFO DAGScheduler: running: Set()
13/07/10 23:17:18 INFO DAGScheduler: waiting: Set(Stage 2, Stage 3)
13/07/10 23:17:18 INFO DAGScheduler: failed: Set()
13/07/10 23:17:18 INFO DAGScheduler: Missing parents for Stage 2: List(Stage 3)
13/07/10 23:17:18 INFO DAGScheduler: Missing parents for Stage 3: List()
13/07/10 23:17:18 INFO DAGScheduler: Submitting Stage 3 (MapPartitionsRDD[25] at reduceByKey at nmf.scala:241), which is now runnable
13/07/10 23:17:18 INFO DAGScheduler: Submitting 256 missing tasks from Stage 3 (MapPartitionsRDD[25] at reduceByKey at nmf.scala:241)
13/07/10 23:17:18 INFO ClusterScheduler: Adding task set 3.0 with 256 tasks
13/07/10 23:17:18 INFO TaskSetManager: Starting task 3.0:0 as TID 512 on executor 0: a.b.c1.com (preferred)
13/07/10 23:17:18 INFO TaskSetManager: Serialized task 3.0:0 as 2806 bytes in 11 ms
13/07/10 23:17:18 INFO TaskSetManager: Starting task 3.0:1 as TID 513 on executor 1: a.b.c2.com (preferred)
13/07/10 23:17:18 INFO TaskSetManager: Serialized task 3.0:1 as 2806 bytes in 0 ms
13/07/10 23:17:18 INFO TaskSetManager: Starting task 3.0:2 as TID 514 on executor 3: a.b.c3.com (preferred)
13/07/10 23:17:18 INFO TaskSetManager: Serialized task 3.0:2 as 2806 bytes in 0 ms
....
13/07/10 23:17:18 INFO TaskSetManager: Serialized task 3.0:119 as 2806 bytes in 0 ms
13/07/10 23:17:18 INFO MapOutputTrackerActor: Asked to send map output locations for shuffle 3 to a.b.c1.com
13/07/10 23:17:18 INFO MapOutputTracker: Size of output statuses for shuffle 3 is 1039 bytes
13/07/10 23:17:18 INFO MapOutputTrackerActor: Asked to send map output locations for shuffle 3 to a.b.c2.com
13/07/10 23:17:18 INFO MapOutputTrackerActor: Asked to send map output locations for shuffle 3 to a.b.c3.com
13/07/10 23:17:18 INFO MapOutputTrackerActor: Asked to send map output locations for shuffle 3 to a.b.c4.com
13/07/10 23:17:18 INFO MapOutputTrackerActor: Asked to send map output locations for shuffle 3 to a.b.c5.com
13/07/10 23:17:18 INFO TaskSetManager: Lost TID 592 (task 3.0:80)
13/07/10 23:17:18 INFO TaskSetManager: Loss was due to java.lang.IllegalArgumentException: Negative size
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:700)
at spark.storage.DiskStore.getBytes(DiskStore.scala:86)
at spark.storage.DiskStore.getValues(DiskStore.scala:92)
at spark.storage.BlockManager.getLocal(BlockManager.scala:284)
at spark.storage.BlockFetcherIterator$$anonfun$13.apply(BlockManager.scala:1027)
at spark.storage.BlockFetcherIterator$$anonfun$13.apply(BlockManager.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at spark.storage.BlockFetcherIterator.<init>(BlockManager.scala:1026)
at spark.storage.BlockManager.getMultiple(BlockManager.scala:478)
at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:51)
at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:10)
at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127)
at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:115)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:38)
at spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:115)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.MappedValuesRDD.compute(PairRDDFunctions.scala:695)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.FlatMappedValuesRDD.compute(PairRDDFunctions.scala:705)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:19)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
at spark.RDD.iterator(RDD.scala:196)
at spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:127)
at spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:75)
at spark.executor.Executor$TaskRunner.run(Executor.scala:98)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)
...
13/07/10 23:17:21 INFO TaskSetManager: Lost TID 638 (task 3.0:80)
13/07/10 23:17:21 INFO TaskSetManager: Loss was due to java.lang.IllegalArgumentException: Negative size [duplicate 4]
13/07/10 23:17:21 ERROR TaskSetManager: Task 3.0:80 failed more than 4 times; aborting job
13/07/10 23:17:21 INFO ClusterScheduler: Ignoring update from TID 641 because its task set is gone
13/07/10 23:17:21 INFO ClusterScheduler: Ignoring update from TID 621 because its task set is gone
13/07/10 23:17:21 INFO DAGScheduler: Failed to run reduce at nmf.scala:241
Exception in thread "main" spark.SparkException: Job failed: Task 3.0:80 failed more than 4 times
at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:642)
at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:640)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:640)
at spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:303)
at spark.scheduler.DAGScheduler.spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:364)
at spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:107)
Heap
 PSYoungGen      total 6116672K, used 4085091K [0x00007f5799560000, 0x00007f5944000000, 0x00007f5944000000)
  eden space 5242880K, 75% used [0x00007f5799560000,0x00007f588a242470,0x00007f58d9560000)
  from space 873792K, 15% used [0x00007f58d9560000,0x00007f58e1dd6a90,0x00007f590eab0000)
  to   space 873792K, 0% used [0x00007f590eab0000,0x00007f590eab0000,0x00007f5944000000)
 PSOldGen        total 13981056K, used 0K [0x00007f5444000000, 0x00007f5799560000, 0x00007f5799560000)
  object space 13981056K, 0% used [0x00007f5444000000,0x00007f5444000000,0x00007f5799560000)
 PSPermGen       total 42368K, used 42287K [0x00007f543c000000, 0x00007f543e960000, 0x00007f5444000000)
  object space 42368K, 99% used [0x00007f543c000000,0x00007f543e94be70,0x00007f543e960000)

Any help will be appreciated. Thanks.

Best,
Haokun

Haokun Luo

Jul 12, 2013, 6:28:03 PM
to spark...@googlegroups.com
I dug further into the source code of spark.storage.DiskStore:

 82   override def getBytes(blockId: String): Option[ByteBuffer] = {
 83     val file = getFile(blockId)
 84     val length = file.length().toInt
 85     val channel = new RandomAccessFile(file, "r").getChannel()
 86     val bytes = channel.map(MapMode.READ_ONLY, 0, length)
 87     channel.close()
 88     Some(bytes)
 89   }

Basically, for some reason the length is negative, which leads to the negative-size exception. My question at this point: under what circumstances would the file size be negative (e.g. the file does not exist, or something else)?
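As a sanity check on that theory, here is a minimal standalone sketch (not Spark code; the 3 GB figure is hypothetical) showing how a Long file length past Int.MaxValue turns negative when truncated with toInt, exactly as in `file.length().toInt` above:

```scala
// Standalone sketch: a file length over Int.MaxValue (~2 GB) becomes
// negative when truncated to Int, as in `file.length().toInt`.
object NegativeLength {
  def main(args: Array[String]): Unit = {
    val length: Long = 3L * 1024 * 1024 * 1024  // a hypothetical 3 GB file
    val truncated: Int = length.toInt           // overflows past Int.MaxValue
    println(truncated)                          // prints -1073741824
  }
}
```

So a negative length would point at a single block file larger than 2 GB rather than a missing file (java.io.File.length() returns 0 for a file that does not exist).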

Best,
Haokun

Reynold Xin

Jul 12, 2013, 6:52:34 PM
to spark...@googlegroups.com
Actually I replied to your email offline just before I saw this ...

Int supports only up to 2 GB - which means you have a single shuffle block larger than 2 GB.

You can try increasing the number of reducers to reduce the size of each shuffle block.
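A rough way to size the reducer count (all numbers below are hypothetical, not from this thread): divide the total shuffle volume by the ~2 GB per-block ceiling, then pad for key skew:

```scala
// Hypothetical sizing sketch: with ~300 GB of shuffle data and a hard
// ~2 GB (Int.MaxValue bytes) mmap limit per block, compute the minimum
// number of reduce partitions, then pad for key skew.
object ReducerSizing {
  def main(args: Array[String]): Unit = {
    val shuffleBytes  = 300L * 1024 * 1024 * 1024   // assumed shuffle volume
    val maxBlockBytes = Int.MaxValue.toLong          // ~2 GB ceiling
    val minPartitions = (shuffleBytes + maxBlockBytes - 1) / maxBlockBytes
    println(minPartitions)                           // prints 151
    println(minPartitions * 4)                       // with a 4x skew margin
  }
}
```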

We are working on a fix for this as part of https://github.com/mesos/spark/pull/669


--
Reynold Xin, AMPLab, UC Berkeley



--
You received this message because you are subscribed to the Google Groups "Spark Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-users...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

Haokun Luo

Jul 12, 2013, 7:49:21 PM
to spark...@googlegroups.com
Hi Reynold,

Really appreciate your reply.

So far I have tuned "spark.default.parallelism" and the numPartitions parameters to higher values in the code. Are there any other parameters I could modify to increase the number of reducers?

On the other hand, is there a way to set the shuffle block size directly, so that the number of reducers changes accordingly?

Best,
Haokun

Haokun Luo

Jul 15, 2013, 11:53:18 AM
to spark...@googlegroups.com
Here is the response from Reynold offline:
  • Regarding the negative size issue: so far I have tuned "spark.default.parallelism" and the numPartitions parameters to higher values in the code. Are there any other parameters I could modify to increase the number of reducers?
numPartitions is the one.
  • On the other hand, is there a way to set the shuffle block size directly, so that the number of reducers changes accordingly?

Unfortunately no.
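For anyone landing here later, a sketch of what raising numPartitions looks like at the call site (`pairs` and the partition count below are hypothetical, not from my actual job):

```scala
// Sketch (not self-contained): `pairs` is assumed to be an RDD of
// key-value pairs built earlier in the job. The second argument to
// reduceByKey sets the number of reduce partitions directly, keeping
// each shuffle block well under the 2 GB limit.
val reduced = pairs.reduceByKey((a, b) => a + b, 1024)  // 1024 reducers
```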

Hope this helps anyone else who runs into this.

Best,

Haokun
