foreachPartition

Jaka Jančar

Dec 27, 2012, 12:43:10 PM
to spark...@googlegroups.com
I would like to insert an RDD into a database cluster.

I can do rdd.collect(), but that requires copying everything to the driver node.

I can do rdd.foreach(), but that will not allow batching the INSERTs.

I guess I could do rdd.mapPartitions(...).count(), but it feels hacky.
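
For illustration, a rough sketch of that count()-based workaround (connect() and insert() are hypothetical helpers; count() is only there to force the job, and its result is discarded):

    rdd.mapPartitions { iter =>
      val db = connect()
      iter.foreach(row => insert(db, row))   // could batch here instead of row-by-row
      db.close()
      Iterator.empty
    }.count()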

Is there a rdd.foreachPartition() or something like that?

Best,
 Jaka

Matei Zaharia

Dec 27, 2012, 12:44:59 PM
to spark...@googlegroups.com
Hi Jaka,

There isn't, but it sounds like a nice thing to add. However, I guess one thing you can do right now is:

rdd.mapPartitions(iter => Iterator(iter)).foreach(iter => { /* do stuff */ })

Kind of hacky, but it should work!

Matei

Alex Boisvert

Dec 27, 2012, 12:52:10 PM
to spark...@googlegroups.com
FWIW, I've been playing with this approach in the past few days...  I ended up using "grouped" to batch inserts, e.g.,

val batchSize = 100
rdd mapPartitions (_ grouped batchSize) foreach { batch => ... }
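
For anyone unfamiliar with it, grouped on an iterator yields the elements in chunks of up to batchSize; a quick plain-Scala illustration:

    val it = (1 to 7).iterator
    it.grouped(3).toList   // -> three groups: (1, 2, 3), (4, 5, 6) and (7)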

alex

Jaka Jančar

Dec 27, 2012, 12:54:36 PM
to spark...@googlegroups.com
Clever, thanks!

Matei Zaharia

Dec 27, 2012, 12:58:43 PM
to spark...@googlegroups.com
Ah cool, this also works.

Matei

Jaka Jančar

Dec 27, 2012, 3:10:55 PM
to spark...@googlegroups.com
My extended version with controllable parallelism, in case it's useful to anyone:

    val numParallelInserts = 10
    val batchSize = 1000
    
    new CoalescedRDD(sessions, numParallelInserts) mapPartitionsWithSplit { (split, iter) => Iterator((split, iter)) } foreach { case (split, iter) =>
      val db = connect()
      
      val sql = "INSERT INTO sessions (id, ts) VALUES (?, ?)"
      val stmt = db.prepareStatement(sql)
      
      iter.grouped(batchSize).zipWithIndex foreach { case (batch, batchIndex) =>
        batch foreach { session =>
          stmt.setString(1, session.id)
          stmt.setString(2, TimestampFormat.print(session.ts))
          stmt.addBatch()
        }
        stmt.executeBatch()
        db.commit();
        logInfo("Split " + (split+1) + "/" + numParallelInserts + " inserted batch " + batchIndex + " with " + batch.size + " elements")
      }
      
      db.close();
    }
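
For reference, foreachPartition was later added to the RDD API; a minimal sketch of the same batched insert using it, assuming the connect(), TimestampFormat, numParallelInserts and batchSize definitions above:

    sessions.coalesce(numParallelInserts).foreachPartition { iter =>
      val db = connect()
      val stmt = db.prepareStatement("INSERT INTO sessions (id, ts) VALUES (?, ?)")
      iter.grouped(batchSize).foreach { batch =>
        batch.foreach { session =>
          stmt.setString(1, session.id)
          stmt.setString(2, TimestampFormat.print(session.ts))
          stmt.addBatch()
        }
        stmt.executeBatch()
        db.commit()
      }
      db.close()
    }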

Arindam Paul

Feb 19, 2013, 2:10:03 AM
to spark...@googlegroups.com
This is an excellent discussion.
Just wanted to add one point: do you think adding a view in the statement below would improve performance? "When using a view the collection is only traversed when required so there is no performance loss."

iter.grouped(batchSize).view.zipWithIndex

Thanks,
Arindam.

Reynold Xin

Feb 19, 2013, 2:33:03 AM
to spark...@googlegroups.com
Using a view should improve performance slightly, depending on how big the collection is.

Another way is to get rid of the closure and use a while loop, but that might be premature optimization.
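
For example, a rough sketch of that while-loop variant, reusing the db/stmt/batchSize setup from Jaka's snippet above (batchIndex stands in for the zipWithIndex counter):

    val batches = iter.grouped(batchSize)
    var batchIndex = 0
    while (batches.hasNext) {
      val batch = batches.next()
      var i = 0
      while (i < batch.size) {
        val session = batch(i)
        stmt.setString(1, session.id)
        stmt.setString(2, TimestampFormat.print(session.ts))
        stmt.addBatch()
        i += 1
      }
      stmt.executeBatch()
      db.commit()
      batchIndex += 1
    }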


Arindam Paul

Feb 19, 2013, 4:02:27 AM
to spark...@googlegroups.com
Yes, the size of the collection makes sense.

One more question along the same lines: does each transformation exploit the underlying multi-core architecture of the nodes by running the operations in parallel (I am talking about the .par operation in Scala), or is that left to the user?

Reynold Xin

Feb 19, 2013, 7:49:15 PM
to spark...@googlegroups.com
If you run on a cluster, they do exploit parallelism to fill all the cores you allocate.

If you run in local mode with a single thread, all execution is serial.

Note that in mapPartitionsWithSplit, the work on a partition's iterator is always single-threaded unless you explicitly add .par. However, it does not really make sense to add .par: in a data-parallel execution framework, it is usually better to explicitly parallelize the outer loop (i.e., across partitions, since partitions are shared-nothing) rather than within each partition.
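
A rough sketch of the two options (parse and process are hypothetical functions; the split count 64 is illustrative):

    val rdd = sc.textFile("hdfs:///path/to/input", 64).map(parse)
    // Preferred: more splits -> more tasks, so the scheduler fills the allocated cores.
    val out1 = rdd.map(process)
    // Possible, but usually unnecessary: parallelize within each partition with .par
    // (this also materializes the whole partition in memory).
    val out2 = rdd.mapPartitions(iter => iter.toVector.par.map(process).seq.iterator)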

Arindam Paul

Feb 19, 2013, 9:21:57 PM
to spark...@googlegroups.com
Perfect.
After going through your tuning document, I realized that having more tasks per core makes more sense than using .par.
Since it is important to keep the input set of each task small, increasing the number of tasks seems ideal.
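
For concreteness, the number of tasks in a stage equals the number of partitions of the RDD, so the task count can be raised where the RDD is created; an illustrative line (loadSessions() and the count 200 are made up):

    val sessions = sc.parallelize(loadSessions(), numSlices = 200)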