Standalone Cluster: Worker, Master and Job memory questions


Grega Kespret

Dec 11, 2012, 9:58:16 AM
to spark...@googlegroups.com
Hello,

I have a couple of questions:

To deploy a job to a Spark Standalone Cluster, we first start the Master with "run spark.deploy.master.Master", then start at least one Worker with, e.g., "spark.deploy.worker.Worker --memory 4G --webui-port 8082 spark://ip...:7077". Then we start a standalone Scala program (the entry point), which initializes a SparkContext with the cluster URL to connect to, in this case spark://ip...:7077.
  1. So now we have 3 JVM processes running. Since we did not change spark-env.sh, the Worker and Master start with the default 512 MB of heap (jinfo confirms -Xms512m -Xmx512m). However, we ran the Worker with the --memory 4G flag, and the Web UI reports that our Worker has Memory: 4.0 GB (0.0 B Used). One would think that "--memory" indicates how much memory the Worker has available, but with a 512 MB heap that cannot be the case. So we are thinking that "--memory" should be less than or equal to the max heap space allocated to the JVM process (e.g. with the SPARK_MEM variable). But what exactly is "--memory" and how is it used? How does it differ from the max heap space available to the process?
  2. How much memory should Master have? What kind of objects pass through Master? Serialized/deserialized? 
  3. How much memory should our entry point (standalone Scala program) have? We have basically this flow: 
    • val a: RDD[String]  = sc.textFile(urlPattern)
    • val b: RDD[CustomObject1] = a -> map -> groupByKey(reduceTasks) -> flatMap
    • val c: Seq[CustomObject2] = b -> flatMap -> reduceByKey -> collect -> map
    • val d = c -> groupBy -> foreach

    1. After the collect is called, do we need to have enough memory to hold ALL objects in result set? 
    2. How does reduceTasks (parameter to groupByKey) affect the heap usage inside a JVM process? 

Thank you!

Matei Zaharia

Dec 11, 2012, 12:00:15 PM
to spark...@googlegroups.com
Hi Grega,
> 1. So now we have 3 JVM processes running. Since we did not change spark-env.sh, Worker and Master start with the default 512MB of heap (jinfo confirms -Xms512m -Xmx512m). However, we ran Worker with --memory 4G flag. Also, Web UI reports that our Worker has Memory: 4.0 GB (0.0 B Used). One would think that "--memory" would indicate how much memory the Worker has available. But we have just seen this is not the case. So we are thinking that "--memory" should be less or equal than the max heap space that was allocated to JVM process (i.e. with variable SPARK_MEM). But what exactly is "--memory" and how is it used? How does it differ from max heap space that process has available?
The --memory flag is the max amount of memory to allocate to all Spark jobs on that worker, but each worker can actually be running multiple jobs (if you submit two jobs to your cluster, for example). You can set each job's maximum memory through the SPARK_MEM environment variable on the *client* that's submitting it. So set the SPARK_MEM environment variable before you run your entry point program. This is a little confusing and I'd like to make it configurable through the code too (on SparkContext).
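
To make the bookkeeping concrete, here is a minimal sketch of the accounting described above. It is a toy model, not Spark's scheduler code: the names WorkerMemoryModel, Worker, launchExecutor and executorHeapFlags are hypothetical, and it assumes one executor per job per worker. The heap flags mirror what is observed later in this thread, where the executor is a separate JVM whose -Xms/-Xmx match the job's SPARK_MEM.

```scala
// Toy model of a standalone Worker's memory bookkeeping.
// NOT Spark's scheduler code; every name here is hypothetical.
object WorkerMemoryModel {
  // totalMb stands in for the Worker's --memory value. grantedMb maps a job
  // name to the executor heap (that job's SPARK_MEM) already handed out.
  final case class Worker(totalMb: Int, grantedMb: Map[String, Int] = Map.empty) {
    def usedMb: Int = grantedMb.values.sum
    def freeMb: Int = totalMb - usedMb
  }

  // Try to start an executor for `job` that needs `sparkMemMb` of heap.
  // Returns the updated worker, or None when the request exceeds what is left.
  def launchExecutor(w: Worker, job: String, sparkMemMb: Int): Option[Worker] =
    if (sparkMemMb <= w.freeMb)
      Some(w.copy(grantedMb = w.grantedMb + (job -> sparkMemMb)))
    else
      None

  // Heap flags for the separate executor JVM, sized from the job's request
  // rather than from the Worker daemon's own (small) heap.
  def executorHeapFlags(sparkMemMb: Int): Seq[String] =
    Seq(s"-Xms${sparkMemMb}m", s"-Xmx${sparkMemMb}m")
}
```

With a 4096 MB worker, two jobs asking for 2048 MB each both fit and a third request of any size is refused, regardless of the Worker daemon's own 512 MB heap.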
> 2. How much memory should Master have? What kind of objects pass through Master? Serialized/deserialized?
It can have very little -- probably 512 MB to 1 GB is fine. No tasks or task results pass through it. It just allocates workers to jobs.
> 3. How much memory should our entry point (standalone Scala program) have? We have basically this flow:
>     • val a: RDD[String]  = sc.textFile(urlPattern)
>     • val b: RDD[CustomObject1] = a -> map -> groupByKey(reduceTasks) -> flatMap
>     • val c: Seq[CustomObject2] = b -> flatMap -> reduceByKey -> collect -> map
>     • val d = c -> groupBy -> foreach
>
>     1. After the collect is called, do we need to have enough memory to hold ALL objects in result set?
Yes, if you call collect, it means "give it to me as a local collection". If you can't fit that in your Java process, you should just not call collect; do a map directly on the result of reduceByKey (which is a distributed dataset).
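
As a rough way to check this before calling collect, the sketch below compares an estimated result size against the heap of the local JVM. The object name CollectSizing, its functions, and the headroom fraction are all assumptions made for illustration; only Runtime.getRuntime().maxMemory() is a real JVM call, and the per-element size has to come from your own measurements.

```scala
// Back-of-the-envelope check for whether a collected result fits locally.
// All names are hypothetical; headroom is an assumed safety margin,
// not a Spark setting.
object CollectSizing {
  // Rough number of bytes needed to hold every collected element.
  def estimatedBytes(numElements: Long, bytesPerElement: Long): Long =
    numElements * bytesPerElement

  // True when the estimate fits inside `headroom` (a fraction) of the heap.
  def fitsInHeap(numElements: Long,
                 bytesPerElement: Long,
                 maxHeapBytes: Long,
                 headroom: Double = 0.5): Boolean =
    estimatedBytes(numElements, bytesPerElement) <= (maxHeapBytes * headroom).toLong

  // Maximum heap of the JVM running this code (roughly its -Xmx value).
  def currentMaxHeapBytes: Long = Runtime.getRuntime().maxMemory()
}
```

If fitsInHeap comes back false for the entry point's heap, that is the case where the result should stay distributed instead of being collected.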

>     2. How does reduceTasks (parameter to groupByKey) affect the heap usage inside a JVM process?
Each reduce task will build a hash table to hold all the keys that hash to it. So for example if you have N keys at the end of your map phase, and you use 8 reduce tasks, each one needs enough RAM to hold N/8 keys. If you use 100 tasks instead, that becomes smaller, at N/100. In general you should probably make the number of reduce tasks bigger than the default; even if it's a few hundred, the overhead from having more tasks won't be very high. We made the default 8 because some people run on small clusters or aggregate on small numbers of keys, but we'd like to make it adaptive in the future.
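
The N/8 versus N/100 effect can be seen with a small stand-alone sketch. It assumes keys are assigned to reduce tasks by a non-negative modulo of hashCode; the names ReducePartitionSketch, taskFor and keysPerTask are hypothetical, and no Spark classes are involved.

```scala
// Stand-alone illustration of how distinct keys spread over reduce tasks.
// Assumes assignment by hashCode modulo the task count; names are hypothetical.
object ReducePartitionSketch {
  // Index of the reduce task that receives `key` (always in 0 until numReduceTasks).
  def taskFor(key: Any, numReduceTasks: Int): Int = {
    val m = key.hashCode % numReduceTasks
    if (m < 0) m + numReduceTasks else m
  }

  // For each reduce task, the number of distinct keys its hash table would hold.
  def keysPerTask[K](keys: Iterable[K], numReduceTasks: Int): Map[Int, Int] =
    keys.toSeq.distinct
      .groupBy(k => taskFor(k, numReduceTasks))
      .map { case (task, ks) => task -> ks.size }
}
```

For the integer keys 0 until 1000, keysPerTask gives 125 keys per task with 8 reduce tasks and 10 keys per task with 100. Real keys with skewed hash codes will spread less evenly.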

Matei

Grega Kespret

Dec 12, 2012, 5:30:36 AM
to spark...@googlegroups.com
Hi Matei,

First off, thanks for the fast response.

I still have some questions.

> The --memory flag is the max amount of memory to allocate to all Spark jobs on that worker, but each worker can actually be running multiple jobs (if you submit two jobs to your cluster for example). You can set each job's maximum memory through the SPARK_MEM environment variable on the *client* that's submitting it. So when you run your entry point program, set the SPARK_MEM environment variable before. This is a little confusing and I'd like to make it configurable through the code too (on SparkContext).

1. Isn't the worker constrained primarily by its maximum Java heap space, in our case 512 MB? If we set --memory to 4G, how can the worker allocate more than 512 MB to its jobs (one or multiple, it doesn't matter) if "its own" space is only 512 MB? I am thinking that --memory on the worker should always be less than or equal to the maximum heap space of the worker process. In our case this would mean either increasing -Xmx of the worker or decreasing the --memory parameter.

(Please note I am talking about worker _processes_ in Spark standalone mode, not worker _threads_ in Spark local mode. I find it easier to reason about memory at the level of the JVM than at the level of a thread.)
 
> 2. How much memory should Master have? What kind of objects pass through Master? Serialized/deserialized?
> It can have very little -- probably 512 MB to 1 GB is fine. No tasks or task results pass through it. It just allocates workers to jobs.

2. Does this mean that the workers return results directly to the entry program/application that holds the SparkContext object?
Also, is there any difference in communication if the Worker and the entry program/application are on the same machine?

>     2. How does reduceTasks (parameter to groupByKey) affect the heap usage inside a JVM process?
> Each reduce task will build a hash table to hold all the keys that hash to it. So for example if you have N keys at the end of your map phase, and you use 8 reduce tasks, each one needs enough RAM to hold N/8 keys. If you use 100 tasks instead, that becomes smaller, at N/100. In general you should probably make the number of reduce tasks bigger than the default; even if it's a few hundred, the overhead from having more tasks won't be very high. We made the default 8 because some people run on small clusters or aggregate on small numbers of keys, but we'd like to make it adaptive in the future.

3. For example, say I have N=1000 objects of 10 MB each at the end of my map phase (10 GB in total). If I use 10 reduce tasks in groupByKey, each reduce task needs enough RAM to hold 100 objects, for a total of 1 GB per task.

I would like to understand how Spark allocates reduce tasks to workers. If I have just 1 worker:
a) does it allocate all reduce tasks to this worker at once, meaning that the worker process needs at least 10 GB of heap space?
b) or does it do so in a streaming fashion, in which case the worker process would need only 1 GB of heap space?

What if I have 5 worker processes: does it distribute 2 reduce tasks to each worker process in parallel?
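
The arithmetic behind scenarios a) and b) can be written out as a small sketch. It assumes objects spread evenly across reduce tasks and uses 1 GB = 1000 MB as in the figures above. How many reduce tasks are actually resident in one worker process at a time is exactly the open question, so it is a parameter here rather than a statement about Spark's behaviour. The names ReduceHeapEstimate, perTaskMb and peakMb are hypothetical.

```scala
// Worked arithmetic for the reduce-side heap question.
// Assumes an even spread of objects over reduce tasks; names are hypothetical.
object ReduceHeapEstimate {
  // Data (in MB) held by a single reduce task.
  def perTaskMb(numObjects: Int, objectMb: Int, numReduceTasks: Int): Double =
    numObjects.toDouble * objectMb / numReduceTasks

  // Peak heap (in MB) needed in one process when `concurrentTasks`
  // reduce tasks are resident there at the same time.
  def peakMb(numObjects: Int,
             objectMb: Int,
             numReduceTasks: Int,
             concurrentTasks: Int): Double =
    perTaskMb(numObjects, objectMb, numReduceTasks) * concurrentTasks
}
```

With 1000 objects of 10 MB and 10 reduce tasks, each task holds 1000 MB. Scenario a) corresponds to concurrentTasks = 10 (10000 MB), scenario b) to concurrentTasks = 1 (1000 MB), and 2 tasks per process across 5 workers to concurrentTasks = 2 (2000 MB per process).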

Thank you!

Grega Kespret

Dec 12, 2012, 10:14:23 AM
to spark...@googlegroups.com
Hi Matei,

Regarding my first question from the previous reply: I was not aware that additional JVM processes get spawned on the machine. I now see another process, "StandaloneExecutorBackend", which is spawned with -Xmx and -Xms flags that match the SPARK_MEM I set on the entry program/client that submitted the job, so I presume this is what is happening. In that case I can understand how the --memory flag is not directly bound to the heap space of the worker JVM process.