I am using Spark for a data mining job and have run into a problem with mapPartitions.
import org.apache.spark.SparkContext

def main(args: Array[String]) {
  if (args.length == 0) {
    System.err.println("Usage: SparkTest <host> [<slices>]")
    System.exit(1)
  }
  val spark = new SparkContext(args(0), "Build Cubes")
  val slices = if (args.length > 1) args(1).toInt else 2
  // Creates an RDD of the tuples.
  val tuples = spark.textFile("tuples.txt")
  // Reads the candidates on the master node and broadcasts them to all slaves.
  val candidates = scala.io.Source.fromFile("candidates.txt").getLines.toSeq
  val candidatesBroadcast = spark.broadcast(candidates)
  val tuplesArray = tuples.toArray() // collected to the driver; currently unused
  val localJumpResults = tuples.mapPartitions { partitionIterator =>
    // Sets up the initial jump pattern for this partition.
    var jump = "* p1 *"
    for (tuple <- partitionIterator) {
      // Inner loop: applies every broadcast candidate to the current tuple.
      candidatesBroadcast.value.map { candidate =>
        jump = filterAndJump(candidate, tuple, jump)
      }
    }
    Iterator(jump)
  }
  localJumpResults.saveAsTextFile("localJumpResults")
}
The problem is that the run results show the inner loop over candidatesBroadcast.value (the map call inside mapPartitions) does not work, i.e. there is no iteration over the candidates.
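
A minimal sketch of one plausible cause, assuming Scala 2.12 or earlier: getLines returns an Iterator, and Iterator.toSeq there is backed by a lazy Stream, whose map only forces the head element. Since the map result is discarded, the tail is never evaluated. The snippet below only illustrates that behavior; the candidate names and counters are made up for the demonstration, and the foreach/toList rewrite is a suggestion rather than the original code:

  // Suspected laziness (Scala 2.12 or earlier): Iterator.toSeq is backed by a
  // lazy Stream, and Stream.map evaluates only the head eagerly.
  val lazyCandidates = Iterator("c1", "c2", "c3").toSeq
  var applied = 0
  lazyCandidates.map { _ => applied += 1 } // result discarded, tail never forced
  println(applied)                          // prints 1, not 3

  // A safer shape for side effects: materialize the candidates eagerly
  // and use foreach, since the loop body's result is thrown away.
  val strictCandidates = Iterator("c1", "c2", "c3").toList
  var applied2 = 0
  strictCandidates.foreach { _ => applied2 += 1 }
  println(applied2)                         // prints 3

If this is indeed the cause, the corresponding change to the code above would be to read the candidates with .toList (or .toArray) before broadcasting, and to replace candidatesBroadcast.value.map { ... } with candidatesBroadcast.value.foreach { ... }.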