Here is the code in question. Note: I'm using `count` to force the
evaluation of a single iteration, because I need to measure per-iteration
times. Is there a better way to do this?
// Runs `iters` PageRank-style iterations, printing the rank count and the
// wall-clock time of each one. Relies on `links`, `ranks`, `n`, `parallelism`
// and the mutable `timestamp` being defined outside this snippet.
for (i <- 0 until iters) {
// Start of this iteration's wall-clock measurement (milliseconds).
timestamp = System.currentTimeMillis
// Group each vertex's link list with its current rank, then emit one
// (destination, contribution) pair per outgoing link.
// NOTE(review): presumably `groupWith` is Spark's cogroup, yielding
// (id, (Seq of link lists, Seq of ranks)) — confirm against the Spark version in use.
val contribs = links.groupWith(ranks).flatMap {
// Vertex has both a link list and a rank: split the rank evenly over its links.
case (id, (Seq(links), Seq(rank))) =>
links.map(dest => (dest, rank / links.length))
// Vertex has a link list but no rank yet: split the base value 0.15 / n instead.
case (id, (Seq(links), Seq())) =>
links.map(dest => (dest, 0.15 / n / links.length))
// Vertex has a rank but nothing to distribute it to: emit no contributions.
case (id, (Seq(), Seq(rank))) => // Vertex had no links list
new Array[(Long, Double)](0)
// NOTE(review): any other shape (e.g. more than one element in either Seq)
// is not matched and would raise a MatchError — confirm keys are unique.
}
// Adds two contributions; passed twice to combineByKey below.
def sum(x: Double, y: Double) = x + y
// Sum the contributions per destination, then apply the damping formula
// 0.15 / n + 0.85 * (summed contributions).
// NOTE(review): the arguments are presumably createCombiner, mergeValue,
// mergeCombiners and the partition count — verify against the combineByKey signature.
ranks = contribs.combineByKey(identity, sum, sum, parallelism)
// The lambda parameter `sum` shadows the method `sum` defined above.
.mapValues(sum => 0.15 / n + 0.85 * sum)
// Per the author's note, `count` is called to force evaluation of this
// iteration so that the time printed below covers the actual computation.
val totalRanks = ranks.count
println("Number of ranks: " + totalRanks)
// Elapsed time for this iteration, converted from milliseconds to seconds.
println("Iteration time: " + ((System.currentTimeMillis -
timestamp) / 1000.0) + " seconds.")
// Disabled alternative that groups all contributions per key before summing.
// NOTE(review): it references `vp`, not `parallelism` — confirm which is intended.
/*
// Non-combiner version
ranks = contribs.groupByKey(vp).mapValues(rs => 0.15/n +
0.85*rs.sum)
*/