I am calling cache() before starting the iteration, like this:
var iterativeInput = sc.textFile("hdfs://br9/user/root/mstInput/5000node.txt", 88)
iterativeInput = iterativeInput.cache()
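In Spark, cache() does not read or store anything by itself. It only marks the RDD for in-memory persistence, and the data is actually read and stored by the first action that touches it. So the first action after cache() still pays the full cost of reading the input. The sketch below is a toy, single-machine model of that behaviour, not Spark code: `ToyDataset`, `fromSource` and `CacheDemo` are hypothetical names that only mimic the shape of `filter` / `flatMap` / `cache` / `count`, and the three input lines are made-up sample data.

```scala
// Toy, single-machine model of how cache() behaves on a lazy dataset.
// NOT Spark code: ToyDataset, fromSource and CacheDemo are hypothetical
// names that only mimic the shape of RDD.filter / flatMap / cache / count.
final class ToyDataset[A] private (compute: () => Vector[A]) {
  private var markedForCaching = false
  private var cached: Option[Vector[A]] = None

  // Evaluates this dataset, reusing the stored result if one exists.
  private def elements: Vector[A] = cached match {
    case Some(values) => values
    case None =>
      val values = compute()
      if (markedForCaching) cached = Some(values) // stored on first evaluation
      values
  }

  // Transformations only record how to compute the result; nothing runs here.
  def filter(p: A => Boolean): ToyDataset[A] =
    new ToyDataset[A](() => elements.filter(p))

  def flatMap[B](f: A => IterableOnce[B]): ToyDataset[B] =
    new ToyDataset[B](() => elements.flatMap(f))

  // Like cache(): it only marks the dataset and returns immediately.
  def cache(): ToyDataset[A] = { markedForCaching = true; this }

  // Actions force evaluation of the whole chain of transformations.
  def count(): Long = elements.size.toLong
}

object ToyDataset {
  def fromSource[A](read: () => Vector[A]): ToyDataset[A] = new ToyDataset[A](read)
}

object CacheDemo {
  // Returns (reads after cache(), first count, reads after first count,
  //          second count, reads after second count).
  def run(): (Int, Long, Int, Long, Int) = {
    var reads = 0 // how many times the hypothetical "file" is read
    val input = ToyDataset.fromSource { () =>
      reads += 1
      Vector("mstEdge 1_2", "node 3", "mstEdge 4_5_6")
    }.cache()
    val readsAfterCache = reads // cache() itself read nothing

    val edges = input.filter(_.contains("mstEdge")).flatMap(_.split(" ")(1).split("_"))
    val firstCount = edges.count()  // reads the source once and stores `input`
    val readsAfterFirst = reads
    val secondCount = edges.count() // reruns filter/flatMap on the stored input
    (readsAfterCache, firstCount, readsAfterFirst, secondCount, reads)
  }
}
```

In this model `CacheDemo.run()` returns `(0, 5, 1, 5, 1)`: the source is read only by the first `count()`, and the second `count()` reuses the cached input instead of reading it again.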
When I execute the job, it throws the following exception for all serialized tasks:
12/12/20 01:59:02 ERROR storage.BlockManagerMasterActor: key not found: BlockManagerId(babar8.musigma.com, 37938)
java.util.NoSuchElementException: key not found: BlockManagerId(babar8.musigma.com, 37938)
at scala.collection.MapLike$class.default(MapLike.scala:225)
at scala.collection.mutable.HashMap.default(HashMap.scala:45)
at scala.collection.MapLike$class.apply(MapLike.scala:135)
at scala.collection.mutable.HashMap.apply(HashMap.scala:45)
at spark.storage.BlockManagerMasterActor.spark$storage$BlockManagerMasterActor$$heartBeat(BlockManagerMaster.scala:244)
at spark.storage.BlockManagerMasterActor$$anonfun$receive$1.apply(BlockManagerMaster.scala:189)
at spark.storage.BlockManagerMasterActor$$anonfun$receive$1.apply(BlockManagerMaster.scala:184)
at akka.actor.Actor$class.apply(Actor.scala:318)
at spark.storage.BlockManagerMasterActor.apply(BlockManagerMaster.scala:91)
at akka.actor.ActorCell.invoke(ActorCell.scala:626)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:197)
at akka.dispatch.Mailbox.run(Mailbox.scala:179)
at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:516)
at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1479)
at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
What should I do exactly?
I have also broken the chain of RDD operations into separate steps:
info("Calculating total edges formed..................................................................................")
var totalEdgeFilter = iterativeInput.filter(_.contains("mstEdge"))
info("Total edge filter done..........................................................................................")
var totalEdgeFlatMap = totalEdgeFilter.flatMap(x => x.split(" ")(1).split("_"))
info("Total edge flat map done........................................................................................")
totalEdge = totalEdgeFlatMap.count().toInt
info("Calculated total edges formed...................................................................................")
The situation is still the same: count() takes a huge amount of time. My understanding is that all of the earlier operations are actually carried out only when count() is invoked. Am I right?
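In Spark, transformations such as filter and flatMap are lazy: they return immediately, and the work runs when an action such as count() is called. That is also why each "done" log line above can print almost instantly while count() appears to take all the time. The sketch below illustrates the same idea with Scala collection views, which are a different, single-machine lazy mechanism and not Spark. `LazyPipelineDemo`, `predicateCalls` and the three sample lines are hypothetical.

```scala
// Analogy only: Scala collection views are lazy like RDD transformations,
// but they are a single-machine mechanism, not Spark.
object LazyPipelineDemo {
  // Returns (predicate calls before the action, calls after it, element count).
  def run(): (Int, Int, Int) = {
    val lines = Vector("mstEdge 1_2", "node 3", "mstEdge 4_5_6") // hypothetical data
    var predicateCalls = 0

    // Building the pipeline, like filter/flatMap on an RDD: nothing is evaluated yet.
    val edges = lines.view
      .filter { line => predicateCalls += 1; line.contains("mstEdge") }
      .flatMap(line => line.split(" ")(1).split("_"))
    val callsBeforeAction = predicateCalls // still 0 at this point

    // The "action": forcing the view runs filter and flatMap now.
    val totalEdge = edges.size
    (callsBeforeAction, predicateCalls, totalEdge)
  }
}
```

Here `LazyPipelineDemo.run()` returns `(0, 3, 5)`: the filter predicate is never called while the pipeline is being built, and all three calls happen inside `edges.size`.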
Thanks,
Gaurav