Standalone akka-actors failing on shuffle


Evan Sparks

Apr 14, 2013, 6:07:20 PM4/14/13
to spark...@googlegroups.com
Hi all,

I've got a newly configured standalone cluster (spark-0.7.0, built from the release source tarball) deployed to a new set of machines. Because of the way the cluster is set up, I had to set spark.local.dir to something other than /tmp (I did this in spark-env.sh on all machines).
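For reference, the override looked roughly like this in spark-env.sh (a sketch: the /scratch path is illustrative, and passing the setting through SPARK_JAVA_OPTS is an assumption based on how Spark 0.7 reads configuration as JVM system properties):

```shell
# spark-env.sh (sketch) -- Spark 0.7 reads spark.local.dir as a JVM
# system property, so it is passed via SPARK_JAVA_OPTS on every machine.
# The /scratch path below is illustrative, not the actual cluster path.
export SPARK_JAVA_OPTS="-Dspark.local.dir=/scratch/spark-tmp"
```

The same file has to be present (with a directory that exists and is writable) on every worker, since each executor resolves the path locally.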

The cluster starts normally, and my job runs for a while (long enough to complete some simple map/reduce tasks with limited shuffling). Then I run a sortByKey, which shuffles a fair amount, and I start to see errors like this:

13/04/11 19:32:50 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 1 to hostXX.Berkeley.EDU
13/04/11 19:32:50 INFO spark.MapOutputTracker: Size of output statuses for shuffle 1 is 146 bytes
13/04/11 19:32:50 INFO cluster.TaskSetManager: Lost TID 65 (task 6.6:0)
13/04/11 19:32:50 INFO cluster.TaskSetManager: Loss was due to fetch failure from null
13/04/11 19:32:50 INFO scheduler.DAGScheduler: Marking Stage 6 (sortByKey at h3.scala:88) for resubmision due to a fetch failure
13/04/11 19:32:50 INFO scheduler.DAGScheduler: The failed fetch was from Stage 7 (map at h3.scala:86); marking it for resubmission
13/04/11 19:32:50 INFO scheduler.DAGScheduler: Resubmitting failed stages
13/04/11 19:32:50 INFO scheduler.DAGScheduler: Submitting Stage 7 (MappedRDD[9] at map at h3.scala:86), which has no missing parents
13/04/11 19:32:50 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 7 (MappedRDD[9] at map at h3.scala:86)
13/04/11 19:32:50 INFO cluster.ClusterScheduler: Adding task set 7.7 with 1 tasks
13/04/11 19:32:50 INFO cluster.TaskSetManager: Starting task 7.7:0 as TID 66 on executor 3: hostXX.Berkeley.EDU (preferred)
13/04/11 19:32:50 INFO cluster.TaskSetManager: Serialized task 7.7:0 as 1908 bytes in 0 ms
13/04/11 19:32:50 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 0 to hostXX.Berkeley.EDU
13/04/11 19:32:50 INFO spark.MapOutputTracker: Size of output statuses for shuffle 0 is 184 bytes
13/04/11 19:32:50 INFO client.Client$ClientActor: Executor updated: app-20130411192825-0018/13 is now FAILED (Command exited with code 1)
13/04/11 19:32:50 INFO cluster.SparkDeploySchedulerBackend: Executor app-20130411192825-0018/13 removed: Command exited with code 1
13/04/11 19:32:50 INFO client.Client$ClientActor: Executor added: app-20130411192825-0018/14 on worker-20130411115238-hostYY.Berkeley.EDU-40417 (hostYY.Berkeley.EDU) with 7 cores

Looking at stderr from the task that failed (on hostXX), I see something like the following:
13/04/12 18:34:57 ERROR executor.StandaloneExecutorBackend: Driver terminated or disconnected! Shutting down.
13/04/12 18:34:57 ERROR storage.BlockManagerWorker: Exception handling buffer message
java.io.FileNotFoundException: /scratch/cs294-co/tmp/spark-local-20130412183154-3071/10/shuffle_1_0_3 (No such file or directory)
at java.io.RandomAccessFile.open(Native Method)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
at spark.storage.DiskStore.getBytes(DiskStore.scala:85)
at spark.storage.BlockManager.getLocalBytes(BlockManager.scala:354)
at spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:79)
at spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:58)
at spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:33)
at spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:33)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.IndexedSeqLike$Elements.foreach(IndexedSeqLike.scala:54)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
at spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:12)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
at spark.storage.BlockMessageArray.map(BlockMessageArray.scala:12)
at spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:33)
at spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:23)
at spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:23)
at spark.network.ConnectionManager.spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:269)
at spark.network.ConnectionManager$$anon$5.run(ConnectionManager.scala:235)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:636)

The code works fine in local mode, and something very similar works from the spark shell. But I'm compiling an external library into my code, so I'm packaging everything with sbt-assembly and shipping it out to the workers that way.

Any ideas why shuffle files might not show up in the new spark.local.dir? Ulimit looks fine, disk space looks fine, etc.

Thanks!
Evan

Matei Zaharia

Apr 14, 2013, 10:18:44 PM4/14/13
to spark...@googlegroups.com
Is that the first error in the log? Maybe it failed to create the file. Check whether the file actually exists.

Matei

--
You received this message because you are subscribed to the Google Groups "Spark Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-users...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

Evan Sparks

Apr 15, 2013, 12:07:57 AM4/15/13
to spark...@googlegroups.com
The file does not exist, but there's no corresponding error in the log.

Davis Shepherd

Jun 18, 2013, 7:10:46 PM6/18/13
to spark...@googlegroups.com
Is there any update on this? I have experienced a similar issue. It seems related to the internal state of the RDD we build up over time (perhaps the number of partitions), though we do actively coalesce after union operations to make sure this doesn't grow too large.


13/06/18 22:18:09 INFO storage.BlockManager: Started 48 remote gets in  1 ms
13/06/18 22:18:09 INFO executor.Executor: Serialized size of result for 644 is 90
13/06/18 22:18:09 INFO executor.Executor: Finished task ID 644
13/06/18 22:18:10 INFO executor.Executor: Serialized size of result for 692 is 10448939
13/06/18 22:18:10 INFO executor.Executor: Finished task ID 692
13/06/18 22:18:10 ERROR executor.StandaloneExecutorBackend: Driver terminated or disconnected! Shutting down.
13/06/18 22:18:10 ERROR storage.BlockManagerWorker: Exception handling buffer message
java.io.FileNotFoundException: spark/logs/spark-local-20130618221751-6aba/3b/shuffle_6_0_52 (No such file or directory)
at java.io.RandomAccessFile.open(Native Method)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:216)
at spark.storage.DiskStore.getBytes(DiskStore.scala:85)
at spark.storage.BlockManager.getLocalBytes(BlockManager.scala:354)
at spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:79)
at spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:58)
at spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:33)
at spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:33)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.IndexedSeqLike$Elements.foreach(IndexedSeqLike.scala:54)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
at spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:12)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
at spark.storage.BlockMessageArray.map(BlockMessageArray.scala:12)
at spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:33)
at spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:23)
at spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:23)
at spark.network.ConnectionManager.spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:269)
at spark.network.ConnectionManager$$anon$5.run(ConnectionManager.scala:235)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

Davis Shepherd

Jun 27, 2013, 4:12:37 PM6/27/13
to spark...@googlegroups.com
This happens with or without spark.cleaner.ttl set. What seems to happen is that the spark-local-* directory in the path where the executor looks for the shuffle file is wrong. It's as if the spark-local directory is getting moved or deleted out from under the executor.

Davis Shepherd

Jul 10, 2013, 9:56:05 PM7/10/13
to spark...@googlegroups.com
Turns out this was a spark.akka.frameSize issue. As soon as our map outputs surpassed 10 MB, the executors would crash and throw the FileNotFoundException. Once we bumped spark.akka.frameSize up to 1024, the problem ceased.
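For anyone who hits this later, the fix can be sketched like so (assuming the 0.7-era convention of passing settings as JVM system properties through spark-env.sh; the exact mechanism may differ in your deployment):

```shell
# spark-env.sh (sketch) -- raise the Akka frame size so large map-output
# status messages (bigger than the ~10 MB default) are not dropped.
# When they are dropped, reducers never learn where the shuffle files
# live, which surfaces downstream as FileNotFoundException on shuffle
# blocks even though nothing is wrong with spark.local.dir itself.
export SPARK_JAVA_OPTS="-Dspark.akka.frameSize=1024"   # value in MB
```

The setting has to be visible to both the driver and the executors, so it belongs in spark-env.sh on every machine (or wherever your deployment sets system properties for both sides).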