pro tips for avoiding timeouts with large clusters


walrus...@gmail.com

Aug 13, 2013, 6:00:21 PM
to spark...@googlegroups.com
Hi,

I have a job that works fine with a cluster of size 35.  When the cluster gets large (160-200 machines), the driver's log gets decidedly less chatty and all the slave logs look like the one below.  What twiddly knobs can I adjust to get Spark to use large clusters?

Thanks

13/08/13 21:44:45 WARN storage.BlockManagerMaster: Error sending message to BlockManagerMaster in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [10000] milliseconds
	at akka.dispatch.DefaultPromise.ready(Future.scala:870)
	at akka.dispatch.DefaultPromise.result(Future.scala:874)
	at akka.dispatch.Await$.result(Future.scala:74)
	at spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:136)
	at spark.storage.BlockManagerMaster.sendHeartBeat(BlockManagerMaster.scala:39)
	at spark.storage.BlockManager.spark$storage$BlockManager$$heartBeat(BlockManager.scala:115)
	at spark.storage.BlockManager$$anonfun$initialize$1.apply$mcV$sp(BlockManager.scala:142)
	at akka.actor.DefaultScheduler$$anon$1.run(Scheduler.scala:142)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:94)
	at akka.jsr166y.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1381)
	at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
	at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
	at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1479)


walrus...@gmail.com

Aug 13, 2013, 6:16:40 PM
to spark...@googlegroups.com
Relevant flags are below... thanks:

-Dspark.worker.timeout=30000 -Dspark.akka.timeout=30000 -Dspark.storage.blockManagerHeartBeatMs=30000  -Dspark.akka.retry.wait=30000 -Dspark.akka.frameSize=10000
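For later readers: on Spark 0.7.x these are ordinary Java system properties, so besides passing -D flags (e.g. through SPARK_JAVA_OPTS in conf/spark-env.sh) they can be set programmatically. A minimal sketch, assuming the 0.7-era API and a made-up master URL - the properties have to be set before the SparkContext is constructed:

import spark.SparkContext

object TunedJob {
  def main(args: Array[String]) {
    // Set these before creating the SparkContext, or they are ignored.
    // Note the mixed units: askTimeout is in seconds, while
    // blockManagerHeartBeatMs is (as the name says) in milliseconds.
    System.setProperty("spark.akka.askTimeout", "30")
    System.setProperty("spark.storage.blockManagerHeartBeatMs", "30000")

    val sc = new SparkContext("spark://master:7077", "TunedJob")
    // ... the rest of the job ...
  }
}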

Mridul Muralidharan

Aug 13, 2013, 6:57:50 PM
to spark...@googlegroups.com
You could increase 'spark.akka.askTimeout' to something higher than 10 seconds.
I use 30 seconds in some of our experiments ...



Been meaning to dump these someplace for a while, and this is as good a place as any!
The other options I have are:

-Dspark.tasks.schedule.aggression=ANY
This is slightly specific to our jobs - but the bottom line is, if you
have a rack-aware cluster, set this to RACK_LOCAL for some non-trivial
bump in performance (it would be specific to job characteristics, though).

-Dspark.tasks.revive_periodic.interval=200
How soon to 'revive' offers - this makes sense when customizing the property above.

-Dspark.speculation=true
To overcome stragglers.

-Dspark.speculation.multiplier=1.2
-Dspark.speculation.quantile=0.5
These customize speculative execution and are slightly specific to our
jobs - but you might be able to play with them too.

-Dspark.storage.memoryFraction=0.25
Fraction of the JVM heap to use for Spark's memory cache - our
persistence level is memory-and-disk, since we deal with RDDs which
cannot be completely held in available memory (see the sketch at the
end of this list).


-Dspark.akka.frameSize=100
-Dspark.akka.askTimeout=30

You already have these set.

-Djava.io.tmpdir=<my_temp>
In case the temp directory fills up ...
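To make the memoryFraction item above concrete, here is a minimal sketch of the memory-and-disk persistence described there, in 0.7-era spark-shell terms (sc is the shell's predefined context; the path is made up):

import spark.storage.StorageLevel

val rdd = sc.textFile("hdfs:///data/input")

// With -Dspark.storage.memoryFraction=0.25, only a quarter of the heap
// is used for the cache; partitions that don't fit spill to local disk
// instead of being recomputed from scratch.
rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.count() // the first action materializes and caches the partitions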




Hope these help ...

Regards,
Mridul

walrus...@gmail.com

Aug 13, 2013, 8:11:45 PM
to spark...@googlegroups.com
Very much appreciated!

In order to inspire further posts, I thought I'd add this piece of information:

When my number of cores * 2 is 768 and default parallelism is set to the same, the tasks in the log actually count up to 768.

When I had a lot of slaves and that number was 3184, the tasks only counted up to 560.

Ian O'Connell

Aug 13, 2013, 8:28:43 PM
to spark...@googlegroups.com
How many partitions do you have in your data? It's possibly just not distributed enough. If you're reading from HDFS, the map operation will have at most one task per split - so if there are only 560 splits in the input, you'll see at most 560 map tasks. The locality and the kinds of group-by operations you use can all affect the number of tasks/partitions in use.
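A rough sketch of the two knobs this implies, again in 0.7-era spark-shell terms (sc predefined; the path and numbers are made up):

// Ask for more input splits than the HDFS default, so the map stage
// has enough tasks to cover a large cluster.
val lines = sc.textFile("hdfs:///data/input", 3000)

// Shuffle operations accept an explicit split count as well; without
// it they fall back to the parent RDD's partitioning (or
// spark.default.parallelism, when that is set).
import spark.SparkContext._ // pair-RDD functions such as groupByKey
val grouped = lines.map(line => (line.length, line)).groupByKey(3000)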

walrus...@gmail.com

Aug 13, 2013, 11:04:02 PM
to spark...@googlegroups.com, i...@ianoconnell.com
I haven't specified any set number of partitions (how would I verify?). It's the same data and job both times. Would you mind pointing me to a resource that explains your last sentence in more detail?

Thank you
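
In case it helps anyone who lands here later, one way to verify is to ask the RDD itself - a sketch in 0.7-era spark-shell terms (if `splits` doesn't compile on your build, the method may be called `partitions` instead):

val input = sc.textFile("hdfs:///data/input")
println("input splits: " + input.splits.size)

The map stage is bounded by this split count regardless of spark.default.parallelism; the latter mainly affects the output side of shuffles.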

Grega Kešpret

Sep 19, 2013, 10:50:57 AM
to spark...@googlegroups.com, mri...@gmail.com
Thanks for the useful tips, much appreciated. Are these documented somewhere?
It would be nice to have documentation of all the flags one can set, with some info on when to set them and some basic recommendations.

For example, `Walrus theCat` suggested using 30000 for spark.worker.timeout, but looking at the source for v0.7.3, this value seems to be in seconds, not milliseconds (the default is 60). So this would be over 8 hours.