Problem with RDD.saveAsTextFile()

Gaurav Dasgupta

Oct 16, 2012, 9:32:47 AM
to spark...@googlegroups.com
Hi,
 
I am trying to run a Spark job on a single machine.
Below is the relevant portion of my Spark code:
 
def main(args: Array[String]) {
    // Creating the Spark RDD using SparkContext
    val sc = new SparkContext(args(0), "BreadthFirstSearch")
    var iterativeInput = sc.textFile("/usr/local/gaurav_working_directory/DijkstraAlgorithm/input.txt")
    // Calculating the number of iterations to run
    val iterationCount = iterativeInput.count().toInt
    // Run one map/reduce pass per iteration
    for (i <- 1 to iterationCount) {
      val mapResult = iterativeInput.flatMap(x => BreadthFirstMap(x))
      val groupByResult = mapResult.groupByKey(1)
      val reduceResult = groupByResult.mapValues(x => BreadthFirstReduce(x)).sortByKey(true)
      iterativeInput = reduceResult.map(x => x._1 + "\t" + x._2)
    }
    // Saving the final output to disk
    iterativeInput.saveAsTextFile("/usr/local/gaurav_working_directory/DijkstraAlgorithm/result")
    System.exit(0)
  }
When I try to execute it, I get the following error:
 
12/10/16 08:13:11 INFO spark.SimpleJob: Lost TID 4 (task 2:0)
12/10/16 08:13:11 INFO spark.SimpleJob: Loss was due to java.lang.NullPointerException
 at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:881)
 at spark.MapOutputTracker.getServerUris(MapOutputTracker.scala:114)
 at spark.SimpleShuffleFetcher.fetch(SimpleShuffleFetcher.scala:16)
 at spark.ShuffledRDD.compute(ShuffledRDD.scala:39)
 at spark.RDD.iterator(RDD.scala:78)
 at spark.MappedValuesRDD.compute(PairRDDFunctions.scala:413)
 at spark.RDD.iterator(RDD.scala:78)
 at spark.ShuffleMapTask.run(ShuffleMapTask.scala:27)
 at spark.ShuffleMapTask.run(ShuffleMapTask.scala:10)
 at spark.Executor$TaskRunner.run(Executor.scala:82)
 at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 at java.lang.Thread.run(Thread.java:662)
12/10/16 08:13:12 INFO spark.SimpleJob: Starting task 2:0 as TID 5 on slave 201209240510-331917504-5050-10003-0: babar9.musigma.com (preferred)
12/10/16 08:13:12 INFO spark.SimpleJob: Size of task 2:0 is 11149 bytes and took 7 ms to serialize by spark.JavaSerializerInstance
12/10/16 08:13:12 INFO spark.SimpleJob: Lost TID 5 (task 2:0)
12/10/16 08:13:13 INFO spark.SimpleJob: Starting task 2:0 as TID 6 on slave 201209240510-331917504-5050-10003-0: babar9.musigma.com (preferred)
12/10/16 08:13:13 INFO spark.SimpleJob: Size of task 2:0 is 11149 bytes and took 7 ms to serialize by spark.JavaSerializerInstance
12/10/16 08:13:14 INFO spark.SimpleJob: Lost TID 6 (task 2:0)
12/10/16 08:13:14 INFO spark.SimpleJob: Loss was due to java.lang.NullPointerException [duplicate 1]
12/10/16 08:13:14 INFO spark.SimpleJob: Starting task 2:0 as TID 7 on slave 201209240510-331917504-5050-10003-0: babar9.musigma.com (preferred)
12/10/16 08:13:14 INFO spark.SimpleJob: Size of task 2:0 is 11149 bytes and took 7 ms to serialize by spark.JavaSerializerInstance
12/10/16 08:13:14 INFO spark.SimpleJob: Lost TID 7 (task 2:0)
12/10/16 08:13:15 INFO spark.SimpleJob: Starting task 2:0 as TID 8 on slave 201209240510-331917504-5050-10003-0: babar9.musigma.com (preferred)
12/10/16 08:13:15 INFO spark.SimpleJob: Size of task 2:0 is 11149 bytes and took 6 ms to serialize by spark.JavaSerializerInstance
12/10/16 08:13:16 INFO spark.SimpleJob: Lost TID 8 (task 2:0)
12/10/16 08:13:16 INFO spark.SimpleJob: Loss was due to java.lang.NullPointerException [duplicate 2]
12/10/16 08:13:16 ERROR spark.SimpleJob: Task 2:0 failed more than 4 times; aborting job
spark.SparkException: Task failed: ShuffleMapTask(10, 0), reason: ExceptionFailure(java.lang.NullPointerException)
 at spark.DAGScheduler$class.runJob(DAGScheduler.scala:313)
 at spark.MesosScheduler.runJob(MesosScheduler.scala:26)
 at spark.SparkContext.runJob(SparkContext.scala:316)
 at spark.SparkContext.runJob(SparkContext.scala:327)
 at spark.SparkContext.runJob(SparkContext.scala:338)
 at spark.RDD.foreach(RDD.scala:157)
 at BreadthFirstSearch$.main(BreadthFirstSearch.scala:126)
 at BreadthFirstSearch.main(BreadthFirstSearch.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at scala.tools.nsc.util.ScalaClassLoader$$anonfun$run$1.apply(ScalaClassLoader.scala:78)
 at scala.tools.nsc.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:24)
 at scala.tools.nsc.util.ScalaClassLoader$URLClassLoader.asContext(ScalaClassLoader.scala:88)
 at scala.tools.nsc.util.ScalaClassLoader$class.run(ScalaClassLoader.scala:78)
 at scala.tools.nsc.util.ScalaClassLoader$URLClassLoader.run(ScalaClassLoader.scala:101)
 at scala.tools.nsc.ObjectRunner$.run(ObjectRunner.scala:33)
 at scala.tools.nsc.ObjectRunner$.runAndCatch(ObjectRunner.scala:40)
 at scala.tools.nsc.MainGenericRunner.runTarget$1(MainGenericRunner.scala:56)
 at scala.tools.nsc.MainGenericRunner.process(MainGenericRunner.scala:80)
 at scala.tools.nsc.MainGenericRunner$.main(MainGenericRunner.scala:89)
 at scala.tools.nsc.MainGenericRunner.main(MainGenericRunner.scala)

 

If I comment out the following line from my code:
// iterativeInput.saveAsTextFile("/usr/local/gaurav_working_directory/DijkstraAlgorithm/result")
then everything runs fine. So, just to get my final output, I tried the following line instead:
iterativeInput.foreach(println)
but that gives the same error again.
 
Also, I can save or print the earlier RDDs (mapResult and groupByResult) without any error; it is only reduceResult and iterativeInput that cause this problem.
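Would it help to materialise each iteration on the driver with collect() and re-parallelise it, so that only a single shuffle is ever in the lineage at a time? An untested sketch of what I mean, reusing the BreadthFirstMap/BreadthFirstReduce helpers from above:

for (i <- 1 to iterationCount) {
  val mapResult = iterativeInput.flatMap(x => BreadthFirstMap(x))
  val reduceResult = mapResult.groupByKey(1)
                              .mapValues(x => BreadthFirstReduce(x))
                              .sortByKey(true)
  // Pull this iteration's result back to the driver and redistribute it,
  // so the next iteration starts from a freshly parallelised RDD instead
  // of a growing chain of shuffles.
  iterativeInput = sc.parallelize(reduceResult.map(x => x._1 + "\t" + x._2).collect())
}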
 
I can see that many people have faced this issue, but I could not find a proper solution. Can someone explain what exactly the problem is, and the solution if one is known?
 
Thanks,
Gaurav

Josh Rosen

Oct 16, 2012, 1:26:05 PM
to spark...@googlegroups.com
From the line numbers and filenames in the tracebacks, it looks like this is the 0.5.1 release.

I managed to produce a similar traceback while writing a custom RDD implementation; the problem occurred when manually fetching shuffle partitions with invalid shuffleIds.  At a fresh spark-shell prompt running the 0.5.1 release:

scala> import spark.SimpleShuffleFetcher
import spark.SimpleShuffleFetcher

scala> val fetcher = new SimpleShuffleFetcher
fetcher: spark.SimpleShuffleFetcher = spark.SimpleShuffleFetcher@4ac389c4

scala> fetcher.fetch(-1, 0, (k: Any, v: Any) => {})
12/10/16 10:04:16 INFO spark.SimpleShuffleFetcher: Fetching outputs for shuffle -1, reduce 0
12/10/16 10:04:16 INFO spark.MapOutputTracker: Don't have map outputs for -1, fetching them
12/10/16 10:04:16 INFO spark.MapOutputTracker: Doing the fetch; tracker actor = spark.MapOutputTrackerActor@329e2e50
12/10/16 10:04:16 INFO spark.MapOutputTrackerActor: Asked to get map output locations for shuffle -1
java.lang.NullPointerException
at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:881)
at spark.MapOutputTracker.getServerUris(MapOutputTracker.scala:114)
at spark.SimpleShuffleFetcher.fetch(SimpleShuffleFetcher.scala:16)
[…]

Here, the shuffleId is obviously invalid, but the same exception is raised for any shuffleId that hasn't been used yet (so 0 would trigger it, too).
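For what it's worth, the NullPointerException itself appears to be nothing more than ConcurrentHashMap refusing a null value: if the tracker has no outputs for the requested shuffleId, the URI array that getServerUris tries to cache is presumably null, and ConcurrentHashMap.put throws on null keys or values. A Spark-free sketch of just that behaviour (the map type here is only illustrative):

import java.util.concurrent.ConcurrentHashMap

object NullPutDemo {
  def main(args: Array[String]) {
    val cache = new ConcurrentHashMap[Int, Array[String]]()
    // ConcurrentHashMap rejects null keys and null values, so this put
    // throws java.lang.NullPointerException -- the same frame that tops
    // the tracebacks above.
    cache.put(-1, null)
  }
}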

If we try to fetch a partition using a valid shuffleId and invalid reduceId, then we see a different error; in this case, I try to fetch from an RDD with two shuffled partitions:

scala> val computedShuffleDep = sc.parallelize(1 to 10).map(x => (x, x)).groupByKey(2).collect()
[…]
12/10/16 10:04:24 INFO spark.SparkContext: Job finished in 0.175349 s
computedShuffleDep: Array[(Int, Seq[Int])] = Array((2,ArrayBuffer(2)), (4,ArrayBuffer(4)), (6,ArrayBuffer(6)) […]

scala> fetcher.fetch(0, 0, (k: Any, v: Any) => {})
12/10/16 09:57:52 INFO spark.SimpleShuffleFetcher: Fetching outputs for shuffle 0, reduce 0
12/10/16 09:57:52 INFO spark.MapOutputTracker: Don't have map outputs for 0, fetching them
12/10/16 09:57:52 INFO spark.MapOutputTracker: Doing the fetch; tracker actor = spark.MapOutputTrackerActor@389235c2
12/10/16 09:57:52 INFO spark.MapOutputTrackerActor: Asked to get map output locations for shuffle 0
java.lang.NullPointerException
at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:881)
at spark.MapOutputTracker.getServerUris(MapOutputTracker.scala:114)
at spark.SimpleShuffleFetcher.fetch(SimpleShuffleFetcher.scala:16)

scala> fetcher.fetch(0, 0, (k: Any, v: Any) => {})
12/10/16 10:04:33 INFO spark.SimpleShuffleFetcher: Fetching outputs for shuffle 0, reduce 0
12/10/16 10:04:33 INFO spark.SimpleShuffleFetcher: Fetched all 5 records successfully

scala> fetcher.fetch(0, 1, (k: Any, v: Any) => {})
12/10/16 10:04:38 INFO spark.SimpleShuffleFetcher: Fetching outputs for shuffle 0, reduce 1
12/10/16 10:04:38 INFO spark.SimpleShuffleFetcher: Fetched all 5 records successfully

scala> fetcher.fetch(0, 2, (k: Any, v: Any) => {})
12/10/16 10:04:43 INFO spark.SimpleShuffleFetcher: Fetching outputs for shuffle 0, reduce 2
12/10/16 10:04:43 ERROR spark.SimpleShuffleFetcher: Fetch failed
java.io.FileNotFoundException: http://192.168.1.110:51778/shuffle/0/0/2
[…]
spark.FetchFailedException: Fetch failed: http://192.168.1.110:51778 0 0 2
[…]
Caused by: java.io.FileNotFoundException: http://192.168.1.110:51778/shuffle/0/0/2

The 0.6.0 release behaves similarly when trying to fetch from uncomputed or invalid shuffle stages.

This doesn't explain the problem in the breadth-first-search job, but it might be a useful clue.

- Josh

Gaurav Dasgupta

Oct 16, 2012, 4:05:44 PM
to spark...@googlegroups.com
Hi Josh,

Thanks for the reply.
I don't think I am fetching from anything that hasn't been computed. The loop runs to completion, and my reduce function returns just a String, which mapValues() associates with the keys.

This code has been tested successfully on the same input data set on a different machine, also running the Spark 0.5.1 release. I have checked the configs and settings on this machine and everything seems fine.
It's weird that it throws this exception on one machine and not on the other, when Spark is installed and configured the same way on both. I have seen this problem reported by others as well. Should I consider this a bug in the Spark 0.5.1 release?

Is there any other way I can try to print my final output, or store it on local disk or in HDFS?
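For example, would collecting the final RDD to the driver and writing it with plain java.io be a reasonable fallback, or will collect() hit the same exception because it still runs the whole job? A rough sketch of what I mean (the output file name is just an example):

import java.io.PrintWriter

val lines = iterativeInput.collect()
val out = new PrintWriter("/usr/local/gaurav_working_directory/DijkstraAlgorithm/result.txt")
try {
  // Write one element of the final RDD per line on the driver's local filesystem.
  lines.foreach(line => out.println(line))
} finally {
  out.close()
}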

Thanks,
Gaurav 

Matei Zaharia

Oct 16, 2012, 6:29:27 PM
to spark...@googlegroups.com
What's the first error on your master? Sometimes this error happens because of another one earlier. My guess is that one of your machines failed to launch the Spark worker process, or had it crash.

Matei