Debugging problem with mapPartitions


Zhi Dou

Jun 6, 2012, 4:19:14 AM
to spark...@googlegroups.com
Dear all,

I am using Spark for a data mining job, and I have come across a problem with the mapPartitions function.

Here is the main method of my program:

        def main(args: Array[String]) {

                if (args.length == 0) {
                        System.err.println("Usage: SparkTest <host> [<slices>]")
                        System.exit(1)
                }

                val spark = new SparkContext(args(0), "Build Cubes")
                val slices = if (args.length > 1) args(1).toInt else 2

                // Creates an RDD for the tuples.
                val tuples = spark.textFile("tuples.txt")

                // Reads candidates on the master node and uses a broadcast variable to send them to all slaves.
                val candidates = scala.io.Source.fromFile("candidates.txt").getLines.toSeq
                val candidatesBroadcast = spark.broadcast(candidates)

                val tuplesArray = tuples.toArray()

                val localJumpResults = tuples.mapPartitions { partitionIterator =>
                        // set up
                        var jump = "* p1 *"
                        for (tuple <- partitionIterator) {
                                candidatesBroadcast.value.map { candidate =>
                                        jump = filterAndJump(candidate, tuple, jump)
                                }
                        }
                        Iterator(jump)
                }

                localJumpResults.saveAsTextFile("localJumpResults")
        }

What I am doing in mapPartitions is iterating over all tuples, and for each tuple I want to update the jump information for each candidate, so it is a nested loop.
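
To make the intent concrete, here is a plain local-Scala sketch of the computation (the filterAndJump stub below only stands in for my real function; its signature is taken from the call above):

        // Local sketch of the intended nested loop, independent of Spark.
        // This stub stands in for the real filterAndJump.
        def filterAndJump(candidate: String, tuple: String, jump: String): String = jump

        def localJump(tuples: Seq[String], candidates: Seq[String]): String = {
                var jump = "* p1 *"
                for (tuple <- tuples) {
                        // Inner loop: every candidate is applied to the current tuple.
                        for (candidate <- candidates) {
                                jump = filterAndJump(candidate, tuple, jump)
                        }
                }
                jump
        }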

The problem is that the running results show the inner loop (the map over candidatesBroadcast.value) does not work, i.e. there is no iteration over the candidates.

Would you guys kindly tell me what is wrong?
Thank you very much in advance!

Best,
Harry

--
Zhi DOU, Harry
MPhil '13
Department of Computer Science
University of Hong Kong

Matei Zaharia

Jun 6, 2012, 1:17:42 PM
to spark...@googlegroups.com
Have you tried printing out candidatesBroadcast.value and tuple on the workers? Just do a println and look for them in the stdout file of the worker node.
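For example, something like this with your first version:

        val localJumpResults = tuples.mapPartitions { partitionIterator =>
                // These prints run on the worker, so the output appears in the
                // worker's stdout log rather than on the driver console.
                println("candidates: " + candidatesBroadcast.value.mkString(", "))
                var jump = "* p1 *"
                for (tuple <- partitionIterator) {
                        println("tuple: " + tuple)
                        candidatesBroadcast.value.map { candidate =>
                                jump = filterAndJump(candidate, tuple, jump)
                        }
                }
                Iterator(jump)
        }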

Matei

Zhi Dou

Jun 7, 2012, 2:26:36 AM
to spark...@googlegroups.com
Hi Matei, thank you very much!

Yes, I have tried that. The results show that it iterates over all tuples, but only the first candidate is ever used. What I want is: for tuple1, call filterAndJump with (tuple1, candidate1), (tuple1, candidate2), (tuple1, candidate3), ...; for tuple2, call filterAndJump with (tuple2, candidate1), (tuple2, candidate2), (tuple2, candidate3), ...; and the same for the other tuples.
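
In other words, the computation I want is equivalent to this fold formulation (a sketch, with filterAndJump as before):

        // Equivalent formulation of the nested loop as two folds, with no
        // mutable variable captured by the closure.
        def localJump(tuples: Seq[String], candidates: Seq[String]): String =
                tuples.foldLeft("* p1 *") { (jumpSoFar, tuple) =>
                        candidates.foldLeft(jumpSoFar) { (jump, candidate) =>
                                filterAndJump(candidate, tuple, jump)
                        }
                }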

I tried making candidates an RDD and using mapPartitions on it, exactly the same as for tuples, as shown below.

                val tuples = spark.textFile("tuples.txt")
                val candidates = spark.textFile("candidates.txt")

                val localJumpResults = tuples.mapPartitions { partitionIterator =>
                        var jump = "* p1 *"
                        for (tuple <- partitionIterator) {
                                println("tuple: " + tuple)
                                candidates.mapPartitions { canIterator =>
                                        for (candidate <- canIterator) {
                                                println("can: "+candidate)
                                                jump = filterAndJump(candidate, tuple, jump)
                                        }
                                        Iterator(canIterator)
                                }
                        }
                        Iterator(partitionIterator)
                }

The problem is that I get a NullPointerException right after accessing the first tuple, tuple1:
12/06/07 14:27:56 INFO spark.LocalScheduler: Running task 0
12/06/07 14:27:56 INFO spark.LocalScheduler: Size of task 0 is 19897 bytes
tuple: b1 p1 c1 10
12/06/07 14:27:57 ERROR spark.LocalScheduler: Exception in task 0
java.lang.NullPointerException
        at spark.RDD.mapPartitions(RDD.scala:150)
        at Main$$anon$1$BuildCubes$$anonfun$1$$anonfun$apply$1.apply(Build.scala:128)
        at Main$$anon$1$BuildCubes$$anonfun$1$$anon

So I do not know how to implement this nested loop. Could you please tell me what is wrong?

Thank you.

Regards,
Harry