Spark job fails: Removing BlockManager with no recent heart beats

Austin Chungath

Jun 4, 2013, 8:59:32 AM
to spark...@googlegroups.com

Hi All,

My spark job is a simple map only job which prints out a value for each input line. It runs 10 iterations.

I have 11 nodes with 16 GB memory each.
spark-env.sh has SPARK_MEM=10g

The job runs fine on 20 to 30 GB of data.
But when I run it on about 500 GB of data, the job fails.

Any idea as to why this might be happening?
Shouldn't Spark jobs run like a normal Hadoop job if the dataset can't be held in memory?
Please suggest something to fix this issue or some pointers to what I might be doing wrong.

The following is the output on the console.

13/06/04 05:50:18 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 3989)
13/06/04 05:50:19 INFO cluster.TaskSetManager: Finished TID 3950 in 15437 ms (progress: 3996/4000)
13/06/04 05:50:19 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 3949)
13/06/04 05:50:21 INFO cluster.TaskSetManager: Finished TID 3969 in 15175 ms (progress: 3997/4000)
13/06/04 05:50:21 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 3979)
13/06/04 05:50:21 INFO cluster.TaskSetManager: Finished TID 3946 in 17400 ms (progress: 3998/4000)
13/06/04 05:50:21 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 3932)
13/06/04 05:50:22 INFO cluster.TaskSetManager: Finished TID 3959 in 16854 ms (progress: 3999/4000)
13/06/04 05:50:22 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 3950)
13/06/04 05:50:22 INFO cluster.TaskSetManager: Finished TID 3985 in 13940 ms (progress: 4000/4000)
13/06/04 05:50:22 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 3985)
13/06/04 05:50:22 INFO scheduler.DAGScheduler: Stage 1 (groupByKey at TestIteration.scala:29) finished in 464.517 s
13/06/04 05:50:22 INFO scheduler.DAGScheduler: looking for newly runnable stages
13/06/04 05:50:22 INFO scheduler.DAGScheduler: running: Set()
13/06/04 05:50:22 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
13/06/04 05:50:22 INFO scheduler.DAGScheduler: failed: Set()
13/06/04 05:50:22 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
13/06/04 05:50:22 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[7] at saveAsTextFile at TestIteration.scala:31), which is now runnable
13/06/04 05:50:22 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[7] at saveAsTextFile at TestIteration.scala:31)
13/06/04 05:50:22 INFO cluster.ClusterScheduler: Adding task set 0.0 with 1 tasks
13/06/04 05:50:22 INFO cluster.TaskSetManager: Starting task 0.0:0 as TID 4000 on executor 9: node2.example.com (preferred)
13/06/04 05:50:22 INFO cluster.TaskSetManager: Serialized task 0.0:0 as 8534 bytes in 51 ms
13/06/04 05:50:22 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 0 to node2.example.com
13/06/04 05:50:23 INFO spark.MapOutputTracker: Size of output statuses for shuffle 0 is 5286 bytes
13/06/04 05:52:30 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(9, node2.example.com, 48706) with no recent heart beats
13/06/04 05:52:32 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager node2.example.com:48706 with 6.3 GB RAM
13/06/04 05:52:50 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(9, node2.example.com, 48706) with no recent heart beats
13/06/04 05:52:52 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager node2.example.com:48706 with 6.3 GB RAM

Thanks,
Austin


Ian O'Connell

Jun 4, 2013, 2:59:48 PM
to spark...@googlegroups.com
Are you attempting to cache the dataset in memory? It could just be that GC overhead is keeping the block managers from reporting in fast enough. You can change the heartbeat timeout when creating the Spark context and see how it performs then:

    System.setProperty("spark.storage.blockManagerHeartBeatMs", "300000")

Setting that before creating the Spark context will do it.
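A rough sketch of why the ordering matters, assuming (as the note above implies) that the setting is picked up from JVM system properties at the moment the context is built. This is plain Scala with no Spark dependency; `ContextLike` and `PlaceholderDefaultMs` are made up for illustration and are not part of Spark, and the placeholder value is not Spark's real default.

```scala
object HeartbeatConfigSketch {
  val HeartbeatKey = "spark.storage.blockManagerHeartBeatMs"

  // Placeholder fallback for this sketch only; it is NOT Spark's actual default.
  val PlaceholderDefaultMs = 1000L

  // Hypothetical stand-in for a component that reads its settings once,
  // when it is constructed, and never looks at the property again.
  class ContextLike {
    val heartbeatMs: Long =
      Option(System.getProperty(HeartbeatKey)).map(_.toLong).getOrElse(PlaceholderDefaultMs)
  }

  def main(args: Array[String]): Unit = {
    System.clearProperty(HeartbeatKey)
    val createdTooEarly = new ContextLike // property not set yet
    System.setProperty(HeartbeatKey, "300000")
    val createdAfterSet = new ContextLike // property already set

    println(createdTooEarly.heartbeatMs) // the placeholder fallback
    println(createdAfterSet.heartbeatMs) // the configured value
  }
}
```

An object built before the `System.setProperty` call keeps the fallback, which is why the property has to be set first.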





--
You received this message because you are subscribed to the Google Groups "Spark Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-users...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

ty...@ubixlabs.com

Jun 4, 2013, 4:56:18 PM
to spark...@googlegroups.com
To run that amount of data with Spark, I had to make a number of adjustments. I forked the Spark repo so that when I deployed to EC2, all my instances had the settings below.

SPARK_JAVA_OPTS+=" -Dspark.worker.timeout=30000 -Dspark.akka.timeout=30000 -Dspark.storage.blockManagerHeartBeatMs=30000  -Dspark.akka.retry.wait=30000 -Dspark.akka.frameSize=10000 

Patrick Wendell

Jun 4, 2013, 5:03:55 PM
to spark...@googlegroups.com
Ya - I made this a while ago:
https://spark-project.atlassian.net/browse/SPARK-734

Personally I think the defaults are way too low. Tyler, I grabbed your
set of options here and appended them to the JIRA, hope that's alright.

ty...@ubixlabs.com

Jun 4, 2013, 5:32:09 PM
to spark...@googlegroups.com
Glad they were of use!

Austin Chungath

Jun 5, 2013, 9:08:49 AM
to spark...@googlegroups.com
Thank you :) 

I have the following parameters in spark-env.sh
SPARK_JAVA_OPTS+=" -Dspark.worker.timeout=30000 -Dspark.akka.timeout=30000 -Dspark.storage.blockManagerHeartBeatMs=30000  -Dspark.akka.retry.wait=30000 -Dspark.akka.frameSize=10000 

But now I get Java heap space errors :(

The code that I run is pretty silly. The map does nothing but print out the current time for each line that it gets. The map output is given a key, which is the iteration number, and is then grouped by key. This group-by operation fails with an OOM error.

I am trying to understand why the out of memory errors happen. I have 10 GB in each node assigned to Spark and I have 10 nodes. 
The input that I give to this is 500GB. I was under the impression groupByKey will work even if the data set is larger than the total memory available.
Any clues as to why this is happening would be very helpful.

Also, it would be great if someone could tell me the relationship between the data and the number of tasks. What exactly is a TID? I have 500 1 GB files, and 4000 tasks were spawned.

The following is the code:

// Package name "spark" as it appears in the stack trace below
import spark.SparkContext
import spark.SparkContext._  // implicit conversions that add groupByKey to pair RDDs

object TestIteration {

  // The map function: ignores the input row and returns the current time in milliseconds
  def TimeMap(row: String): String = System.currentTimeMillis().toString

  // The main function, which runs the map and the group-by once per iteration
  def main(args: Array[String]) {

    System.setProperty("spark.akka.timeout", "60")
    System.setProperty("spark.akka.askTimeout", "60")

    // Creating the Spark RDD using SparkContext
    val sc = new SparkContext(args(0), "TestIteration", args(3), List(args(4)))
    val iterativeInput = sc.textFile(args(1), 1)

    // The iteration loop; args(2) is the number of iterations to run
    // I am not caching any output from each iteration
    // If I do a groupByKey(88) the Java heap error occurs
    // Without the groupByKey, the code works fine
    // There is only one key per iteration (the key is the iteration number)
    // 88 is the number of cores that we have
    for (i <- 1 to args(2).toInt) {
      val mapResult = iterativeInput.map(x => TimeMap(x))
      val finalOut = mapResult.map(x => (i, x)).groupByKey(88)
      val iterTime = System.currentTimeMillis().toString
      finalOut.saveAsTextFile(args(1) + "_" + args(2) + "/" + i + "_" + iterTime)
    }

    System.exit(0)
  }
}



13/06/05 04:58:28 INFO scheduler.DAGScheduler: Completed ResultTask(0, 67)
13/06/05 04:58:28 INFO cluster.TaskSetManager: Finished TID 4037 in 3769728 ms (progress: 87/88)
13/06/05 04:58:28 INFO scheduler.DAGScheduler: Completed ResultTask(0, 37)
13/06/05 05:01:01 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, node7.example.com, 57885) with no recent heart beats
13/06/05 05:01:04 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager node7.example.com:57885 with 6.3 GB RAM
13/06/05 05:01:21 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, node7.example.com, 57885) with no recent heart beats
13/06/05 05:01:39 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager node7.example.com:57885 with 6.3 GB RAM
13/06/05 05:01:39 INFO cluster.TaskSetManager: Lost TID 4090 (task 0.0:1)
13/06/05 05:01:39 INFO cluster.TaskSetManager: Loss was due to java.lang.OutOfMemoryError: Java heap space
13/06/05 05:01:39 INFO cluster.TaskSetManager: Starting task 0.0:1 as TID 4091 on executor 0: node7.example.com (preferred)
13/06/05 05:01:39 INFO cluster.TaskSetManager: Serialized task 0.0:1 as 8539 bytes in 0 ms
13/06/05 05:03:48 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, node7.example.com, 57885) with no recent heart beats
13/06/05 05:03:48 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager node7.example.com:57885 with 6.3 GB RAM
13/06/05 05:04:38 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, node7.example.com, 57885) with no recent heart beats
13/06/05 05:08:32 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager node7.example.com:57885 with 6.3 GB RAM
13/06/05 05:08:33 INFO cluster.TaskSetManager: Lost TID 4091 (task 0.0:1)
13/06/05 05:08:33 INFO cluster.TaskSetManager: Loss was due to java.lang.OutOfMemoryError: GC overhead limit exceeded
13/06/05 05:08:33 ERROR cluster.TaskSetManager: Task 0.0:1 failed more than 4 times; aborting job
13/06/05 05:08:33 INFO scheduler.DAGScheduler: Failed to run saveAsTextFile at TestIteration.scala:34
spark.SparkException: Job failed: Task 0.0:1 failed more than 4 times
        at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:629)
        at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:627)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:627)
        at spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:297)
        at spark.scheduler.DAGScheduler.spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:358)
        at spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:102)
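
On the task-count question above, a back-of-the-envelope sketch. It assumes one map task per input split and a 128 MB split size; the thread does not state the HDFS block size, so that figure is a guess that happens to reproduce the 4000 seen in the logs. The 88 result tasks in the second log (progress: 87/88) match the partition count passed to groupByKey(88). Judging from the logs, a TID looks like the scheduler's ID for each launched task attempt: the retry of task 0.0:1 gets a new TID (4091) after TID 4090 is lost.

```scala
object TaskCountSketch {
  val MB: Long = 1024L * 1024L
  val GB: Long = 1024L * MB

  // Splits needed to cover one file: fileBytes / splitBytes, rounded up.
  def splitsPerFile(fileBytes: Long, splitBytes: Long): Long =
    (fileBytes + splitBytes - 1) / splitBytes

  // One map task per input split, summed over all files.
  def mapTasks(fileCount: Int, fileBytes: Long, splitBytes: Long): Long =
    fileCount * splitsPerFile(fileBytes, splitBytes)

  def main(args: Array[String]): Unit = {
    val assumedSplitBytes = 128 * MB // ASSUMPTION: not stated in the thread
    // 500 files * 8 splits each = 4000 map tasks
    println(mapTasks(500, 1 * GB, assumedSplitBytes))
  }
}
```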


Ian O'Connell

Jun 5, 2013, 12:44:47 PM
to spark...@googlegroups.com
// There is only one key per iteration (the key is the iteration number)


You're mapping all your values to a single key, so it's really the same as just doing a collect. The values for a particular key need to be able to fit in memory on a single node.

Generally, if the dataset you're operating on isn't going to fit into memory across your cluster, you're going to face a lot of problems in Spark, and you're probably better off just using Cascading/MapReduce.
You can stream in from a bigger set and select a subset to work on, but Spark really shines when the set is in memory across the cluster.
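
To make the single-key point concrete, here is a plain-Scala sketch (no Spark involved) of hash partitioning by key. It assumes the usual "non-negative hashCode modulo partition count" scheme; the object and function names are invented for illustration. With one key, every record lands in the same partition no matter how many partitions are requested.

```scala
object SingleKeySkewSketch {
  // Hash partitioning sketch: non-negative key.hashCode modulo the partition count.
  def partitionOf(key: Int, numPartitions: Int): Int = {
    val m = key.hashCode % numPartitions
    if (m < 0) m + numPartitions else m
  }

  // How many records end up in each partition that receives any data.
  def recordsPerPartition(keys: Seq[Int], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(k => partitionOf(k, numPartitions)).map { case (p, ks) => (p, ks.size) }

  def main(args: Array[String]): Unit = {
    val oneKey = Seq.fill(10000)(1)     // like keying every row by the iteration number
    val manyKeys = (0 until 10000).toSeq // one distinct key per record

    println(recordsPerPartition(oneKey, 88).size)   // partitions that received data
    println(recordsPerPartition(manyKeys, 88).size) // partitions that received data
  }
}
```

Asking for 88 partitions does not help when there is only one key: 87 of them stay empty and one has to hold everything.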


ty...@ubixlabs.com

Jun 5, 2013, 1:11:52 PM
to spark...@googlegroups.com
You'll have to tune your JAVA_OPTS. I use the following for my larger instances: -Xms80G -Xmx80G


Patrick Wendell

Jun 5, 2013, 1:21:10 PM
to spark...@googlegroups.com
As Ian said, groupByKey() will put all the values *for a given key* in
memory on one node. You should consider just saving as a text file
without doing groupByKey... it doesn't seem like you do any logic with
the group except writing it out to disk.
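
A plain-Scala stand-in (not Spark code) for what a map-only pass looks like: each input line is mapped and written straight out, so only one record is held at a time regardless of input size. In the program from the thread, this would correspond to dropping the `.map(x => (i, x)).groupByKey(88)` step and saving the mapped output directly. The names `StreamWithoutGroupingSketch` and `writeMapped` are invented for this sketch.

```scala
import java.io.{File, PrintWriter}

object StreamWithoutGroupingSketch {
  // Same shape as the map function in the thread: ignore the row, emit a timestamp.
  def timeMap(row: String): String = System.currentTimeMillis().toString

  // Map each line and write it immediately; returns the number of lines written.
  // Nothing is collected in memory, unlike a group-by on a single key.
  def writeMapped(lines: Iterator[String], out: File): Long = {
    val writer = new PrintWriter(out)
    var count = 0L
    try {
      for (line <- lines) {
        writer.println(timeMap(line))
        count += 1
      }
    } finally {
      writer.close()
    }
    count
  }

  def main(args: Array[String]): Unit = {
    val out = File.createTempFile("timemap", ".txt")
    out.deleteOnExit()
    val written = writeMapped(Iterator.range(0, 1000).map(_.toString), out)
    println(written)
  }
}
```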

Austin Chungath

Jun 7, 2013, 7:17:36 AM
to spark...@googlegroups.com
Thanks.. :)

What I have understood so far is:

The data groupByKey() gathers for a given key shouldn't exceed the memory that the JVM has, or the Spark job will fail (use -Xms and -Xmx to increase the allocated heap space).

What I had been trying to do in the above logic is find the time taken between iterations.
Command to execute Spark job:     1370415998   Wed Jun  5 02:06:38 CDT 2013
First iteration folder created:   1370416011   Wed Jun  5 02:06:51 CDT 2013   Diff: 13 seconds
First iteration 1st map output:   1370416015   Wed Jun  5 02:06:55 CDT 2013   Diff: 4 seconds
Second iteration folder created:  1370416752   Wed Jun  5 02:19:12 CDT 2013
Second iteration 1st map output:  1370416753   Wed Jun  5 02:19:13 CDT 2013   Diff: 1 second

Can someone please tell me what happens in the 13 seconds that I observed before the first iteration begins? I am assuming it is the time taken to read the dataset into memory. I am using a 500 GB dataset, and the whole dataset can't be read into memory because I have only ~100 GB of total memory allocated to Spark across 10 nodes. How does Spark handle it?

The second iteration begins almost instantaneously and the time between the creation of the folder and the first map output is less than a second.

Regards,
Austin