RDD transformation taking too much time


Gaurav Dasgupta

unread,
Dec 20, 2012, 1:19:53 AM12/20/12
to spark...@googlegroups.com
var iterativeInput = sc.textFile("hdfs://br9/user/root/mstInput/5000node.txt", 88)
while (totalEdge < (totalNode)) {
  var mapResult = iterativeInput.flatMap(x => PrimsMap(x)).groupByKey(88)
  info("Reduce Starts...................................................................................................")
  var reduceResult = mapResult.mapValues(x => PrimsReduce(x))
  info("Mapping the RDD for next iteration..............................................................................")
  iterativeInput = reduceResult.map(x => x._1 + " " + x._2)
  info("Calculating total edges formed..................................................................................")
  totalEdge = iterativeInput.filter(_.contains("mstEdge")).flatMap(x => x.split(" ")(1).split("_")).count().toInt
  info("Total edges calculated..................................................................................")
}
val MSTEdge = iterativeInput.filter(_.contains("mstEdge")).map(x => x.split(" ")(1))
val MSTWeight = iterativeInput.filter(_.contains("mstWeight")).map(x => x.split(" ")(1))
val output = MSTEdge.union(MSTWeight)
info("Final output filtered.............................................................................................")
output.saveAsTextFile("hdfs://br9/user/root/5000node88")
 
Above is the relevant portion of my Spark code. I have added loggers to check how much time each phase takes. I can see that the calculation of "totalEdge" (where I am using the "filter" and "flatMap" functions) and the "saveAsTextFile" step are taking too much time. What could be the possible reason for that? My RDD looks something like the following (this is just a sample; for bigger datasets it will be huge):
 
3 1:12.0,2:7.0,
2 3:7.0,4:8.0
1 3:12.0,4:5.0
-1 mstVertex_1,2_3,4
-1 mstEdge_4-3_3-4
-1 mstWeight_6.0
4 1:5.0,2:8.0,
 
For a dataset of 450 MB, it is almost stuck at these stages.
 
Also, the spark loggers give this:
12/12/19 23:50:07 INFO spark.PairRDDFunctions: Saving as hadoop file of type (NullWritable, Text)
12/12/19 23:50:07 INFO spark.SparkContext: Starting job: saveAsTextFile at Prims.scala:204
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Registering RDD 15 (saveAsTextFile at Prims.scala:204)
12/12/19 23:50:07 INFO spark.CacheTracker: Registering RDD ID 15 with cache
12/12/19 23:50:07 INFO spark.CacheTrackerActor: Registering RDD 15 with 176 partitions
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Registering parent RDD 15 (saveAsTextFile at Prims.scala:204)
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Registering parent RDD 14 (union at Prims.scala:202)
12/12/19 23:50:07 INFO spark.CacheTracker: Registering RDD ID 14 with cache
12/12/19 23:50:07 INFO spark.CacheTrackerActor: Registering RDD 14 with 176 partitions
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Registering parent RDD 11 (map at Prims.scala:200)
12/12/19 23:50:07 INFO spark.CacheTracker: Registering RDD ID 11 with cache
12/12/19 23:50:07 INFO spark.CacheTrackerActor: Registering RDD 11 with 88 partitions
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Registering parent RDD 10 (filter at Prims.scala:200)
12/12/19 23:50:07 INFO spark.CacheTracker: Registering RDD ID 10 with cache
12/12/19 23:50:07 INFO spark.CacheTrackerActor: Registering RDD 10 with 88 partitions
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Registering parent RDD 9 (flatMap at Prims.scala:191)
12/12/19 23:50:07 INFO spark.CacheTracker: Registering RDD ID 9 with cache
12/12/19 23:50:07 INFO spark.CacheTrackerActor: Registering RDD 9 with 88 partitions
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Registering parent RDD 8 (mapValues at Prims.scala:188)
12/12/19 23:50:07 INFO spark.CacheTracker: Registering RDD ID 8 with cache
12/12/19 23:50:07 INFO spark.CacheTrackerActor: Registering RDD 8 with 88 partitions
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Registering parent RDD 7 (groupByKey at Prims.scala:185)
12/12/19 23:50:07 INFO spark.CacheTracker: Registering RDD ID 7 with cache
12/12/19 23:50:07 INFO spark.CacheTrackerActor: Registering RDD 7 with 88 partitions
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Registering RDD 6 (flatMap at Prims.scala:185)
12/12/19 23:50:07 INFO spark.CacheTracker: Registering RDD ID 6 with cache
12/12/19 23:50:07 INFO spark.CacheTrackerActor: Registering RDD 6 with 88 partitions
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Registering parent RDD 6 (flatMap at Prims.scala:185)
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Registering parent RDD 5 (flatMap at Prims.scala:191)
12/12/19 23:50:07 INFO spark.CacheTracker: Registering RDD ID 5 with cache
12/12/19 23:50:07 INFO spark.CacheTrackerActor: Registering RDD 5 with 88 partitions
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Registering parent RDD 4 (mapValues at Prims.scala:188)
12/12/19 23:50:07 INFO spark.CacheTracker: Registering RDD ID 4 with cache
12/12/19 23:50:07 INFO spark.CacheTrackerActor: Registering RDD 4 with 88 partitions
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Registering parent RDD 3 (groupByKey at Prims.scala:185)
12/12/19 23:50:07 INFO spark.CacheTracker: Registering RDD ID 3 with cache
12/12/19 23:50:07 INFO spark.CacheTrackerActor: Registering RDD 3 with 88 partitions
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Registering RDD 2 (flatMap at Prims.scala:185)
12/12/19 23:50:07 INFO spark.CacheTracker: Registering RDD ID 2 with cache
12/12/19 23:50:07 INFO spark.CacheTrackerActor: Registering RDD 2 with 88 partitions
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Registering parent RDD 2 (flatMap at Prims.scala:185)
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Registering parent RDD 1 (textFile at Prims.scala:175)
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Registering parent RDD 0 (textFile at Prims.scala:175)
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Registering parent RDD 13 (map at Prims.scala:201)
12/12/19 23:50:07 INFO spark.CacheTracker: Registering RDD ID 13 with cache
12/12/19 23:50:07 INFO spark.CacheTrackerActor: Registering RDD 13 with 88 partitions
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Registering parent RDD 12 (filter at Prims.scala:201)
12/12/19 23:50:07 INFO spark.CacheTracker: Registering RDD ID 12 with cache
12/12/19 23:50:07 INFO spark.CacheTrackerActor: Registering RDD 12 with 88 partitions
12/12/19 23:50:07 INFO spark.CacheTrackerActor: Asked for current cache locations
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Got job 1 (saveAsTextFile at Prims.scala:204) with 176 output partitions
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Final stage: Stage 1 (saveAsTextFile at Prims.scala:204)
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 2)
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Missing parents: List(Stage 2)
12/12/19 23:50:07 INFO scheduler.DAGScheduler: Submitting Stage 3 (flatMap at Prims.scala:185), which has no missing parents
 

I can see that when the "saveAsTextFile" job starts, all the previous parent RDDs are registered again. Why is that?
 
Thanks,
Gaurav  
 

Gaurav Dasgupta

unread,
Dec 20, 2012, 1:58:01 AM12/20/12
to spark...@googlegroups.com
I have also checked that the "count()" operation is the one taking the time in the line "totalEdge = iterativeInput.filter(_.contains("mstEdge")).flatMap(x => x.split(" ")(1).split("_")).count().toInt".
 
I believe that "count()" should return a value of 4000+, which means it is counting 4000+ rows in the RDD. But should it take so much time that the job is almost stuck?
 
Thanks,
Gaurav

Patrick Wendell

unread,
Dec 20, 2012, 2:06:21 AM12/20/12
to spark...@googlegroups.com
Hey Gaurav,

Are you actually calling cache() anywhere here? Before you start
iterating, it would probably be good to cache the iterativeInput RDD
by calling .cache().

Also, Spark does lazy evaluation, so it won't go and process a
sequence of maps/reduces/groupBys until count() is called (count
actually returns a value to the caller, unlike the other
transformations, which just return new RDDs). That's why you are
seeing most of the work occur when you call count().
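
A minimal sketch of that suggestion, keeping the names and loop structure from the original post (totalEdge, totalNode, PrimsMap and PrimsReduce are assumed to be defined as in that code):

// Read the input once and mark it for caching. Nothing is read or cached
// yet -- the data is materialized (and cached) the first time an action
// runs on this RDD.
var iterativeInput = sc.textFile("hdfs://br9/user/root/mstInput/5000node.txt", 88).cache()

while (totalEdge < totalNode) {
  // Same transformations as in the original post: these only build up a lineage.
  val mapResult    = iterativeInput.flatMap(x => PrimsMap(x)).groupByKey(88)
  val reduceResult = mapResult.mapValues(x => PrimsReduce(x))
  iterativeInput   = reduceResult.map(x => x._1 + " " + x._2)

  // count() is an action, so this is the point where the lineage built
  // above is actually evaluated.
  totalEdge = iterativeInput.filter(_.contains("mstEdge"))
                            .flatMap(_.split(" ")(1).split("_"))
                            .count().toInt
}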

- Patrick

Gaurav Dasgupta

unread,
Dec 20, 2012, 3:08:28 AM12/20/12
to spark...@googlegroups.com
I am calling cache() before starting the iteration, like this:
 
var iterativeInput = sc.textFile("hdfs://br9/user/root/mstInput/5000node.txt", 88)
iterativeInput = iterativeInput.cache()
 
When I execute the job, it gives me the following exception for all serialized tasks:
 
12/12/20 01:59:02 ERROR storage.BlockManagerMasterActor: key not found: BlockManagerId(babar8.musigma.com, 37938)
java.util.NoSuchElementException: key not found: BlockManagerId(babar8.musigma.com, 37938)
 at scala.collection.MapLike$class.default(MapLike.scala:225)
 at scala.collection.mutable.HashMap.default(HashMap.scala:45)
 at scala.collection.MapLike$class.apply(MapLike.scala:135)
 at scala.collection.mutable.HashMap.apply(HashMap.scala:45)
 at spark.storage.BlockManagerMasterActor.spark$storage$BlockManagerMasterActor$$heartBeat(BlockManagerMaster.scala:244)
 at spark.storage.BlockManagerMasterActor$$anonfun$receive$1.apply(BlockManagerMaster.scala:189)
 at spark.storage.BlockManagerMasterActor$$anonfun$receive$1.apply(BlockManagerMaster.scala:184)
 at akka.actor.Actor$class.apply(Actor.scala:318)
 at spark.storage.BlockManagerMasterActor.apply(BlockManagerMaster.scala:91)
 at akka.actor.ActorCell.invoke(ActorCell.scala:626)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:197)
 at akka.dispatch.Mailbox.run(Mailbox.scala:179)
 at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:516)
 at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
 at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
 at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1479)
 at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
What exactly should I do?
Also, I have broken the sequence of RDD operations into parts:
      info("Calculating total edges formed..................................................................................")
      var totalEdgeFilter = iterativeInput.filter(_.contains("mstEdge"))
      info("Total edge filter done..........................................................................................")
      var totalEdgeFlatMap = totalEdgeFilter.flatMap(x => x.split(" ")(1).split("_"))
      info("Total edge flat map done........................................................................................")
      totalEdge = totalEdgeFlatMap.count().toInt
      info("Calculated total edges formed...................................................................................")
 
Still, the case is the same, i.e., count() is taking a huge amount of time. I now believe that when "count()" is invoked, all the earlier operations are actually performed. Am I right?
 
Thanks,
Gaurav

Gaurav Dasgupta

unread,
Dec 20, 2012, 5:37:55 AM12/20/12
to spark...@googlegroups.com
OK, I understood one thing after I referred to the "RDD Operations" section of http://spark-project.org/docs/0.6.0/scala-programming-guide.html:
By default, each transformed RDD is recomputed each time we run an action.
 
So, when I run count() or reduce() actions, all the previous RDDs are re-computed. Am I right? But my count() action requires only the one immediately preceding RDD, i.e., the flatMap. Is there a way I can drop the other RDDs in each iteration after their transformations, to avoid re-computation?
 
Please clear this up for me. I think I am quite confused here.
 
Thanks,
Gaurav 

Mark Hamstra

unread,
Dec 20, 2012, 5:45:08 AM12/20/12
to spark...@googlegroups.com
Yes, as Patrick pointed out before, transformations on RDDs are lazy, and the actual work doesn't get done until an action (such as count) is called, forcing the evaluation of the RDD's lineage of transformations.

Mark Hamstra

unread,
Dec 20, 2012, 5:59:44 AM12/20/12
to spark...@googlegroups.com
You seem to be fundamentally misunderstanding how lazy transformations of RDDs work.  None of your sequence of transformations of the RDD defined with sc.textFile is actually evaluated until an action is called, forcing the chain of transformations to actually be evaluated (as well as then populating the cache with the values present at whatever logical point in the chain you called cache()).  Using a var or val to attach a name to some intermediate step in the chain of transformations (and vals typically make far more sense for RDDs...) doesn't force the prior transformations to be evaluated.  Evaluation and caching still don't occur until an action is called.
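
To illustrate the point with a hypothetical snippet (not code from this thread; the path is a placeholder):

val lines    = sc.textFile("hdfs://namenode/some/input.txt")  // nothing is read here
val filtered = lines.filter(_.contains("mstEdge"))            // still nothing evaluated
val tokens   = filtered.flatMap(_.split(" "))                 // still only a lineage of transformations

val n = tokens.count()  // only now does Spark read the file and run the filter and flatMap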

Mark Hamstra

unread,
Dec 20, 2012, 6:06:58 AM12/20/12
to spark...@googlegroups.com
Yes, that is what cache() is for, and is why Patrick suggested that cache() be called before beginning the iterative portion of your code -- so that all of the RDD transformations before that point will only be done once, the first time an action is called requiring evaluation of the RDD lineage up to the call of cache(). 



Gaurav Dasgupta

unread,
Dec 20, 2012, 7:04:50 AM12/20/12
to spark...@googlegroups.com
Thanks Mark. I got the concept now.

Matei Zaharia

unread,
Dec 20, 2012, 2:28:08 PM12/20/12
to spark...@googlegroups.com
If you are still having that "key not found" error, it may mean that your DNS is misconfigured (and a machine is reporting the wrong hostname for itself), or that something else failed earlier in the job. Look for the first thing that failed.

Matei

srikanth reddy

unread,
Aug 8, 2013, 5:32:24 PM8/8/13
to spark...@googlegroups.com
Using a var or val to attach a name to some intermediate step in the chain of transformations (and vals typically make far more sense for RDDs...) doesn't force the prior transformations to be evaluated. 
++++
Hi,

I am clear about caching of RDDs, but what does the above statement mean?

I am observing a behavior where a non-cached MapPartitionsRDD (that has a val attached as a name to it) behaves just like a cached RDD, even though I am not caching that RDD. I don't see this behaviour for FilteredRDDs, though.

How is attaching a var/val name to some intermediate step different from caching an RDD?

srikanth reddy

unread,
Aug 8, 2013, 5:45:47 PM8/8/13
to spark...@googlegroups.com
Here is the example:
++++
val clicks1 = sc.textFile(icaf)
val clicks2 = clicks1.map(line => line.split("\t")).filter(line => line.length == 96)
val pair3   = clicks2.map(line => (line(icauid), 0))
val poold   = pair3.reduceByKey(_ + _)
val poolx   = poold.map(x => (x._1, x._2 + 1))
+++

No caching is used.

Repeated invocations of clicks2.count and pair3.count always do the computation from the beginning, starting with reading the file from HDFS.
But repeated invocations (after the first one) of poolx.count only go back as far as the poold RDD. The un-cached poold RDD is behaving like a cached RDD, in that the computation always starts from this RDD for repeated actions called on subsequently transformed RDDs.

Ian O'Connell

unread,
Aug 8, 2013, 6:11:51 PM8/8/13
to spark...@googlegroups.com
No caching is used in that example, so any reference to poolx will recalculate all the previous steps.

However, if you did poolx.cache.count,

any further references to poolx will use the cached version if it is still available. Assigning intermediate steps to vals here just aids readability; having or not having the vals makes no difference to the program.



Mark Hamstra

unread,
Aug 8, 2013, 9:20:53 PM8/8/13
to spark...@googlegroups.com
Not quite.  Because there is a shuffle involved in pair3.reduceByKey( _+_), there is a sort of implicit caching going on where the outputs from the preceding stage need to be materialized, tracked, and distributed (often across the network) to the appropriate reducers.  If you want to dig into the details of this, it revolves around the MapOutputTracker.  Anyway, it is possible for the re-evaluation of a shuffle to make use of the prior map outputs and not need to re-evaluate the RDD all the way from the beginning.

So, yeah, the re-evaluation of an RDD after a shuffle can be something like the re-evaluation of a cached RDD; but that is still not a function of an external name referencing the RDD, but rather of what is required to re-evaluate the lineage present internally to the RDD.
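
In terms of the example posted above (same names; icaf and icauid are assumed to be defined as in that snippet), the observed behavior looks roughly like this:

val clicks1 = sc.textFile(icaf)
val clicks2 = clicks1.map(_.split("\t")).filter(_.length == 96)
val pair3   = clicks2.map(line => (line(icauid), 0))
val poold   = pair3.reduceByKey(_ + _)   // shuffle boundary: map outputs are materialized
val poolx   = poold.map(x => (x._1, x._2 + 1))

clicks2.count()  // narrow lineage only: re-reads the file from HDFS on every call
poolx.count()    // first call evaluates everything, including the shuffle
poolx.count()    // later calls can reuse the existing shuffle outputs, so they need
                 // not go back to HDFS -- an implementation detail, not guaranteed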

srikanth reddy

unread,
Aug 9, 2013, 2:27:31 PM8/9/13
to spark...@googlegroups.com
Thanks. Does the non-cached MapPartitionsRDD behave similarly to a cached RDD in terms of lifetime, eviction, etc.?

Mark Hamstra

unread,
Aug 9, 2013, 4:40:12 PM8/9/13
to spark...@googlegroups.com
I don't know what you mean.

There is no shuffle inherent in mapPartitions, so it should behave similarly to map, filter, and other "pure" transformations that in the absence of explicit caching/persistence/checkpointing do not cause retention of any results (either intermediate results or final results.)  

Ian O'Connell

unread,
Aug 9, 2013, 5:06:36 PM8/9/13
to spark...@googlegroups.com
Thanks for the pointer to MapOutputTracker, Mark. Just to be clear on my understanding:

The metadata associated with the shuffle step will be kept (barring node failures) until the metadata cleaner TTL kicks in. The files on disk are just not deleted (there really isn't a temp serialized-output cleanup, which I think I've seen mentioned before?).

Ian O'Connell

unread,
Aug 9, 2013, 5:08:10 PM8/9/13
to spark...@googlegroups.com
One thing I forgot to add,

how much sense does it make to rely on this behavior or recommend it to end users? The outputs from shuffle tasks could start being kept only in memory with a spill-to-disk approach, which might cause problems here.

Mark Hamstra

unread,
Aug 9, 2013, 6:36:29 PM8/9/13
to spark...@googlegroups.com, i...@ianoconnell.com
Oh, I'm definitely not recommending that you write code that depends on these implementation details.

Mark Hamstra

unread,
Aug 9, 2013, 6:49:20 PM8/9/13
to spark...@googlegroups.com, i...@ianoconnell.com
Pretty much correct, but this is really about Spark implementation details, so we should move the discussion over to the dev list if you want to explore the internals of the MapOutputTracker further.

srikanth reddy

unread,
Aug 12, 2013, 4:15:07 PM8/12/13
to spark...@googlegroups.com
Hi Mark,
  I am a newbie to Spark, and I am not sure if I used the right term, MapPartitionsRDD.

  What I meant is that there is some RDD (a ShuffledRDD?) that gets created at val poold = pair3.reduceByKey(_ + _), and repeated invocations of poolx.count do not really start the computation from the beginning (i.e., the stage that reads from HDFS).

  I guess that some intermediate RDD is behaving just like a cached RDD, and my question is how this intermediate RDD compares with a cached RDD in terms of its lifetime, eviction, etc.

  In the example that I provided, I am not caching any RDDs at all.

Mark Hamstra

unread,
Aug 12, 2013, 5:47:36 PM8/12/13
to spark...@googlegroups.com
The short answer is that they don't compare.  As Ian and I already discussed, the intermediate results in the MapOutputTracker are an implementation detail, and as such are of a fundamentally different nature from cached RDDs created through the public API.  If you want to depend on the behavior of the MapOutputTracker, then it is up to you to figure out those details and maintain your code through any future changes in Spark's implementation.

If you just want to make sure that your RDD is not re-evaluated earlier than a certain point in its lineage, then cache or checkpoint it using the public API.
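
A minimal sketch of both options through the public API, reusing pair3 from the earlier snippet (the checkpoint directory is a placeholder):

// cache(): keep the evaluated partitions around (when they fit in memory),
// so later actions reuse them instead of re-running the lineage.
val poold = pair3.reduceByKey(_ + _).cache()

// checkpoint(): write the RDD to reliable storage and truncate its lineage.
// A checkpoint directory must be set first; the data is written the next
// time an action evaluates the RDD.
sc.setCheckpointDir("hdfs://namenode/spark-checkpoints")
poold.checkpoint()
poold.count()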

srikanth reddy

unread,
Aug 16, 2013, 4:24:06 PM8/16/13
to spark...@googlegroups.com
Got it. Thanks!