Spark reduce causes StackOverflowError

Haokun Luo

Jun 25, 2013, 9:14:00 PM
to spark...@googlegroups.com
Hi All,

I just started programming with Spark. I am testing a machine learning algorithm on a standalone machine, and the last step of the algorithm requires calculating the MSE (mean squared error) between two matrices, i.e. ||A - M||^2, which involves an element-wise subtraction between them. Since A is potentially extremely large and sparse, we store the matrix as (key, value) pairs, where the key is the coordinate (i, j) and the value is a tuple of the corresponding elements of A and M, i.e. (A_ij, M_ij). The whole ML algorithm is gradient descent, so on each iteration we calculate the MSE and test it against a certain threshold. The program runs normally if I skip the MSE calculation on each iteration. Here is what the program looks like:

val ITERATIONS = 100
for (i <- 1 to ITERATIONS) {
  ... // calculate M for each iteration
  val mse = A.map{ x => 
    val A_ij = x._2(0) 
    val M_ij = x._2(1)
    (A_ij - M_ij) * (A_ij - M_ij)
  }.reduce(_+_)
  ...
}

The program only runs for about 45 iterations before crashing with the following error:

[error] (run-main) spark.SparkException: Job failed: ShuffleMapTask(764, 0) failed: ExceptionFailure(java.lang.StackOverflowError)
spark.SparkException: Job failed: ShuffleMapTask(764, 0) failed: ExceptionFailure(java.lang.StackOverflowError)
at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:642)
at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:640)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:640)
at spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:601)
at spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:300)
at spark.scheduler.DAGScheduler.spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:364)
at spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:107)
java.lang.RuntimeException: Nonzero exit code: 1
at scala.sys.package$.error(package.scala:27)

Another observation is that the runtime increases by around 5% with each iteration. Also, without "reduce(_+_)", there is no StackOverflowError.
I have tried increasing the parallelism to the total number of available physical threads, but that doesn't help.

I would really appreciate it if anyone could point me in a direction to figure out the root cause of the stack overflow error.

Best,
Haokun

Haokun Luo

Jun 27, 2013, 6:32:00 PM
to spark...@googlegroups.com
I have tried increasing the stack size, localizing the utility functions, and using spark.util.Vector, but unfortunately none of them worked. Then I tried downgrading Spark from 0.7.2 to 0.6.3 (https://github.com/mesos/spark/tree/branch-0.6), and it works: no more stack overflow, even for a 10,000 by 10,000 matrix. I don't know exactly what fixes it, so I am posting the diff of the reduce function in RDD.scala:

--- spark-0.6.3/core/src/main/scala/spark/RDD.scala 2013-06-27 11:31:12.628017194 -0700
+++ spark-0.7.2/core/src/main/scala/spark/RDD.scala 2013-06-27 13:42:22.844686240 -0700
@@ -316,39 +468,93 @@
   def reduce(f: (T, T) => T): T = {
     val cleanF = sc.clean(f)
+    // println("RDD.reduce: after sc.clean")
     val reducePartition: Iterator[T] => Option[T] = iter => {
       if (iter.hasNext) {
         Some(iter.reduceLeft(cleanF))
-      }else {
+      } else {
         None
       }
     }
-    val options = sc.runJob(this, reducePartition)
-    val results = new ArrayBuffer[T]
-    for (opt <- options; elem <- opt) {
-      results += elem
-    }
-    if (results.size == 0) {
-      throw new UnsupportedOperationException("empty collection")
-    } else {
-      return results.reduceLeft(cleanF)
+    // println("RDD.reduce: after reducePartition")
+    var jobResult: Option[T] = None
+    val mergeResult = (index: Int, taskResult: Option[T]) => {
+      if (taskResult != None) {
+        jobResult = jobResult match {
+          case Some(value) => Some(f(value, taskResult.get))
+          case None => taskResult
+        }
+      }
     }
+    // println("RDD.reduce: after jobResult")
+    sc.runJob(this, reducePartition, mergeResult)
+    // println("RDD.reduce: after sc.runJob")
+    // Get the final result out of our Option, or throw an exception if the RDD was empty
+    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
+    // println("RDD.reduce: finished")
   }

   /**
    * Aggregate the elements of each partition, and then the results for all the partitions, using a
-   * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to 
+   * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
    * modify t1 and return it as its result value to avoid object allocation; however, it should not
    * modify t2.
    */
   def fold(zeroValue: T)(op: (T, T) => T): T = {
+    // Clone the zero value since we will also be serializing it as part of tasks
+    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
     val cleanOp = sc.clean(op)
-    val results = sc.runJob(this, (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp))
-    return results.fold(zeroValue)(cleanOp)
+    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
+    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
+    sc.runJob(this, foldPartition, mergeResult)
+    jobResult
   }

I hope one of the developers can take a look and point out the root cause.

Best,
Haokun

Matei Zaharia

Jun 29, 2013, 12:39:22 AM
to spark...@googlegroups.com
Hi Haokun,

Do you update your RDD on each iteration somehow? It would help to see more of the code for this. I don't think the problem is in reduce(), I think it's that you have an RDD whose lineage grows increasingly large.

If you do need to update it on each iteration, try calling checkpoint() on it every 10 iterations; that will limit the size of this graph.
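Roughly, it would look something like this (just a sketch, assuming sc is your SparkContext and A is the RDD you rebuild on each iteration; the checkpoint directory is only an example path):

sc.setCheckpointDir("/tmp/spark-checkpoints")  // example path, set once up front

for (i <- 1 to ITERATIONS) {
  // ... rebuild A (and the other RDDs) for this iteration ...
  if (i % 10 == 0) {
    A.cache()        // keep it in memory so checkpointing doesn't recompute the whole lineage
    A.checkpoint()   // mark it; the data is written out when the next job on A runs
  }
  val mse = A.map { x =>
    val diff = x._2(0) - x._2(1)
    diff * diff
  }.reduce(_ + _)
}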

Matei


Haokun Luo

Jun 29, 2013, 2:31:33 AM
to spark...@googlegroups.com
Hi Matei,

Here is the main loop of the code:
while (i <= ITERATION && err >= THRESHOLD) {      
  // AW: group by row, then create key by col
  // split A by row
  // (col, (A_w_M_element, W_row_vector, (row, col)))
  AW = A.map(x =>
    (x._1._1, (x._1, x._2))
  ).cogroup(W).flatMap( x => {
    val wt_i = x._2._2(0)
    val A_i_by_j = x._2._1
    A_i_by_j.map( j => (j._1._2, (j._2, wt_i, j._1)))
  })

  // calculate the X = Wt*A
  X_i_by_j = AW.map( k => 
    (k._1, k._2._2.map(_*k._2._1(0)))
  ).reduceByKey(op_two_arrays(_, _, add))

  // Y = Wt*M = Wt*WH at the same time  
  Y_i_by_j = AW.map( k => 
    (k._1, k._2._2.map(_*k._2._1(2)))
  ).reduceByKey(op_two_arrays(_, _, add))

  // X ./ Y
  X_divide_Y = X_i_by_j.join(Y_i_by_j).map(x => 
    (x._1, op_two_arrays(x._2._1, x._2._2, divide))
  )

  // H = H .* X_divide_Y
  H = H.join(X_divide_Y).map(x => 
    (x._1, op_two_arrays(x._2._1, x._2._2, multiple))
  )

  // Update M = WH
  // M = matrix_multi_local(AW, H)
  A = AW.join(H).map( x => {
    val orig_AwM = x._2._1._1
    val W_row = x._2._1._2
    val cord = x._2._1._3
    val H_col = x._2._2
    // notice that we include original A here as well
    (cord, Array(orig_AwM(0), orig_AwM(1), dot_product_local(W_row, H_col)))
  })

  // split M into two intermediate matrix (one by row, and the other by col)

  /*val M_by_i = M.map(x =>
    (x._1._1, (x._1, x._2))
  )
  val M_by_j = M.map(x =>
    (x._1._2, (x._1, x._2))
  )*/

  // AH: group by col, then create key by row
  // Divide A by row first
  // val AH = matrix_join_local(M_by_j, H)
  AH = A.map(x =>
    (x._1._2, (x._1, x._2))
  ).cogroup(H).flatMap( x => {
    val H_col = x._2._2(0)
    val AM_j_by_i = x._2._1
    AM_j_by_i.map( i => (i._1._1, (i._2, H_col, i._1)))
  })

  // calculate V = At*H
  V_j_by_i = AH.map( k => 
    (k._1, k._2._2.map(_*k._2._1(0)))
  ).reduceByKey(op_two_arrays(_, _, add))

  // calculate U = Mt*H
  U_j_by_i = AH.map( k => 
    (k._1, k._2._2.map(_*k._2._1(2)))
  ).reduceByKey(op_two_arrays(_, _, add))

  // V / U
  V_divide_U = V_j_by_i.join(U_j_by_i).map(x => 
    (x._1, op_two_arrays(x._2._1, x._2._2, divide))
  )

  // W = W .* V_divide_U
  W = W.join(V_divide_U).map(x => 
    (x._1, op_two_arrays(x._2._1, x._2._2, multiple))
  )
  // M = W*H
  A = AH.join(W).map( x => {
    val orig_AwM = x._2._1._1
    val H_col = x._2._1._2
    val cord = x._2._1._3
    val W_row = x._2._2
    // notice that we include original A here as well
    (cord, Array(orig_AwM(0), orig_AwM(1), dot_product_local(W_row, H_col)))
  })  

  // Calculate the error
  // calculate the square of the difference
  err = A.map( x => (x._2(0) - x._2(2))*(x._2(0) - x._2(2))/A_len).reduce(_+_)
  println("At round " + i + ": MSE is " + err)
}

Basically the algorithm factorizes matrix A into W * H, and each iteration updates A, W and H using gradient descent. The input size of A is around 17,000 x 480,000, W is 17,000 x 200 and H is 200 x 480,000. I am a little bit curious why the lineage grows increasingly large, because I thought an RDD doesn't store the intermediate results, only their transformations, which should be really cheap. Also, could you elaborate on how checkpoint() works? Thanks.

Best,
Haokun

Ian O'Connell

Jun 29, 2013, 12:32:59 PM
to spark...@googlegroups.com
It's worth reading the API documentation on checkpoint(), or searching the group for other discussions:


  "Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint directory set with SparkContext.setCheckpointDir() and all references to its parent RDDs will be removed. This function must be called before any job has been executed on this RDD. It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation."


It looks like the lineage will overflow over time: the history stored to recreate an RDD does grow, since enough info/code needs to be present to rebuild it at each step, so it's non-trivial. Regular checkpointing should avoid that problem.


Matei Zaharia

Jun 29, 2013, 3:00:45 PM
to spark...@googlegroups.com
Yeah, basically checkpoint() will cut off the lineage graph, whereas otherwise you'd be growing a graph with hundreds of stages (many transformations on each iteration). The scheduler looks through that graph each time an operation is submitted, so that's why it can cause stack overflows.
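You can see the graph growing with a toy example like this (illustrative only; RDD.toDebugString, in the versions that have it, just prints the lineage):

var rdd = sc.parallelize(1 to 10)
for (i <- 1 to 50) {
  rdd = rdd.map(_ + 1).filter(_ > 0)   // stand-in for one iteration's transformations
  println("iteration " + i + ": lineage size = " + rdd.toDebugString.split("\n").length)
}
// The printed count keeps climbing; after rdd.checkpoint() plus a job that
// materializes it, the lineage is truncated at the checkpoint.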

Matei

Haokun Luo

Jun 29, 2013, 7:54:56 PM
to spark...@googlegroups.com
Got it, and checkpoint() really works.

However, I ran into another issue: "storage.BlockManagerMaster" can hang in the middle without responding:

(Previous round response) 13/06/29 16:04:51 INFO storage.BlockManagerMaster: Updated info of block rdd_2_46
(Previous round response) 13/06/29 16:06:25 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(<driver>, haluo-ld1, 54668) with no recent heart beats
13/06/29 16:06:25 INFO storage.BlockManager: BlockManager reregistering with master
13/06/29 16:06:25 INFO storage.BlockManagerMaster: Trying to register BlockManager
13/06/29 16:06:25 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager haluo-ld1:54668 with 30.7 GB RAM
13/06/29 16:06:25 INFO storage.BlockManagerMaster: Registered BlockManager
13/06/29 16:06:25 INFO storage.BlockManager: Reporting 148 blocks to the master.
13/06/29 16:06:25 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_2_17 in memory on haluo-ld1:54668 (size: 190.1 MB, free: 30.5 GB)
13/06/29 16:06:25 INFO storage.BlockManagerMaster: Updated info of block rdd_2_17
13/06/29 16:06:25 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_2_18 in memory on haluo-ld1:54668 (size: 190.1 MB, free: 30.3 GB)
13/06/29 16:06:25 INFO storage.BlockManagerMaster: Updated info of block rdd_2_18
13/06/29 16:06:25 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_2_15 in memory on haluo-ld1:54668 (size: 190.1 MB, free: 30.1 GB)
..... // update for 72 blocks, then stuck at RDD No. 46
13/06/29 16:04:51 INFO storage.BlockManagerMaster: Updated info of block rdd_2_46
...... // hang here forever

Clearly I do have free RAM available. My Spark version is 0.7.2; could you point me in some directions to investigate this problem? Thanks.

Best,
Haokun

Matei Zaharia

Jun 30, 2013, 8:34:31 PM
to spark...@googlegroups.com
I think this means a garbage collection pause was too big and so the master thought that node died. Try doing

System.setProperty("spark.storage.blockManagerTimeoutIntervalMs", "120000")

Matei

Haokun Luo

Jul 2, 2013, 7:49:05 PM
to spark...@googlegroups.com
Hi Matei,

Raising blockManagerTimeoutIntervalMs keeps the program from hanging in the stage that allocates RDDs in memory. However, a new issue came up during the "storage.BlockFetcherIterator" stage. Basically, after the LocalScheduler finishes the ShuffleMapTasks, the program now hangs at "storage.BlockFetcherIterator". Here are the details:

13/07/02 16:12:33 INFO local.LocalScheduler: Finished ShuffleMapTask(5, 53)
13/07/02 16:12:33 INFO local.LocalScheduler: Running ShuffleMapTask(5, 54)
13/07/02 16:12:33 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 53)
13/07/02 16:12:33 INFO local.LocalScheduler: Size of task 54 is 2668 bytes
13/07/02 16:12:35 INFO local.LocalScheduler: Finished ShuffleMapTask(5, 54)
13/07/02 16:12:35 INFO local.LocalScheduler: Running ShuffleMapTask(5, 55)
13/07/02 16:12:35 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 54)
13/07/02 16:12:35 INFO local.LocalScheduler: Size of task 55 is 2668 bytes
13/07/02 16:12:36 INFO local.LocalScheduler: Finished ShuffleMapTask(5, 55)
13/07/02 16:12:36 INFO local.LocalScheduler: Running ShuffleMapTask(2, 0)
13/07/02 16:12:36 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 55)
13/07/02 16:12:36 INFO scheduler.DAGScheduler: Stage 5 (apply at TraversableLike.scala:233) finished in 347.897 s
13/07/02 16:12:36 INFO scheduler.DAGScheduler: looking for newly runnable stages
13/07/02 16:12:36 INFO scheduler.DAGScheduler: running: Set(Stage 2)
13/07/02 16:12:36 INFO scheduler.DAGScheduler: waiting: Set(Stage 1)
13/07/02 16:12:36 INFO scheduler.DAGScheduler: failed: Set()
13/07/02 16:12:36 INFO scheduler.DAGScheduler: Missing parents for Stage 1: List(Stage 2)
13/07/02 16:12:36 INFO local.LocalScheduler: Size of task 0 is 2580 bytes
13/07/02 16:12:36 INFO storage.BlockFetcherIterator: Started 0 remote gets in  4 ms
[GC [PSYoungGen: 7625248K->351718K(7968064K)] 21455874K->15444022K(25649664K), 1.1217690 secs] [Times: user=7.28 sys=3.93, real=1.12 secs] 
[GC [PSYoungGen: 7968038K->420519K(5366464K)] 23060342K->17732309K(23048064K), 1.8894910 secs] [Times: user=11.67 sys=7.20, real=1.89 secs] 
[Full GC [PSYoungGen: 420519K->49939K(5366464K)] [PSOldGen: 17311789K->17681600K(25165824K)] 17732309K->17731539K(30532288K) [PSPermGen: 66712K->66712K(148160K)], 32.4208350 secs] [Times: user=31.73 sys=0.70, real=32.41 secs] 
[GC [PSYoungGen: 4945920K->1721322K(6667264K)] 22627520K->20050513K(31833088K), 1.8276300 secs] [Times: user=16.19 sys=2.05, real=1.83 secs] 
[GC [PSYoungGen: 6667242K->1721317K(4517632K)] 24996433K->22365685K(29683456K), 1.8349200 secs] [Times: user=12.36 sys=5.98, real=1.84 secs] 
[GC [PSYoungGen: 4508210K->2796141K(5592448K)] 25152578K->23712997K(30758272K), 0.8782690 secs] [Times: user=7.88 sys=0.87, real=0.88 secs] 
[GC [PSYoungGen: 5592429K->2796138K(5592448K)] 26509285K->24986830K(30758272K), 1.3362900 secs] [Times: user=9.60 sys=2.45, real=1.34 secs] 
[GC [PSYoungGen: 5592426K->2796152K(5592448K)] 27783118K->26313894K(30758272K), 1.3514270 secs] [Times: user=10.12 sys=3.39, real=1.35 secs] 
[Full GC [PSYoungGen: 2796152K->0K(5592448K)] [PSOldGen: 23517742K->24422423K(25165824K)] 26313894K->24422423K(30758272K) [PSPermGen: 66712K->66712K(143424K)], 39.9786840 secs] [Times: user=39.73 sys=0.24, real=39.97 secs] 
[Full GC [PSYoungGen: 2796288K->0K(5592448K)] [PSOldGen: 24422423K->24199714K(25165824K)] 27218711K->24199714K(30758272K) [PSPermGen: 66712K->66478K(137088K)], 55.3075440 secs] [Times: user=55.30 sys=0.01, real=55.30 secs]
...
[Full GC [PSYoungGen: 2796288K->2796288K(5592448K)] [PSOldGen: 25165823K->25165823K(25165824K)] 27962111K->27962111K(30758272K) [PSPermGen: 66010K->66010K(66880K)], 46.6690870 secs] [Times: user=46.67 sys=0.01, real=46.66 secs] 
[Full GC [PSYoungGen: 2796288K->2752435K(5592448K)] [PSOldGen: 25165823K->25165824K(25165824K)] 27962111K->27918259K(30758272K) [PSPermGen: 66010K->65921K(66752K)], 62.8826880 secs] [Times: user=62.90 sys=0.01, real=62.87 secs] 
[Full GC [PSYoungGen: 2796288K->2796288K(5592448K)] [PSOldGen: 25165824K->25165824K(25165824K)] 27962112K->27962112K(30758272K) [PSPermGen: 66012K->66012K(66752K)], 46.6741400 secs] [Times: user=46.71 sys=0.01, real=46.67 secs]
... // GC could barely free any memory, then goes to the OOM error again

I have tried the following approaches, but none of them worked:
  1. Doubling "-Xms" and "-Xmx" from 16 GB to 32 GB
  2. Setting "-XX:NewRatio" to 3 to reduce the GC frequency
  3. Reducing "spark.storage.memoryFraction" from 1 to 0.5
  4. Changing the intermediate RDDs from "var" to "val"
Could you provide some hints on how to tackle this new issue? Many thanks.

Best,
Haokun

Haokun Luo

Jul 3, 2013, 9:04:58 PM
to spark...@googlegroups.com
I noticed that even though there is plenty of free RAM, the BlockManagerMaster could not allocate new blocks, which leads to no recent heartbeats being detected, so the BlockManager gets removed. For example,

13/07/03 17:52:40 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_2_46 in memory on haluo-ld1:48183 (size: 85.8 MB, free: 18.8 GB)
13/07/03 17:52:40 INFO storage.BlockManagerMaster: Updated info of block rdd_2_46
13/07/03 17:52:40 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_2_67 in memory on haluo-ld1:48183 (size: 85.3 MB, free: 18.8 GB)
13/07/03 17:52:40 INFO storage.BlockManagerMaster: Updated info of block rdd_2_67
[CMS-concurrent-mark: 8.216/8.219 secs] [Times: user=33.18 sys=0.02, real=8.22 secs] 
[Full GC [CMS[CMS-concurrent-preclean: 20.008/20.010 secs] [Times: user=30.69 sys=0.01, real=20.01 secs] 
... // Full GC multiple times until timeout
... // Try to allocate another time
13/07/03 17:57:40 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(<driver>, haluo-ld1, 48183) with no recent heart beats
13/07/03 17:57:40 INFO storage.BlockManager: BlockManager reregistering with master
13/07/03 17:57:40 INFO storage.BlockManagerMaster: Trying to register BlockManager
13/07/03 17:57:40 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager haluo-ld1:48183 with 24.7 GB RAM
13/07/03 17:57:40 INFO storage.BlockManagerMaster: Registered BlockManager
13/07/03 17:57:40 INFO storage.BlockManager: Reporting 206 blocks to the master.
[GC [1 CMS-initial-mark: 22369663K(22369664K)] 25028372K(32435968K), 2.0774450 secs] [Times: user=2.08 sys=0.00, real=2.08 secs] 
13/07/03 17:57:42 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_2_17 in memory on haluo-ld1:48183 (size: 86.2 MB, free: 24.7 GB)

Any thoughts on why this occurs?

Best,
Haokun

Matei Zaharia

Jul 4, 2013, 1:33:33 PM
to spark...@googlegroups.com
Hi Haokun,

This definitely looks like a GC problem on the workers. It appears that you're using the Concurrent Mark-Sweep GC -- try disabling that and switching back to the normal one. The other issue might be that some of your tasks are using too much memory, decreasing the amount of space available for the cache, etc and causing a lot of GC. Try raising the number of reduce tasks for your shuffle operations (e.g. reduceByKey, groupByKey, etc). Finally, in jobs that create a lot of intermediate RDDs, it can often help to use serialized storage (call RDD.persist(StorageLevel.MEMORY_ONLY_SER) instead of RDD.cache()). Take a look at http://spark-project.org/docs/latest/tuning.html for how to turn it on. This will reduce the duration of full GCs.
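In code, the two tweaks look roughly like this (a generic sketch, not your job; the data, partition count and context arguments are placeholders):

import spark.SparkContext
import spark.SparkContext._            // pair-RDD operations like reduceByKey
import spark.storage.StorageLevel

val sc = new SparkContext("local[12]", "tuning-sketch")   // placeholders
val pairs = sc.parallelize(Seq(("a", 1.0), ("b", 2.0), ("a", 3.0)))

// 1) More reduce tasks: pass an explicit partition count to the shuffle operation.
val summed = pairs.reduceByKey(_ + _, 128)                // 128 is only an example

// 2) Serialized in-memory storage instead of cache().
summed.persist(StorageLevel.MEMORY_ONLY_SER)
summed.count()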

Matei

Haokun Luo

Jul 9, 2013, 7:27:42 PM
to spark...@googlegroups.com
Hi Matei,

Great suggestions. I have tried introducing more reduce tasks by adding reduceByKey operations, and set the intermediate RDDs' storage level to StorageLevel.MEMORY_AND_DISK_SER. Based on the tuning guide at http://www.spark-project.org/docs/0.7.2/tuning.html, I also lowered "spark.storage.memoryFraction" to 0.3 and increased "spark.default.parallelism" to 64 (on a 12-core machine).

However, the OOM is still unresolved, so I am giving the Kryo serializer a try. I encountered a "java.io.NotSerializableException". Here is my code:

// in nmf_core.scala
import com.esotericsoftware.kryo.Kryo

class MyKryoRegistrator extends spark.KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[spark.RDD[(Double, (Array[Double], Array[Double], (Double, Double)))]])
    kryo.register(classOf[spark.RDD[(Double, Array[Double])]])
  }
}

class nmf_core {
  System.setProperty("spark.serializer", "spark.KryoSerializer")
  System.setProperty("spark.kryo.registrator", "com.linkedin.cup.MyKryoRegistrator")
  System.setProperty("spark.kryoserializer.buffer.mb", "512")
  
  val sc = new SparkContext(...)
}

The error is:
13/07/09 15:40:39 ERROR local.LocalScheduler: Exception in task 14
java.io.NotSerializableException: com.linkedin.cup.nmf_core
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
at spark.JavaSerializationStream.writeObject(JavaSerializer.scala:11)
at spark.scheduler.ResultTask$.serializeInfo(ResultTask.scala:26)
at spark.scheduler.ResultTask.writeExternal(ResultTask.scala:91)
at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1429)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1398)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
at spark.JavaSerializationStream.writeObject(JavaSerializer.scala:11)
at spark.JavaSerializerInstance.serialize(JavaSerializer.scala:31)
at spark.scheduler.Task$.serializeWithDependencies(Task.scala:61)
at spark.scheduler.local.LocalScheduler.runTask$1(LocalScheduler.scala:66)
at spark.scheduler.local.LocalScheduler$$anon$1.run(LocalScheduler.scala:49)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

I registered the spark.RDD classes for serialization, so how come Kryo tries to serialize the nmf_core class?

Best,
Haokun

Haokun Luo

Jul 9, 2013, 9:22:39 PM
to spark...@googlegroups.com
No worries about the serialization error; it was my fault in how I constructed that class. I will keep posting the GC tuning results using Kryo. Thanks for all the help.
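(For reference, the usual culprit for this error is a closure that touches a field or method of a non-serializable enclosing class, which drags the whole instance into the task when the Java closure serializer runs. The class and field names below are made up just to illustrate the pattern and the workaround, not my actual code.)

import spark.SparkContext
import spark.RDD

class Driver {                                    // hypothetical, not Serializable
  val sc = new SparkContext("local", "example")   // placeholders
  val scale = 2.0

  def bad(data: RDD[Double]) =
    data.map(_ * scale)      // reads a field, so the closure captures `this`
                             // -> java.io.NotSerializableException: Driver

  def good(data: RDD[Double]) = {
    val s = scale            // copy the field into a local val first
    data.map(_ * s)          // the closure now only captures the Double
  }
}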

Best,
Haokun

Haokun Luo

Jul 10, 2013, 2:42:55 PM
to spark...@googlegroups.com
Hi Matei,

Right now, with Kryo 2.21, it takes about 7 minutes to process a cached partition (the CacheManager finds it in memory immediately, but the task then takes minutes to finish), compared with 4-5 seconds without Kryo. For instance, during the "reduceByKey" operation:

13/07/10 11:14:36 INFO spark.SparkContext: Starting job: reduce at nmf.scala:236
13/07/10 11:14:36 INFO scheduler.DAGScheduler: Registering RDD 25 (reduceByKey at nmf.scala:236)
13/07/10 11:14:36 INFO rdd.CoGroupedRDD: Adding shuffle dependency with FlatMappedRDD[17] at flatMap at nmf.scala:191
13/07/10 11:14:36 INFO rdd.CoGroupedRDD: Adding shuffle dependency with MappedRDD[12] at map at nmf.scala:177
13/07/10 11:14:36 INFO scheduler.DAGScheduler: Registering RDD 29 (apply at TraversableLike.scala:233)
13/07/10 11:14:36 INFO scheduler.DAGScheduler: Registering RDD 18 (apply at TraversableLike.scala:233)
13/07/10 11:14:36 INFO scheduler.DAGScheduler: Registering RDD 19 (apply at TraversableLike.scala:233)
13/07/10 11:14:36 INFO scheduler.DAGScheduler: Registering RDD 30 (apply at TraversableLike.scala:233)
13/07/10 11:14:36 INFO scheduler.DAGScheduler: Got job 2 (reduce at nmf.scala:236) with 1 output partitions (allowLocal=false)
13/07/10 11:14:36 INFO scheduler.DAGScheduler: Final stage: Stage 2 (map at nmf.scala:236)
13/07/10 11:14:36 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 3)
13/07/10 11:14:36 INFO scheduler.DAGScheduler: Missing parents: List(Stage 3)
13/07/10 11:14:36 INFO scheduler.DAGScheduler: Submitting Stage 5 (MapPartitionsRDD[18] at apply at TraversableLike.scala:233), which has no missing parents
13/07/10 11:14:36 INFO scheduler.DAGScheduler: Submitting 72 missing tasks from Stage 5 (MapPartitionsRDD[18] at apply at TraversableLike.scala:233)
13/07/10 11:14:36 INFO local.LocalScheduler: Running ShuffleMapTask(5, 0)
13/07/10 11:14:36 INFO scheduler.DAGScheduler: Submitting Stage 6 (MapPartitionsRDD[19] at apply at TraversableLike.scala:233), which has no missing parents
13/07/10 11:14:36 INFO scheduler.DAGScheduler: Submitting 64 missing tasks from Stage 6 (MapPartitionsRDD[19] at apply at TraversableLike.scala:233)
13/07/10 11:14:36 INFO local.LocalScheduler: Size of task 0 is 2387 bytes
13/07/10 11:14:36 INFO scheduler.DAGScheduler: Submitting Stage 7 (MapPartitionsRDD[30] at apply at TraversableLike.scala:233), which has no missing parents
13/07/10 11:14:36 INFO scheduler.DAGScheduler: Submitting 64 missing tasks from Stage 7 (MapPartitionsRDD[30] at apply at TraversableLike.scala:233)
13/07/10 11:14:36 INFO spark.CacheManager: Cache key is rdd_2_0
13/07/10 11:14:36 INFO spark.CacheManager: Found partition in cache!
13/07/10 11:14:39 INFO local.LocalScheduler: Finished ShuffleMapTask(5, 0)
13/07/10 11:14:39 INFO local.LocalScheduler: Running ShuffleMapTask(5, 1)
13/07/10 11:14:39 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 0)
13/07/10 11:14:39 INFO local.LocalScheduler: Size of task 1 is 2387 bytes
13/07/10 11:14:39 INFO spark.CacheManager: Cache key is rdd_2_1
13/07/10 11:14:39 INFO spark.CacheManager: Found partition in cache!
13/07/10 11:21:18 INFO local.LocalScheduler: Finished ShuffleMapTask(5, 1)
13/07/10 11:21:18 INFO local.LocalScheduler: Running ShuffleMapTask(5, 2)
13/07/10 11:21:18 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 1)
13/07/10 11:21:18 INFO local.LocalScheduler: Size of task 2 is 2387 bytes
13/07/10 11:21:18 INFO spark.CacheManager: Cache key is rdd_2_2
13/07/10 11:21:18 INFO spark.CacheManager: Found partition in cache!
1319.042: [GC [PSYoungGen: 10583994K->185140K(10811008K)] 15318347K->5054823K(33180672K), 0.0795170 secs] [Times: user=0.76 sys=0.02, real=0.08 secs] 
13/07/10 11:34:37 INFO local.LocalScheduler: Finished ShuffleMapTask(5, 2)
13/07/10 11:34:37 INFO local.LocalScheduler: Running ShuffleMapTask(5, 3)
13/07/10 11:34:37 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 2)
13/07/10 11:34:37 INFO local.LocalScheduler: Size of task 3 is 2387 bytes
13/07/10 11:34:37 INFO spark.CacheManager: Cache key is rdd_2_3
13/07/10 11:34:37 INFO spark.CacheManager: Found partition in cache!

The current configuration is:
  1. -Xms32g -Xmx32g
  2. System.setProperty("spark.cores.max", "64")
  3. System.setProperty("spark.default.parallelism", "128")
  4. System.setProperty("spark.akka.timeout", "60")
  5. System.setProperty("spark.storage.memoryFraction", "0.3")
  6. System.setProperty("spark.storage.blockManagerTimeoutIntervalMs", "60000")
  7. System.setProperty("spark.kryoserializer.buffer.mb", "4096")
  8. StorageLevel.MEMORY_ONLY_SER for all RDDs
  9. Kryo registered classes:
    kryo.register(classOf[spark.RDD[(Double, (Array[Double], Array[Double], (Double, Double)))]])
    kryo.register(classOf[spark.RDD[(Double, Array[Double])]])
    kryo.register(classOf[spark.RDD[(Double, Double)]])
    kryo.register(classOf[spark.RDD[Double]])
    kryo.register(classOf[Array[Double]])
    kryo.register(classOf[Double])

I'd really appreciate it if you could help me figure out where the slowness comes from. Thanks.

Best,
Haokun