Need help regarding Spark code optimization to increase performance


Gaurav Dasgupta

Jan 7, 2013, 2:30:20 AM
to spark...@googlegroups.com
Hi,
 
I have implemented MST (Kruskal's/Prim's) logic in Spark and am running it on my 11-node cluster. The input is an adjacency list in this form:
"nodeIds Edges:Weights"
 
For bigger graphs (say 10000 nodes), my code is taking around 1 hour to finish the job. The data size is 1GB. I have enough memory to store and process this data.
Also, for 5000 nodes (450 MB data size) the execution time is 5-7 mins.
Here is the logic which I have implemented:
 
Map Function:
 
  • Split the input into two parts to get "nodeId" and "Edges:Weights"
  • Take the minimum edge (by weight) from each node and append it to mapOutput (List[k,v])
  • Append (nodeId, "nodeId-minEdge:minWeight") to mapOutput
  • Append mstEdge and mstWeight from previous iteration to mapOutput
  • Return mapOutput
Reduce Function:
  • Append row (Seq[String]) to reduceOutput (List[String]) for the mapValues which don't contain MST information
  • Form mstList (List[String]) and mstWeight (String) from mapOutput
  • Form edgeList (List[String]) from the minEdges produced by the Map function and sort the list by weight
  • Iterate over "edgeList":
    • Retrieve srcNode and dstNode
    • If dstNode is not in "mstList" as a srcNode, then add "srcNode-dstNode" to mstList and mstWeight += weight
    • Append mstList and mstWeight to reduceOutput
  • Return reduceOutput

// I am using "break" for the main loop in reduce function (iterating over "edgeList")
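For comparison, a single-machine baseline can help separate the cost of the algorithm from the cost of the framework. What follows is a minimal sketch of textbook Kruskal's algorithm with a union-find structure in plain Scala. It is not the PrimsMap/PrimsReduce code from this post; the names KruskalSketch, Edge and mst are hypothetical, and it assumes an undirected graph whose nodes are numbered 1..nodeCount.

```scala
object KruskalSketch {
  // Hypothetical edge type: an undirected edge between two node ids.
  final case class Edge(src: Int, dst: Int, weight: Double)

  // Returns the chosen MST edges and their total weight for nodes numbered 1..nodeCount.
  def mst(nodeCount: Int, edges: Seq[Edge]): (List[Edge], Double) = {
    // Union-find forest: parent(i) == i means i is the root of its component.
    val parent = Array.range(0, nodeCount + 1)

    // Find the root of x's component, compressing the path on the way.
    def find(x: Int): Int = {
      var root = x
      while (parent(root) != root) root = parent(root)
      var cur = x
      while (parent(cur) != root) {
        val next = parent(cur)
        parent(cur) = root
        cur = next
      }
      root
    }

    val chosen = List.newBuilder[Edge]
    var total = 0.0
    var count = 0
    // Visit edges from lightest to heaviest; keep an edge only if it joins two components.
    val sorted = edges.sortBy(_.weight).iterator
    while (count < nodeCount - 1 && sorted.hasNext) {
      val e = sorted.next()
      val a = find(e.src)
      val b = find(e.dst)
      if (a != b) {
        parent(a) = b
        chosen += e
        total += e.weight
        count += 1
      }
    }
    (chosen.result(), total)
  }
}
```

Called with the six undirected edges of the four-node sample graph posted later in this thread (1-2:4.0, 1-3:12.0, 1-4:5.0, 2-3:7.0, 2-4:8.0, 3-4:2.0), the sketch keeps three edges. Timing such a baseline on one node gives a reference point for how much of the cluster run is framework overhead.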

Main Function:
 
import spark.SparkContext
import spark.SparkContext._  // implicit conversions that provide groupByKey and mapValues

def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "Prims")
    // One adjacency-list line per node, read in 88 splits
    var iterativeInput = sc.textFile("hdfs://...", 88)
    val totalNode = iterativeInput.count().toInt
    var totalEdge = 0
    var iteration = 0
    // Run map/reduce rounds until the MST edge count reaches the node count
    while (totalEdge < (totalNode)) {
      val mapResult = iterativeInput.flatMap(x => PrimsMap(x)).groupByKey(88)
      val reduceResult = mapResult.mapValues(x => PrimsReduce(x))
      // Flatten back to "key value" lines so the output can feed the next round
      iterativeInput = reduceResult.flatMap { case (k, vs) => vs.map(v => (k+" "+ v)) }
      // Count the "_"-separated entries of the lines that contain "mstEdge"
      totalEdge = iterativeInput.filter(_.contains("mstEdge")).flatMap(x => x.split(" ")(1).split("_")).count().toInt
      iteration += 1
    }
    // Final output: the MST edge lines followed by the MST weight lines
    val MSTEdge = iterativeInput.filter(_.contains("mstEdge")).map(x => x.split(" ")(1))
    val MSTWeight = iterativeInput.filter(_.contains("mstWeight")).map(x => x.split(" ")(1))
    val output = MSTEdge.union(MSTWeight)
    output.saveAsTextFile("/opt/result")
    System.out.println("Total Number of Iterations: " + iteration.toString())
    System.exit(0)
  }
 
I have tried to explain exactly what I am doing. Can someone please suggest which operation is actually taking the time? I think the code can be optimized so that the execution time decreases a lot. My cluster config is:
11-node cluster, 88 CPUs total
16 GB RAM on each node
SPARK_MEM = 10g
Spark running on a Mesos cluster
 
Please tell me if you need more information to understand the implementation.
 
Thanks,
Gaurav

Gaurav Dasgupta

Jan 7, 2013, 2:31:38 AM
to spark...@googlegroups.com
Also, the total number of iterations is only 3.

Gaurav Dasgupta

Jan 7, 2013, 6:32:40 AM
to spark...@googlegroups.com
While the jobs are running on my cluster, I can see the job hang for a few seconds/minutes at the following phase and then continue:

13/01/07 05:13:42 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 1 to babar6.musigma.com
13/01/07 05:13:42 INFO spark.MapOutputTracker: Size of output statuses for shuffle 1 is 748 bytes
13/01/07 05:13:42 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 1 to babar8.musigma.com
13/01/07 05:13:4 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 1 to babar3.musigma.com
13/01/07 05:13:42 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 1 to babar2.musigma.com
13/01/07 05:13:42 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 1 to babar5.musigma.com
13/01/07 05:13:42 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 1 to babar4.musigma.com
13/01/07 05:13:42 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 1 to babar10.musigma.com
13/01/07 05:13:42 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 1 to babar1.musigma.com
13/01/07 05:13:42 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 1 to babar9.musigma.com
13/01/07 05:13:42 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 1 to babar7.musigma.com
13/01/07 05:13:42 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 1 to babar10.musigma.com
13/01/07 05:13:42 INFO spark.MapOutputTrackerActor: Asked to send map output locations for shuffle 1 to babar11.musigma.com


So what exactly is taking the time here (the shuffle phase)? How can I decrease this time?
My network bandwidth is 14mbps, and when I run Hadoop jobs the shuffle phase runs smoothly. Hence I am assuming that network IO is not the reason the shuffle in the Spark job is taking more time.

Thanks,
Gaurav

Mark Hamstra

Jan 7, 2013, 2:32:40 PM
to spark...@googlegroups.com
The following code is for illustrative purposes only, and should not be considered good Spark programming style.

Example A:

var rdd = sc.parallelize(List(1, 2, 3, 4))
var i = 0
while(i < 3) {
  val result = rdd.flatMap{x => Thread.sleep(1000); List(x)}
  rdd = result.map(x => x * 10)
  println("Count: " + rdd.count)
  i += 1
}

Example B:

var rdd = sc.parallelize(List(1, 2, 3, 4))
var i = 0
while(i < 3) {
  val result = rdd.flatMap{x => Thread.sleep(1000); List(x)}
  rdd = result.map(x => x * 10).cache()
  println("Count: " + rdd.count)
  i += 1
}
 

Assume that we run these two examples in the default spark-shell -- i.e. MASTER is local[1], so all of the RDD operations are performed single-threaded on a single split.
 
Questions:

How many times is "Count: 4" printed in Example A and in Example B?
About how long does the final iteration of the while loop take in Example A and in Example B?

Answers:
3 and 3
12 seconds and 4 seconds

If these numbers are not what you expect, then I think that we have identified a great deal of the inefficiency in your code. 
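The 12-versus-4 arithmetic can be reproduced without a cluster. The sketch below is a plain-Scala stand-in, not Spark code: an "RDD" is modelled as a function that recomputes its data on every call, and "caching" as a lazy val that is computed once. The names LineageDemo, run and slowCalls are hypothetical. It counts how many elements pass through the slow flatMap step in each loop iteration, mirroring Example A and Example B.

```scala
object LineageDemo {
  // Returns, per loop iteration, how many elements went through the "slow" flatMap step.
  def run(cacheEachIteration: Boolean, iterations: Int = 3): List[Int] = {
    var slowCalls = 0
    // Stand-in for an RDD: a recipe that recomputes its data on every call.
    var rdd: () => List[Int] = () => List(1, 2, 3, 4)
    val perIteration = List.newBuilder[Int]
    for (_ <- 1 to iterations) {
      val parent = rdd
      // The new recipe re-runs the parent's recipe first, like an uncached lineage.
      val recipe: () => List[Int] =
        () => parent().flatMap { x => slowCalls += 1; List(x) }.map(_ * 10)
      rdd =
        if (cacheEachIteration) {
          lazy val materialized = recipe() // computed once, on first use
          () => materialized
        } else recipe
      val before = slowCalls
      val count = rdd().size // the "action" that forces evaluation
      assert(count == 4)
      perIteration += (slowCalls - before)
    }
    perIteration.result()
  }
}
```

Without the cache stand-in the per-iteration counts are 4, 8, 12; with it they are 4, 4, 4. At one second per element, that is the 12 seconds versus 4 seconds of the final iteration.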

Gaurav Dasgupta

Jan 8, 2013, 12:21:18 AM
to spark...@googlegroups.com
Hi Mark,

Thanks for the reply. So it means I should use "cache()" on the RDDs in order to improve the performance.
I have always been confused about where to use cache() given the way I am transforming my RDDs. Can you please suggest which RDDs I should cache() in my code?

Thanks,
Gaurav

Mark Hamstra

Jan 8, 2013, 4:20:47 AM
to spark...@googlegroups.com
I'm not sure just what your code is doing.  In general, you want to call cache() any time that you don't want future actions on an RDD to recompute its entire lineage since the previous cache() (or back to the beginning if the RDD has never been cached.)  It looks to me like you expect each iteration of your while loop to only calculate the transformations expressed in that iteration, but since you are never caching the results, each iteration also has to recalculate all of the transformations from the previous iterations, so each iteration just ends up doing more and more work instead of a relatively constant amount per iteration.

Gaurav Dasgupta

Jan 8, 2013, 11:47:51 PM
to spark...@googlegroups.com
When I try to use cache() on the following two RDDs:

val mapResult = iterativeInput.flatMap(x => PrimsMap(x)).groupByKey(88).cache()
val reduceResult = mapResult.mapValues(x => PrimsReduce(x)).cache()

I am getting the following runtime error:

13/01/08 22:41:13 ERROR storage.BlockManagerMasterActor: key not found: BlockManagerId(babar1.musigma.com, 58101)
java.util.NoSuchElementException: key not found: BlockManagerId(babar1.musigma.com, 58101)
    at scala.collection.MapLike$class.default(MapLike.scala:225)
    at scala.collection.mutable.HashMap.default(HashMap.scala:45)
    at scala.collection.MapLike$class.apply(MapLike.scala:135)
    at scala.collection.mutable.HashMap.apply(HashMap.scala:45)
    at spark.storage.BlockManagerMasterActor.spark$storage$BlockManagerMasterActor$$heartBeat(BlockManagerMaster.scala:244)
    at spark.storage.BlockManagerMasterActor$$anonfun$receive$1.apply(BlockManagerMaster.scala:189)
    at spark.storage.BlockManagerMasterActor$$anonfun$receive$1.apply(BlockManagerMaster.scala:184)
    at akka.actor.Actor$class.apply(Actor.scala:318)
    at spark.storage.BlockManagerMasterActor.apply(BlockManagerMaster.scala:91)
    at akka.actor.ActorCell.invoke(ActorCell.scala:626)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:197)
    at akka.dispatch.Mailbox.run(Mailbox.scala:179)
    at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:516)
    at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
    at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
    at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1479)
    at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)

What does this error mean? Where and how can I then use cache()? Please help.

Thanks,
Gaurav

Gaurav Dasgupta

Jan 9, 2013, 5:04:00 AM
to spark...@googlegroups.com
OK. I have checked out and built the latest version of Spark from Git and the above error disappeared. It was a bug.
 
But I have used cache() in every possible place and re-executed the code, and I am still not getting better performance. Here are the execution times for different data sizes:
Graph Nodes   Execution Time    Number of Iterations   Data Size
17            4 secs            2                      4.9 KB
50            5 secs            3                      40.5 KB
100           6 secs            2                      179 KB
500           10 secs           3                      4.35 MB
1000          28 secs           3                      17.4 MB
5000          7 mins, 49 secs   3                      435 MB
10000         1 hr, 14 mins     3                      1.7 GB
 
I split each input dataset into a different number of splits in order to get the best performance for each file. You can see that the execution time increases a lot for the 5000-node graph and drastically more for the 10000-node graph. In my opinion, the same code on Hadoop would give me better results for those two datasets. 
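To put a number on how the run time grows, the three largest rows of the table can be compared pairwise. The sketch below is only arithmetic on the figures reported above; the names ScalingCheck, runs and exponents are hypothetical, and 1.7 GB is taken as 1700 MB. It estimates the exponent k in "time grows like size^k" between consecutive runs.

```scala
object ScalingCheck {
  // (graph nodes, execution time in seconds, input size in MB), copied from the table.
  // Assumption: 1.7 GB is treated as 1700 MB.
  val runs: List[(Int, Double, Double)] = List(
    (1000, 28.0, 17.4),
    (5000, 7 * 60 + 49.0, 435.0),
    (10000, 74 * 60.0, 1700.0)
  )

  // Exponent k in time ~ size^k between each pair of consecutive runs.
  def exponents: List[Double] =
    runs.zip(runs.tail).map { case ((_, t1, s1), (_, t2, s2)) =>
      math.log(t2 / t1) / math.log(s2 / s1)
    }
}
```

An exponent below 1 means the time grew more slowly than the data between two runs, and an exponent above 1 means it grew faster. On these figures the step from 1000 to 5000 nodes comes out below 1 and the step from 5000 to 10000 nodes above 1.5, which suggests something changes between those two sizes. The figures alone do not say what.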
 
My questions are:
1. How scalable is Spark in terms of performance? How do I ensure that my Spark job's performance scales as the data size increases?
2. What can I do to increase the performance (say, 5-10 mins for the 10000-node graph) when even using cache() on the RDDs is not helping?
 
An execution time of over 1 hour for a 1.7 GB dataset is huge for me. Please suggest.
 
 
Thanks,
Gaurav

Jason Dai

Jan 9, 2013, 7:50:32 PM
to spark...@googlegroups.com
We had a similar problem, and our investigation shows that it is most likely a performance issue in shuffle (see https://groups.google.com/forum/?fromgroups=#!topic/shark-users/IHOb2u5HXSk). "Asked to send map output locations for shuffle" is logged by the task when the shuffle begins.

I wonder whether a lot of data needs to be shuffled between different stages in your case and, if so, what the network bandwidth usage is on each of your nodes.

Thanks,
-Jason

Matei Zaharia

Jan 9, 2013, 8:12:14 PM
to spark...@googlegroups.com
The other issue I've seen is not increasing the number of reduce tasks when you have more data. By default, Spark only uses 8 reduce tasks for operations like groupByKey and reduceByKey. You can call System.setProperty("spark.default.parallelism", "<value>") before creating a SparkContext to set it higher. I would try 40 or more.

Matei

Shane Huang

Jan 9, 2013, 9:24:01 PM
to spark...@googlegroups.com
We've tried increasing the number of reduce tasks. It helps to some extent, but shuffle could still be slow when we used many reduce tasks.
Please refer to analysis item 1 for details in post  https://groups.google.com/forum/?fromgroups=#!topic/shark-users/IHOb2u5HXSk

Gaurav Dasgupta

Jan 10, 2013, 3:03:52 AM
to spark...@googlegroups.com
I agree with Jason that the shuffle time grows as the data grows. In my case I have sufficient network bandwidth, and the shuffle phase is faster in my cluster when I run Hadoop jobs.
I have also increased the number of reducers (even up to 88 for my 88-core cluster) for big data sizes. Increasing the number of reduce tasks improves performance, but it still does not give me acceptable performance.
SPARK_MEM is set to 10g per node, which is more than enough for a 1-2 GB file. How can it then take 1 hour for 3 iterations on a 1-2 GB dataset?
Even using cache() is not helping much.

The main logic in my code just loops over the graph data and, after checking some conditions, adds entries to a list that is returned after each iteration. Is the shuffle time alone the reason for the slow performance, or is it a code issue?

Thanks,
Gaurav

Matei Zaharia

Jan 10, 2013, 3:19:16 AM
to spark...@googlegroups.com
One other thing that might help is to use the Kryo serialization library for shuffle, because otherwise the data representation may be too big (Java serialization can easily produce serialized sizes that are 10-20x bigger than the raw data). Take a look at http://www.spark-project.org/docs/latest/tuning.html for how to enable it.
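As a rough sketch of what that looks like in the Spark releases of this period, the settings were plain JVM system properties set before the SparkContext is created. The "spark.default.parallelism" property is the one mentioned earlier in this thread, and the "spark.serializer" value is the one given in the tuning guide of that era, as far as I recall it; both should be checked against the guide for the release in use. The wrapper names ShuffleTuning and configure are hypothetical.

```scala
object ShuffleTuning {
  // Sets JVM system properties only; no Spark classes are touched here.
  // Assumption: these must be set before `new SparkContext(...)` is called.
  def configure(reduceTasks: Int): Unit = {
    // Serializer class name as used by the `spark` package of that era.
    System.setProperty("spark.serializer", "spark.KryoSerializer")
    // Default number of reduce tasks for operations such as groupByKey.
    System.setProperty("spark.default.parallelism", reduceTasks.toString)
  }
}
```

For example, ShuffleTuning.configure(88) would go at the top of main, before the SparkContext is constructed. Later Spark releases moved the classes to the org.apache.spark package and these settings to a SparkConf object, so the strings above apply only to the old releases discussed here.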

Other than that, I think you should look at what the nodes are doing and profile your code. First, run dstat on the nodes while the job is running to see the rate of network IO, disk IO, and CPU usage (you can run top for CPU as well). If you see very low network usage (I imagine your nodes have at least 1 Gigabit/second network), then the problem is the CPU cost of your job. You can use a Java profiler to see what it's spending time on -- the simplest thing is to actually run jstack <processID> a few times, where processID is your Java process, and look at the stack traces of the "running" threads; 99% of the time there will be one or two things in the code that are a bottleneck.

Matei

Shane Huang

Jan 10, 2013, 4:02:38 AM
to spark...@googlegroups.com
After we saw the shuffle problem we did some profiling. In our test, CPU usage and disk and network throughput are all low, while I/O wait is noticeable. We did some code profiling and found that channel.write has very high latency (which shouldn't happen in the normal case because it's non-blocking). We also ran ConnectionManagerTest, but there the channel.write latency is low. We don't know why shuffle gets such high latency. We once thought it was a memory problem, but in the Java profiler we saw that heap usage is about half of the total heap size and we didn't see a lot of GC activity, so it does not seem to be a memory problem either. Shark uses Kryo serialization by default. We tried both jdk1.6.x and 1.7, and the shuffle problem remains.

I have added some CPU and disk charts to the post where I described our problem: https://groups.google.com/forum/?fromgroups=#!topic/shark-users/IHOb2u5HXSk
  
Thanks,
Shane

Praveenesh Kumar

Jan 11, 2013, 5:26:24 AM
to spark...@googlegroups.com
Is this a known issue?
We have noticed similar behaviour. We retried with more reducers, more worker memory, and even the Kryo serializer, but couldn't get the expected performance. It seems that as the amount of shuffled data increases, Spark is not able to provide good performance.
Has this been noticed before by other Spark contributors/users?

Regards,
Praveenesh

On Friday, January 11, 2013 3:50:07 PM UTC+5:30, Gaurav Dasgupta wrote:
I have used KryoSerializer and re-executed the code, but it still does not improve the performance. As pointed out in the mails above, I feel that it is only the shuffle time that is high. Maybe something can be changed in the framework to reduce the shuffle time.
 
I have attached the code I am working on. Anyone can run this code on their own cluster and see the performance. Please check whether the code can be optimized to increase the performance, or whether it is something in the Spark framework that is taking the time.
This is a small dataset. I am unable to upload big datasets because of their size. You may use some big graph data if you have it.
The input format is:
nodeId Edge1:Weight, Edge2:Weight, ...

A sample graph will be like this:
1 2:4.0,3:12.0,4:5.0
2 1:4.0,3:7.0,4:8.0
3 1:12.0,2:7.0,4:2.0
4 1:5.0,2:8.0,3:2.0
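
As an illustration of this input format and of the "take the minimum edge from each node" step of the Map function described at the top of the thread, here is a small plain-Scala sketch. It is not the code from the pastebin link; the names AdjacencyLine, parse and minEdge are hypothetical, and it assumes integer node ids and no spaces inside the edge list, as in the sample lines.

```scala
object AdjacencyLine {
  // Parses "nodeId Edge1:Weight,Edge2:Weight,..." into (nodeId, Seq((neighbor, weight))).
  def parse(line: String): (Int, Seq[(Int, Double)]) = {
    val parts = line.trim.split("\\s+", 2) // node id, then the comma-separated edge list
    val edges = parts(1).split(",").toSeq.map { item =>
      val pair = item.trim.split(":")
      (pair(0).toInt, pair(1).toDouble)
    }
    (parts(0).toInt, edges)
  }

  // The lightest outgoing edge of one node, as (src, dst, weight).
  def minEdge(line: String): (Int, Int, Double) = {
    val (src, edges) = parse(line)
    val (dst, weight) = edges.minBy(_._2)
    (src, dst, weight)
  }
}
```

For the sample line "3 1:12.0,2:7.0,4:2.0", minEdge picks the edge from node 3 to node 4 with weight 2.0. Working with typed tuples like these, rather than concatenated strings, may also reduce the amount of data that has to be serialized for the shuffle, though that is a guess and not something measured in this thread.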

Thanks,
Gaurav

Gaurav Dasgupta

Jan 11, 2013, 5:32:54 AM
to spark...@googlegroups.com
I have used KryoSerializer and re-executed the code, but it still does not improve the performance. As pointed out in the mails above, I feel that it is only the shuffle time that is high. Maybe something can be changed in the framework to reduce the shuffle time.
 
Please find my code here:
http://pastebin.com/download.php?i=Qz0v8Vmf

Gaurav Dasgupta

Jan 11, 2013, 6:36:08 AM
to spark...@googlegroups.com
Also, please find a sample dataset here:

http://pastebin.com/download.php?i=actR5tbU

This is a 101-node graph. The size is very small. I am unable to upload bigger datasets. If you can make or obtain a very big graph dataset, that would be better for understanding the performance.

Thanks,
Gaurav

Jiacheng Guo

Feb 6, 2013, 7:17:54 AM
to spark...@googlegroups.com
I have also encountered the same issue. On a dataset of 80 GB with about 5e8 key-value pairs, on 4 machines with 12 cores each and a 1 Gb/s network, the shuffle stage can take up to 1 hour. What I observe is a mixture of single-threaded CPU surges and long idle periods. What is the expected behavior of the shuffle stage? It does not seem to perform very well.

Regards,
Jiacheng Guo

Shane Huang

Feb 21, 2013, 4:43:40 AM
to spark...@googlegroups.com
Jason and I have come up with a workaround that improves the shuffle time of our in-house job by about 4x.
We're still working with the Berkeley guys to find a complete solution for the shuffle issue. In the meantime, you could try the workaround patch if shuffle performance is blocking you.


Jiacheng Guo

Mar 5, 2013, 7:35:54 AM
to spark...@googlegroups.com
Hi Shane,
    Thanks for your patch. It works wonderfully for my application, reducing shuffle time by more than 2x. Is there any progress on whether this patch will be merged into the main trunk?

Thanks,
Jiacheng Guo