reduce function in RDD.scala:

--- spark-0.6.3/core/src/main/scala/spark/RDD.scala 2013-06-27 11:31:12.628017194 -0700
+++ spark-0.7.2/core/src/main/scala/spark/RDD.scala 2013-06-27 13:42:22.844686240 -0700
@@ -316,39 +468,93 @@
   def reduce(f: (T, T) => T): T = {
     val cleanF = sc.clean(f)
+    // println("RDD.reduce: after sc.clean")
     val reducePartition: Iterator[T] => Option[T] = iter => {
       if (iter.hasNext) {
         Some(iter.reduceLeft(cleanF))
-      }else {
+      } else {
         None
       }
     }
-    val options = sc.runJob(this, reducePartition)
-    val results = new ArrayBuffer[T]
-    for (opt <- options; elem <- opt) {
-      results += elem
-    }
-    if (results.size == 0) {
-      throw new UnsupportedOperationException("empty collection")
-    } else {
-      return results.reduceLeft(cleanF)
+    // println("RDD.reduce: after reducePartition")
+    var jobResult: Option[T] = None
+    val mergeResult = (index: Int, taskResult: Option[T]) => {
+      if (taskResult != None) {
+        jobResult = jobResult match {
+          case Some(value) => Some(f(value, taskResult.get))
+          case None => taskResult
+        }
+      }
     }
+    // println("RDD.reduce: after jobResult")
+    sc.runJob(this, reducePartition, mergeResult)
+    // println("RDD.reduce: after sc.runJob")
+    // Get the final result out of our Option, or throw an exception if the RDD was empty
+    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
+    // println("RDD.reduce: finished")
   }
   /**
    * Aggregate the elements of each partition, and then the results for all the partitions, using a
-   * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
+   * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
    * modify t1 and return it as its result value to avoid object allocation; however, it should not
    * modify t2.
    */
   def fold(zeroValue: T)(op: (T, T) => T): T = {
+    // Clone the zero value since we will also be serializing it as part of tasks
+    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
     val cleanOp = sc.clean(op)
-    val results = sc.runJob(this, (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp))
-    return results.fold(zeroValue)(cleanOp)
+    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
+    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
+    sc.runJob(this, foldPartition, mergeResult)
+    jobResult
   }
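The behaviour seen by callers is unchanged; the difference is that 0.7.2 merges each partition's partial result on the driver as soon as it arrives (via runJob's result-handler callback) instead of collecting all partial results into an ArrayBuffer first. A minimal caller-side sketch, assuming an existing SparkContext named sc:

val nums = sc.parallelize(1 to 10, 4)
val total  = nums.reduce(_ + _)     // 55; partition results are merged incrementally on the driver
val folded = nums.fold(0)(_ + _)    // 55; zeroValue is cloned before being serialized with the tasks
// An empty RDD still fails the same way:
// sc.parallelize(Seq.empty[Int]).reduce(_ + _)   // throws UnsupportedOperationException("empty collection")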
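The loop below implements the standard multiplicative NMF updates, H <- H .* (Wt*A ./ Wt*M) and W <- W .* (At*H ./ Mt*H) with M = W*H, and it relies on a few element-wise helpers (op_two_arrays, dot_product_local, and the add/divide/multiple operators) that are not shown in the post. A plausible sketch of those helpers, with names and signatures inferred from the call sites, so treat it as an assumption rather than the poster's actual code:

// Hypothetical helpers reconstructed from how they are used in the loop below.
val add      = (a: Double, b: Double) => a + b
val divide   = (a: Double, b: Double) => a / b
val multiple = (a: Double, b: Double) => a * b   // element-wise multiply; name kept from the post

// Combine two equal-length arrays element by element with the given operator.
def op_two_arrays(x: Array[Double], y: Array[Double], op: (Double, Double) => Double): Array[Double] =
  x.zip(y).map { case (a, b) => op(a, b) }

// Plain dot product of two equal-length vectors.
def dot_product_local(x: Array[Double], y: Array[Double]): Double =
  x.zip(y).map { case (a, b) => a * b }.sum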
while (i <= ITERATION && err >= THRESHOLD) {
  // AW: group by row, then create key by col
  // split A by row
  // (col, (A_w_M_element, W_row_vector, (row, col)))
  AW = A.map(x =>
    (x._1._1, (x._1, x._2))
  ).cogroup(W).flatMap( x => {
    val wt_i = x._2._2(0)
    val A_i_by_j = x._2._1
    A_i_by_j.map( j => (j._1._2, (j._2, wt_i, j._1)))
  })
  // calculate the X = Wt*A
  X_i_by_j = AW.map( k =>
    (k._1, k._2._2.map(_*k._2._1(0)))
  ).reduceByKey(op_two_arrays(_, _, add))
  // Y = Wt*M = Wt*WH at the same time
  Y_i_by_j = AW.map( k =>
    (k._1, k._2._2.map(_*k._2._1(2)))
  ).reduceByKey(op_two_arrays(_, _, add))
  // X ./ Y
  X_divide_Y = X_i_by_j.join(Y_i_by_j).map(x =>
    (x._1, op_two_arrays(x._2._1, x._2._2, divide))
  )
  // H = H .* X_divide_Y
  H = H.join(X_divide_Y).map(x =>
    (x._1, op_two_arrays(x._2._1, x._2._2, multiple))
  )
  // Update M = WH
  // M = matrix_multi_local(AW, H)
  A = AW.join(H).map( x => {
    val orig_AwM = x._2._1._1
    val W_row = x._2._1._2
    val cord = x._2._1._3
    val H_col = x._2._2
    // notice that we include original A here as well
    (cord, Array(orig_AwM(0), orig_AwM(1), dot_product_local(W_row, H_col)))
  })
  // split M into two intermediate matrices (one by row, and the other by col)
  /*val M_by_i = M.map(x =>
    (x._1._1, (x._1, x._2))
  )
  val M_by_j = M.map(x =>
    (x._1._2, (x._1, x._2))
  )*/
  // AH: group by col, then create key by row
  // split A by col first
  // val AH = matrix_join_local(M_by_j, H)
  AH = A.map(x =>
    (x._1._2, (x._1, x._2))
  ).cogroup(H).flatMap( x => {
    val H_col = x._2._2(0)
    val AM_j_by_i = x._2._1
    AM_j_by_i.map( i => (i._1._1, (i._2, H_col, i._1)))
  })
  // calculate V = At*H
  V_j_by_i = AH.map( k =>
    (k._1, k._2._2.map(_*k._2._1(0)))
  ).reduceByKey(op_two_arrays(_, _, add))
  // calculate U = Mt*H
  U_j_by_i = AH.map( k =>
    (k._1, k._2._2.map(_*k._2._1(2)))
  ).reduceByKey(op_two_arrays(_, _, add))
  // V / U
  V_divide_U = V_j_by_i.join(U_j_by_i).map(x =>
    (x._1, op_two_arrays(x._2._1, x._2._2, divide))
  )
  // W = W .* V_divide_U
  W = W.join(V_divide_U).map(x =>
    (x._1, op_two_arrays(x._2._1, x._2._2, multiple))
  )
  // M = W*H
  A = AH.join(W).map( x => {
    val orig_AwM = x._2._1._1
    val H_col = x._2._1._2
    val cord = x._2._1._3
    val W_row = x._2._2
    // notice that we include original A here as well
    (cord, Array(orig_AwM(0), orig_AwM(1), dot_product_local(W_row, H_col)))
  })
  // Calculate the error
  // calculate the square of the difference
  err = A.map( x => (x._2(0) - x._2(2))*(x._2(0) - x._2(2))/A_len).reduce(_+_)
  println("At round " + i + ": MSE is " + err)
}

From the RDD.checkpoint() documentation: Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint directory set with SparkContext.setCheckpointDir() and all references to its parent RDDs will be removed. This function must be called before any job has been executed on this RDD. It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.
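Since each iteration redefines A, W and H in terms of the previous iteration's RDDs, the lineage grows with every pass, and periodic checkpointing is the usual way to truncate it. A minimal sketch of how that could be wired into the loop above; the directory, the every-5-iterations interval, and the placement are illustrative assumptions, not part of the original post:

// Once, during setup (the directory is a made-up example):
sc.setCheckpointDir("hdfs:///tmp/nmf_checkpoint")

// Inside the loop, before the err reduce triggers a job on A:
if (i % 5 == 0) {          // hypothetical interval
  A.checkpoint()
  W.checkpoint()
  H.checkpoint()
}
// checkpoint() only marks the RDDs; they are actually written out when the next
// action runs. Persisting them in memory first avoids recomputing the whole
// lineage when they are saved.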