Hi,
I am trying to run a Spark job on a single machine. Below is the relevant portion of my Spark code:
def main(args: Array[String]) {
  // Create the SparkContext; args(0) is the master URL
  val sc = new SparkContext(args(0), "BreadthFirstSearch")
  var iterativeInput = sc.textFile("/usr/local/gaurav_working_directory/DijkstraAlgorithm/input.txt")
  // One iteration per input line
  val iterationCount = iterativeInput.count().toInt
  for (i <- 1 to iterationCount) {
    val mapResult = iterativeInput.flatMap(x => BreadthFirstMap(x))
    val groupByResult = mapResult.groupByKey(1)
    val reduceResult = groupByResult.mapValues(x => BreadthFirstReduce(x)).sortByKey(true)
    // Re-serialize each record so the next iteration can parse it again
    iterativeInput = reduceResult.map(x => x._1 + "\t" + x._2)
  }
  // Save the final output to disk
  iterativeInput.saveAsTextFile("/usr/local/gaurav_working_directory/DijkstraAlgorithm/result")
  System.exit(0)
}
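For context, here is what one pass of that loop computes, rewritten as a plain-Scala (non-Spark) sketch that runs standalone. BreadthFirstMap and BreadthFirstReduce are replaced by hypothetical word-count-style stubs, since their real bodies are not shown above; only the data flow (flatMap, group by key, reduce, sort by key, re-serialize) matches my code:

```scala
object IterativePipelineSketch {
  // Hypothetical stand-ins for BreadthFirstMap / BreadthFirstReduce:
  // emit (token, 1) pairs and sum them, just to exercise the data flow.
  def breadthFirstMap(line: String): Seq[(String, Int)] =
    line.split("\\s+").toSeq.map(tok => (tok, 1))

  def breadthFirstReduce(values: Iterable[Int]): Int = values.sum

  // One call = the body of the for-loop above, run `iterations` times.
  def run(lines: Seq[String], iterations: Int): Seq[String] = {
    var iterativeInput = lines
    for (_ <- 1 to iterations) {
      val mapResult = iterativeInput.flatMap(breadthFirstMap)
      val reduceResult = mapResult
        .groupBy(_._1)                                    // like groupByKey(1)
        .map { case (k, pairs) => (k, breadthFirstReduce(pairs.map(_._2))) }
        .toSeq
        .sortBy(_._1)                                     // like sortByKey(true)
      // Re-serialize each record for the next pass
      iterativeInput = reduceResult.map { case (k, v) => k + "\t" + v }
    }
    iterativeInput
  }

  def main(args: Array[String]): Unit =
    run(Seq("a b", "b c"), 1).foreach(println)
    // prints "a\t1", "b\t2", "c\t1" on separate lines
}
```

Note that in the real Spark version nothing actually executes until an action (saveAsTextFile or foreach) is reached, since the transformations in the loop are lazy.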
When I execute it, I get the following error:
12/10/16 08:13:11 INFO spark.SimpleJob: Lost TID 4 (task 2:0)
12/10/16 08:13:11 INFO spark.SimpleJob: Loss was due to java.lang.NullPointerException
at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:881)
at spark.MapOutputTracker.getServerUris(MapOutputTracker.scala:114)
at spark.SimpleShuffleFetcher.fetch(SimpleShuffleFetcher.scala:16)
at spark.ShuffledRDD.compute(ShuffledRDD.scala:39)
at spark.RDD.iterator(RDD.scala:78)
at spark.MappedValuesRDD.compute(PairRDDFunctions.scala:413)
at spark.RDD.iterator(RDD.scala:78)
at spark.ShuffleMapTask.run(ShuffleMapTask.scala:27)
at spark.ShuffleMapTask.run(ShuffleMapTask.scala:10)
at spark.Executor$TaskRunner.run(Executor.scala:82)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
12/10/16 08:13:12 INFO spark.SimpleJob: Starting task 2:0 as TID 5 on slave 201209240510-331917504-5050-10003-0: babar9.musigma.com (preferred)
12/10/16 08:13:12 INFO spark.SimpleJob: Size of task 2:0 is 11149 bytes and took 7 ms to serialize by spark.JavaSerializerInstance
12/10/16 08:13:12 INFO spark.SimpleJob: Lost TID 5 (task 2:0)
12/10/16 08:13:13 INFO spark.SimpleJob: Starting task 2:0 as TID 6 on slave 201209240510-331917504-5050-10003-0: babar9.musigma.com (preferred)
12/10/16 08:13:13 INFO spark.SimpleJob: Size of task 2:0 is 11149 bytes and took 7 ms to serialize by spark.JavaSerializerInstance
12/10/16 08:13:14 INFO spark.SimpleJob: Lost TID 6 (task 2:0)
12/10/16 08:13:14 INFO spark.SimpleJob: Loss was due to java.lang.NullPointerException [duplicate 1]
12/10/16 08:13:14 INFO spark.SimpleJob: Starting task 2:0 as TID 7 on slave 201209240510-331917504-5050-10003-0: babar9.musigma.com (preferred)
12/10/16 08:13:14 INFO spark.SimpleJob: Size of task 2:0 is 11149 bytes and took 7 ms to serialize by spark.JavaSerializerInstance
12/10/16 08:13:14 INFO spark.SimpleJob: Lost TID 7 (task 2:0)
12/10/16 08:13:15 INFO spark.SimpleJob: Starting task 2:0 as TID 8 on slave 201209240510-331917504-5050-10003-0: babar9.musigma.com (preferred)
12/10/16 08:13:15 INFO spark.SimpleJob: Size of task 2:0 is 11149 bytes and took 6 ms to serialize by spark.JavaSerializerInstance
12/10/16 08:13:16 INFO spark.SimpleJob: Lost TID 8 (task 2:0)
12/10/16 08:13:16 INFO spark.SimpleJob: Loss was due to java.lang.NullPointerException [duplicate 2]
12/10/16 08:13:16 ERROR spark.SimpleJob: Task 2:0 failed more than 4 times; aborting job
spark.SparkException: Task failed: ShuffleMapTask(10, 0), reason: ExceptionFailure(java.lang.NullPointerException)
at spark.DAGScheduler$class.runJob(DAGScheduler.scala:313)
at spark.MesosScheduler.runJob(MesosScheduler.scala:26)
at spark.SparkContext.runJob(SparkContext.scala:316)
at spark.SparkContext.runJob(SparkContext.scala:327)
at spark.SparkContext.runJob(SparkContext.scala:338)
at spark.RDD.foreach(RDD.scala:157)
at BreadthFirstSearch$.main(BreadthFirstSearch.scala:126)
at BreadthFirstSearch.main(BreadthFirstSearch.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at scala.tools.nsc.util.ScalaClassLoader$$anonfun$run$1.apply(ScalaClassLoader.scala:78)
at scala.tools.nsc.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:24)
at scala.tools.nsc.util.ScalaClassLoader$URLClassLoader.asContext(ScalaClassLoader.scala:88)
at scala.tools.nsc.util.ScalaClassLoader$class.run(ScalaClassLoader.scala:78)
at scala.tools.nsc.util.ScalaClassLoader$URLClassLoader.run(ScalaClassLoader.scala:101)
at scala.tools.nsc.ObjectRunner$.run(ObjectRunner.scala:33)
at scala.tools.nsc.ObjectRunner$.runAndCatch(ObjectRunner.scala:40)
at scala.tools.nsc.MainGenericRunner.runTarget$1(MainGenericRunner.scala:56)
at scala.tools.nsc.MainGenericRunner.process(MainGenericRunner.scala:80)
at scala.tools.nsc.MainGenericRunner$.main(MainGenericRunner.scala:89)
at scala.tools.nsc.MainGenericRunner.main(MainGenericRunner.scala)
If I comment out the following line in my code:
// iterativeInput.saveAsTextFile("/usr/local/gaurav_working_directory/DijkstraAlgorithm/result")
then everything runs fine. So, to at least get my final output, I tried the following line instead:
iterativeInput.foreach(println)
but that gives the same error again.
Also, I can save or print the RDDs mapResult and groupByResult without any error; it is only reduceResult and iterativeInput that trigger this problem.
I can see that many people have faced this issue, but I could not find a proper solution. Can someone explain what exactly the problem is, and the solution, if known?
Thanks,
Gaurav