spark for distributing side-effects?


Alex Boisvert

Dec 21, 2012, 11:59:51 AM
to spark...@googlegroups.com
I'm wondering if Spark is suitable for distributing work that's primarily meant to be side-effecting.  My simplified use-case would be to spin up, say N instances, and have them all load data into a database (in my case, DynamoDB).  In pseudo-code,

val input = sc.textFile(...) ++ ...   // union of the input files
val acc = sc.accumulator(...)
input foreach { item =>
  acc += insertIntoDB(item)           // side effect; its result feeds the accumulator
}

(BTW, are there significant differences between using foreach() with accumulators versus map/flatMap() to collect stats and potential errors about side-effecting operations?  I'm wondering if there's more to it than coding style...  though I guess map/flatMap() are assumed to be safe/non-side-effecting in case the RDD needs to be re-played).

What I mean by "suitable" is whether it's possible to easily control the level of parallelism on each worker node. For example, depending on the instance type (m1.small), I'd like to use, say, 20 threads per worker node. I've read the documentation about setting the level of parallelism, but it's unclear whether I can control it for `foreach`, which I assume runs within the map phase. Does Spark pick the parallelism based on data size for `foreach`? How can I override this?

thanks!
alex

rxin

Dec 21, 2012, 7:29:44 PM
to spark...@googlegroups.com
Technically, you want your external operations to be idempotent. 

But if you don't care about idempotency (e.g. you assume operations won't fail and you don't need to launch speculative tasks), you can certainly do it the way you proposed.

For the example code you gave, if the input files are large, Spark by default will use the maximum degree of parallelism.

Alex Boisvert

Dec 21, 2012, 7:43:51 PM
to spark...@googlegroups.com
On Fri, Dec 21, 2012 at 4:29 PM, rxin <rx...@cs.berkeley.edu> wrote:
> Technically, you want your external operations to be idempotent.

Yes, they are.
 
> But if you don't care about idempotency (e.g. you assume operations won't fail and you don't need to launch speculative tasks), you can certainly do it the way you proposed.

I'll probably catch exceptions, put failure information in the accumulator (or a flatMap'ed result), and retry them later.
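One way to record failures for a later retry might look like the sketch below. It assumes a hypothetical `SafeInsert.attempt` wrapper and `InsertFailure` record (neither is a Spark API, and neither comes from this thread), and it takes the insert function as a parameter so that it runs without Spark or a database.

```scala
import scala.util.{Failure, Success, Try}

object SafeInsert {
  // Hypothetical record describing one failed item (an assumption for this sketch).
  final case class InsertFailure[A](item: A, error: String)

  // Runs `insert` on `item`. Returns None on success, or Some(failure) if
  // `insert` threw a non-fatal exception.
  def attempt[A](item: A)(insert: A => Unit): Option[InsertFailure[A]] =
    Try(insert(item)) match {
      case Success(_) => None
      case Failure(e) => Some(InsertFailure(item, String.valueOf(e.getMessage)))
    }
}
```

With an RDD, this could be used along the lines of `input.flatMap(item => SafeInsert.attempt(item)(i => insertIntoDB(i)).toList)`, where `insertIntoDB` is the placeholder from the first message. The resulting RDD would then hold only the failed items, which can be retried later.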

> For the example code you gave, if the input files are large, Spark by default will use the maximum degree of parallelism.

Yeah, so that's really the sticking point... my operations involve blocking calls with potentially significant network latency, and I want to make sure the degree of parallelism is high enough regardless of the input size. Ideally, I'd like to either control the number of threads / parallelism or provide hints about blocking operations, similar in spirit to JDK 7's ForkJoinPool.ManagedBlocker.

alex

Reynold Xin

Dec 21, 2012, 7:45:42 PM
to spark...@googlegroups.com
The other option is you can use mapPartitions, and then create threads yourself.

e.g.

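sc.textFile("...").mapPartitions { iter =>
  // do crazy things here with the iterator, e.g. hand items to your own threads
  iter  // mapPartitions must return an Iterator; here the input is passed through
}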
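As a rough sketch of "create threads yourself", the body passed to mapPartitions could delegate to a helper like the one below. `BlockingPartition.parallelMap` is a hypothetical name chosen for illustration, not a Spark API. It only uses java.util.concurrent, so it can be tried on a plain Iterator without a cluster.

```scala
import java.util.concurrent.{Callable, Executors}

object BlockingPartition {
  // Hypothetical helper, not a Spark API: applies `f` to every element of one
  // partition on a fixed pool of `threads` threads and returns the results in
  // input order.
  def parallelMap[A, B](iter: Iterator[A], threads: Int)(f: A => B): Iterator[B] = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      // toList forces every task to be submitted before we start waiting.
      // Note that it keeps one Future per element in memory.
      val futures = iter.map { item =>
        pool.submit(new Callable[B] { def call(): B = f(item) })
      }.toList
      // get() blocks until a task finishes; if `f` threw, get() rethrows the
      // failure wrapped in java.util.concurrent.ExecutionException.
      futures.map(_.get()).iterator
    } finally {
      pool.shutdown()
    }
  }
}
```

Inside a job this might be called as `rdd.mapPartitions(iter => BlockingPartition.parallelMap(iter, 20)(insertIntoDB))`, with 20 threads per partition as in the original question. Because mapPartitions is lazy, an action such as `count()` or `collect()` is still needed to trigger the work.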

--
Reynold Xin | rxin.org | @rxin

Alex Boisvert

Dec 21, 2012, 7:56:02 PM
to spark...@googlegroups.com
Oh, interesting.   I'll experiment with that.