Parallel execution of RDDs


Jesvin Jose

Jul 24, 2013, 2:01:01 AM
to spark...@googlegroups.com
I have a List[spark.RDD[T]]. I want to apply a transformation/action to each RDD in that list. AFAIK, this has to be done on the driver side. A foreach on the list processes the RDDs one after another, but how do I make them execute in parallel?

Reynold Xin

Jul 24, 2013, 2:04:37 AM
to spark...@googlegroups.com
You can use Scala's (or Akka's) future concept to do this. Some semi-pseudocode that is not going to compile:

val rdds: List[RDD[T]] = ...

val futures = rdds.map { rdd =>
  rdd.map(...).reduceByKey(...).collect()
}

The above code will submit all jobs directly to Spark's scheduler, and you get a list of "Future"s back.

--
Reynold Xin, AMPLab, UC Berkeley




Jesvin Jose

Jul 24, 2013, 2:24:15 AM
to spark...@googlegroups.com
Where is it getting wrapped into a future?

Shouldn't it rather be like:

val futures = rdds.map { rdd =>
  future( rdd.map(...).reduceByKey(...).collect())
}

I just referred to pages like http://docs.scala-lang.org/overviews/core/futures.html. Which execution context should it use?

Reynold Xin

Jul 24, 2013, 2:29:07 AM
to spark...@googlegroups.com
Ah yea, sorry I actually missed the future part.

You should be able to use the default implicit execution context. I haven't really tried the same thing myself, so you might have to do some digging yourself, but it should work.
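A minimal sketch of the Future approach with the default implicit execution context, using only the Scala standard library. The names `ParallelJobs`, `runJob` and `runAll` are hypothetical, and `runJob` is a local stand-in for a blocking action such as rdd.map(...).reduceByKey(...).collect(), so this shows the wrapping pattern rather than anything Spark-specific. Note that the global context sizes its thread pool from the number of available processors, so roughly that many blocking calls run at once unless they are marked with `blocking`.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelJobs {
  // Hypothetical stand-in for a blocking Spark action such as
  // rdd.map(...).reduceByKey(...).collect(); here it just sums a local Seq.
  def runJob(data: Seq[Int]): Int = {
    Thread.sleep(100) // simulate the time a job spends running
    data.sum
  }

  // Wrap each blocking call in a Future so the calls are handed to the
  // execution context up front instead of running one after another.
  def runAll(datasets: List[Seq[Int]]): List[Int] = {
    val futures: List[Future[Int]] = datasets.map(d => Future(runJob(d)))
    // Future.sequence turns List[Future[Int]] into Future[List[Int]],
    // keeping the results in input order.
    Await.result(Future.sequence(futures), 30.seconds)
  }

  def main(args: Array[String]): Unit =
    println(runAll(List(Seq(1, 2, 3), Seq(10, 20), Seq(100))))
}
```

The `Await.result` at the end is only there so the driver-side program has the results in hand before it continues; the individual futures can also be consumed one by one.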

Another way to do this is to use Scala's parallel collections. They are not as fine-grained as Futures, but might work for your purpose.

rdds.par.foreach(...)

This will submit the jobs in parallel as well, but it does so by chopping rdds into multiple groups and running each group in its own thread.


Jesvin Jose

Jul 24, 2013, 3:02:55 AM
to spark...@googlegroups.com
The par transformation seems much more convenient. Is it as fast as futures for this use case of submitting to the Spark scheduler and collecting results? There is just one driver program running at a time.

scala> Runtime.getRuntime.availableProcessors
res19: Int = 4

Reynold Xin

Jul 24, 2013, 2:38:17 PM
to spark...@googlegroups.com
The Future approach submits all jobs to Spark's scheduler at the same time and lets the scheduler handle the scheduling of all those jobs. The parallel collection approach is essentially similar to running them in a single-threaded program, but with N threads doing the submission (each thread submits a job, blocks until the job is done, then submits the next job).

In terms of trade-offs:

- Utilization-wise, Futures probably give higher cluster utilization in some cases. But I suspect you won't see a big difference in most cases.

- You get finer control over job progress using Futures. For example, you can work with partial results.

- If you have too many jobs (e.g. 10000), submitting all of them to Spark might overload the Spark scheduler. I have never tested running 10000 jobs concurrently.
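One possible middle ground between these trade-offs, sketched with the Scala standard library only: run the Futures on a fixed-size thread pool, so that at most `maxConcurrent` jobs are in flight at once, while still reacting to each result as it completes. The names `BoundedSubmission`, `runJob` and `runBounded` are hypothetical, and `runJob` is a local stand-in for a blocking Spark job; how Spark's scheduler behaves under such a cap is not something this sketch demonstrates.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BoundedSubmission {
  // Hypothetical stand-in for a blocking Spark job.
  def runJob(id: Int): Int = {
    Thread.sleep(20) // simulate the time a job spends running
    id * 2
  }

  // Runs all jobs with at most maxConcurrent running at once.
  // Returns the results in input order plus the number of completed jobs.
  def runBounded(jobIds: List[Int], maxConcurrent: Int): (List[Int], Int) = {
    val pool = Executors.newFixedThreadPool(maxConcurrent)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val completed = new AtomicInteger(0)
    try {
      val futures = jobIds.map { id =>
        // Handle each partial result as soon as its job finishes;
        // here we only count it, but it could be logged or aggregated.
        Future(runJob(id)).map { r => completed.incrementAndGet(); r }
      }
      val results = Await.result(Future.sequence(futures), 60.seconds)
      (results, completed.get)
    } finally pool.shutdown() // release the threads even if a job fails
  }
}
```

For example, `BoundedSubmission.runBounded(List(1, 2, 3, 4, 5), 2)` runs five jobs two at a time. Choosing `maxConcurrent` is a judgment call that depends on the cluster and the size of each job.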

Jesvin Jose

Jul 26, 2013, 1:33:55 AM
to spark...@googlegroups.com
Thanks for the complete answer. I will stick to collection.par for simplicity, and switch to futures if the cluster is underutilized because submission is a bottleneck.

che...@tingatech.com

Jul 26, 2013, 12:53:06 PM
to spark...@googlegroups.com
A follow-up question:

When I have a collection of objects, should I create one RDD with sparkCtx.parallelize(), or should I create a List of RDDs and execute them in parallel (via Collection.par or via a Future block)?

What are the runtime differences? When should I use one vs. the other?

thanks

Chester

Ian O'Connell

Jul 26, 2013, 1:40:04 PM
to spark...@googlegroups.com
In general, if you're operating on things as a group, with maps or otherwise, and they are like records of one type, then they should all be in one RDD. It's reasonably similar to a SQL table of objects.