Hi all,
I've got a newly configured standalone cluster (spark-0.7.0, built from the release source tarball). Because of the way the machines are set up, I had to point spark.local.dir somewhere other than /tmp (I did this in spark-env.sh on every machine).
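For reference, here's roughly what I added to conf/spark-env.sh on each machine (a sketch: /scratch/cs294-co/tmp is our local scratch disk, and since this is 0.7 I'm setting the property through SPARK_JAVA_OPTS):

    # conf/spark-env.sh (same on every machine)
    export SPARK_JAVA_OPTS="-Dspark.local.dir=/scratch/cs294-co/tmp"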
My cluster starts up normally, and my job runs fine for a while (long enough to complete some simple map/reduce stages with only limited shuffling). Then I run a sortByKey, which shuffles a fair amount, and I start to see errors like the following (a rough sketch of the job itself is included just after this log):
13/04/11 19:32:50 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 1 to hostXX.Berkeley.EDU
13/04/11 19:32:50 INFO spark.MapOutputTracker: Size of output statuses for shuffle 1 is 146 bytes
13/04/11 19:32:50 INFO cluster.TaskSetManager: Lost TID 65 (task 6.6:0)
13/04/11 19:32:50 INFO cluster.TaskSetManager: Loss was due to fetch failure from null
13/04/11 19:32:50 INFO scheduler.DAGScheduler: Marking Stage 6 (sortByKey at h3.scala:88) for resubmision due to a fetch failure
13/04/11 19:32:50 INFO scheduler.DAGScheduler: The failed fetch was from Stage 7 (map at h3.scala:86); marking it for resubmission
13/04/11 19:32:50 INFO scheduler.DAGScheduler: Resubmitting failed stages
13/04/11 19:32:50 INFO scheduler.DAGScheduler: Submitting Stage 7 (MappedRDD[9] at map at h3.scala:86), which has no missing parents
13/04/11 19:32:50 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 7 (MappedRDD[9] at map at h3.scala:86)
13/04/11 19:32:50 INFO cluster.ClusterScheduler: Adding task set 7.7 with 1 tasks
13/04/11 19:32:50 INFO cluster.TaskSetManager: Starting task 7.7:0 as TID 66 on executor 3: hostXX.Berkeley.EDU (preferred)
13/04/11 19:32:50 INFO cluster.TaskSetManager: Serialized task 7.7:0 as 1908 bytes in 0 ms
13/04/11 19:32:50 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 0 to hostXX.Berkeley.EDU
13/04/11 19:32:50 INFO spark.MapOutputTracker: Size of output statuses for shuffle 0 is 184 bytes
13/04/11 19:32:50 INFO client.Client$ClientActor: Executor updated: app-20130411192825-0018/13 is now FAILED (Command exited with code 1)
13/04/11 19:32:50 INFO cluster.SparkDeploySchedulerBackend: Executor app-20130411192825-0018/13 removed: Command exited with code 1
13/04/11 19:32:50 INFO client.Client$ClientActor: Executor added: app-20130411192825-0018/14 on worker-20130411115238-hostYY.Berkeley.EDU-40417 (hostYY.Berkeley.EDU) with 7 cores
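For context, here's a rough sketch of the job around the lines the scheduler is pointing at (simplified; the input source and field handling are made up, but the map/sortByKey structure matches h3.scala):

    import spark.SparkContext._   // pulls in sortByKey via the pair-RDD implicits

    val pairs = sc.textFile(input)
      .map { line =>              // the map at h3.scala:86 (Stage 7)
        val f = line.split("\t")
        (f(0), f(1))
      }
    val sorted = pairs.sortByKey()   // the sortByKey at h3.scala:88 (Stage 6) -- the big shuffle
    sorted.saveAsTextFile(output)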
Looking at stderr from the task that failed (on hostXX), I see something like the following:
13/04/12 18:34:57 ERROR executor.StandaloneExecutorBackend: Driver terminated or disconnected! Shutting down.
13/04/12 18:34:57 ERROR storage.BlockManagerWorker: Exception handling buffer message
java.io.FileNotFoundException: /scratch/cs294-co/tmp/spark-local-20130412183154-3071/10/shuffle_1_0_3 (No such file or directory)
at java.io.RandomAccessFile.open(Native Method)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
at spark.storage.DiskStore.getBytes(DiskStore.scala:85)
at spark.storage.BlockManager.getLocalBytes(BlockManager.scala:354)
at spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:79)
at spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:58)
at spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:33)
at spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:33)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.IndexedSeqLike$Elements.foreach(IndexedSeqLike.scala:54)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
at spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:12)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
at spark.storage.BlockMessageArray.map(BlockMessageArray.scala:12)
at spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:33)
at spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:23)
at spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:23)
at spark.network.ConnectionManager.spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:269)
at spark.network.ConnectionManager$$anon$5.run(ConnectionManager.scala:235)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:636)
The code seems to work fine in local mode, and something very similar works from the spark shell. One difference: I'm compiling an external library into my code, so I package everything up with sbt-assembly and send the assembly jar out to the workers that way.
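In case it matters, the assembly jar gets to the workers via the SparkContext constructor, roughly like this (the master URL and jar name here are illustrative):

    // Spark 0.7: jars passed here get shipped to each executor
    val sc = new SparkContext(
      "spark://master:7077",
      "h3",
      System.getenv("SPARK_HOME"),
      Seq("target/h3-assembly-0.1.jar"))   // built with sbt-assembly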
Any ideas why shuffle files might not show up in the new spark.local.dir? Ulimit looks fine, disk space looks fine, etc.
Thanks!
Evan