Spark accumulators - accessing value directly

2,790 views
Skip to first unread message

Austin Gibbons

unread,
Jul 3, 2012, 11:50:23 AM7/3/12
to spark...@googlegroups.com
Hi!

The Spark documentation says this about accumulators:
"Tasks running on the cluster can then add to it using the += operator. However, they cannot read its value. Only the driver program can read the accumulator's value, using its valuemethod."

I want an accumulator that is a hashmap of hashmaps (string -> (string -> int)), effectively counting (string, string) occurrences in a dataset.
So what I would like is to call accumulator.update((string, string)). However, addInPlace only allows me to take in the same datatype as my accumulator.
Should I :

(1) encapsulate two types into one object (namely, a hashmap and a pair of strings) and only initialize the appropriate structure
(2) access the hashmap.value within the accumulation step
(3) Something else

===== anonymized code ======
<code>
class AccumulatorType (x : String = null, y : String = null) {
  //encapsulates the accumulator
  val hashMap = new HashMap[String, HashMap[String, Int]]
  //encapsulates an element in the accumulator
  val my_x = x
  val my_y = y
}

//elsewhere...
def addInPlace(a : AccumulatorType, b : AccumulatorType) {
  if (a.my_x == null) {
    //combine two hashmaps
  } else {
     //add element to hashmap
  }
}
</code>

Matei Zaharia

unread,
Jul 3, 2012, 4:09:52 PM7/3/12
to spark...@googlegroups.com
Hi Austin,

Good question. The way I would do this is to wrap each element you add in a small hashmap, and have a function for merging two hashmaps. However, maybe we should change the accumulator API in the future to allow the type added and the type of the result to be different.

By the way, instead of a HashMap[String, HashMap[String, Int]], why not use a HashMap[(String, String), Int]? I think that would be more idiomatic and slightly more efficient. The pair class in Scala (which (String, String) represents) already supports hashCode and equality based on the elements in it, so it works fine as a hashtable key.

Also, just so you know, one alternative to accumulators is to use reduce(), or in this case, reduceByKey(). You can do a reduceByKey followed by collect to get the result back at the master as an array of (key, value) pairs. Again your keys would probably be pairs of strings.

Matei

Austin Gibbons

unread,
Jul 3, 2012, 5:23:09 PM7/3/12
to spark...@googlegroups.com
Thanks for the response.

My main concern with creating a tiny hashmap is it will be slower.

I have a hash of hashes because thats the top of my data (ie, its like baseball->(whitesox->3, giants->6), basketball->(bulls->15, kings->12)). 

I will look into reduce, thank you for the suggestion.

Can you comment accessing accumulator.value? Is it a good thing, a bad thing, or neither good nor bad?

Thanks again,
Austin

Matei Zaharia

unread,
Jul 3, 2012, 5:48:18 PM7/3/12
to spark...@googlegroups.com
You shouldn't try to access .value in tasks on a cluster, because it won't be synchronized across nodes. I believe it throws an error if you try to do this today. We could potentially have it mean "give me just the value created so far in this thread", but I think this would be confusing for other programmers who read the code.

If you want to avoid creating lots of small objects, the best way is probably to use reduceByKey, which maintains a single hash table for you. Alternatively, you can use mapPartitions, which lets you see all of the elements in one partition of the RDD as an Iterator instead of having your function called once per element. Inside your mapPartitions call you could build your own hash table in whichever way you prefer. However, reduceByKey is meant to do this in the case where you just aggregate values by key.

Matei

Imran Rashid

unread,
Jul 11, 2012, 2:29:01 PM7/11/12
to spark...@googlegroups.com
Hi,

I have more follow up questions related to this use of accumulators.

If I understand correctly, reduceByKey will work well when there is one type of thing you're accumulating.  However, if you want to groups things in different ways, then you need to do multiple passes over the data, right?  Eg., imagine that your processing documents from wikipedia, and you want to simultaenously get:
* distribution of words per document
* sets of a special type of extracted entity (eg., names of locations that score high on some custom function)
* total count of citations

I guess you can get around this by making special keys that also indicate the type of aggregation you're doing, but thats a real pain.  But one nice thing about spark accumulators is that in one pass, you can simultaenously accumulate all of them.  However, using addInPlace is a bit ugly right now, because you've got add a whole accumulator object, not just the value you're adding (also causing object churn & gc).

We could potentially use mapPartitions in this case, and just update the accumulator at the end of the partition, but that's not nearly as clean.

What do you think about adding either / both:
* localValue, which you can call from a task
* addInPlace(X,Y) to a subclass of AccumulatorParam

I can try adding both of those and submit a patch, if you think they'd be useful.


On a slightly related topic -- I was wondering about how you'd use Spark for stochastic gradient descent.  (If I understand correctly, the SparkLR example is batch gradient descent.)  I think to do, you'd also need to use mapPartitions with the current API, right?  It was mentioned earlier on the mailing list, but it wasn't obvious to me what the implementation would look like. 
https://groups.google.com/forum/?fromgroups#!searchin/spark-users/stochastic/spark-users/5tmLY4JvjDI/eejlS9m80tUJ
these additions to accumulators might simplify that as well.

thanks,
Imran

Matei Zaharia

unread,
Jul 12, 2012, 1:38:00 PM7/12/12
to spark...@googlegroups.com
Hi Imran,

Just saw your pull request on Accumulable. I like the idea of extending the interface to minimize object churn. I'll get back to you with some comments on the code later, but I think it's the best solution for this use case.

> If I understand correctly, reduceByKey will work well when there is one type of thing you're accumulating. However, if you want to groups things in different ways, then you need to do multiple passes over the data, right? Eg., imagine that your processing documents from wikipedia, and you want to simultaenously get:
> * distribution of words per document
> * sets of a special type of extracted entity (eg., names of locations that score high on some custom function)
> * total count of citations

Yup, this is kind of annoying right now. In the longer term I want to add some way to batch multiple actions and automatically launch a job with all of them. Something like:

rdd.reduceLater(reduceFunction1) // returns Future[ResultType1]
rdd.reduceLater(reduceFunction2) // returns Future[ResultType2]
SparkContext.force() // executes all the "later" operations as part of a single optimized job

However, in the shorter term, I think accumulators are the easiest way to do this, especially with the fixes you have to reduce GC.

> On a slightly related topic -- I was wondering about how you'd use Spark for stochastic gradient descent. (If I understand correctly, the SparkLR example is batch gradient descent.) I think to do, you'd also need to use mapPartitions with the current API, right? It was mentioned earlier on the mailing list, but it wasn't obvious to me what the implementation would look like.
> https://groups.google.com/forum/?fromgroups#!searchin/spark-users/stochastic/spark-users/5tmLY4JvjDI/eejlS9m80tUJ
> these additions to accumulators might simplify that as well.

Yes, the applications we've had that used it used mapPartitions. It would look roughly like this:

val points = // RDD of Point objects

val gradient = points.mapPartitions { partitionPoints =>
val localGradient = new Vector(…)
for (point <- partitionPoints) {
localGradient += // math for p
}
Iterator(localGradient) // return an iterator that gives just one number
}.reduce(_ + _) // add up the results of all the partitions

That is, you take an RDD[Point] into an RDD[Vector] with one vector per partition (the local gradient), and then you call reduce on that to do the final sum.

Unfortunately I don't have any sample code that does this right now, but if you try it out, it would be cool to add it into the examples folder.

Matei

Imran Rashid

unread,
Jul 12, 2012, 2:22:50 PM7/12/12
to spark...@googlegroups.com
HI Matei, thanks for the feedback!


On Thursday, July 12, 2012 10:38:00 AM UTC-7, Matei Zaharia wrote:
Hi Imran,

Just saw your pull request on Accumulable. I like the idea of extending the interface to minimize object churn. I'll get back to you with some comments on the code later, but I think it's the best solution for this use case.

Thanks for the comments, I'll work on making those changes.
 
> If I understand correctly, reduceByKey will work well when there is one type of thing you're accumulating.  However, if you want to groups things in different ways, then you need to do multiple passes over the data, right?  Eg., imagine that your processing documents from wikipedia, and you want to simultaenously get:
> * distribution of words per document
> * sets of a special type of extracted entity (eg., names of locations that score high on some custom function)
> * total count of citations

Yup, this is kind of annoying right now. In the longer term I want to add some way to batch multiple actions and automatically launch a job with all of them. Something like:

rdd.reduceLater(reduceFunction1) // returns Future[ResultType1]
rdd.reduceLater(reduceFunction2) // returns Future[ResultType2]
SparkContext.force() // executes all the "later" operations as part of a single optimized job

However, in the shorter term, I think accumulators are the easiest way to do this, especially with the fixes you have to reduce GC.

that sounds pretty cool, that would be a nice addition.
 
> On a slightly related topic -- I was wondering about how you'd use Spark for stochastic gradient descent.  (If I understand correctly, the SparkLR example is batch gradient descent.)  I think to do, you'd also need to use mapPartitions with the current API, right?  It was mentioned earlier on the mailing list, but it wasn't obvious to me what the implementation would look like.  
> https://groups.google.com/forum/?fromgroups#!searchin/spark-users/stochastic/spark-users/5tmLY4JvjDI/eejlS9m80tUJ
> these additions to accumulators might simplify that as well.

Yes, the applications we've had that used it used mapPartitions. It would look roughly like this:
...
Unfortunately I don't have any sample code that does this right now, but if you try it out, it would be cool to add it into the examples folder.


The test case I added also has an example of doing SGD, but using accumulators.  I find that version more intuitive -- and again, it has the advantage of being able to accumulate multiple different types of things simultaneously.  Eg., I can imagine cases where you simultaneously want to compute a fit, get metrics on fit quality, and maybe even save outlier data points.

However, doing this requires accessing accumulator.value.  The only thing the api currently prohibits is assignment to value.  To me, this is a pretty compelling reason to allow accessing the value.  Other than perhaps being confusing to the developer (b/c its only the local value), is there a reason to not allow access to the local value?  (eg., perhaps another implementation would not have any access to the local value at all)  If the only concern is the clarity, I could add accumulator.localValue, and change accumulator.value to throw an exception when accessed by a task.

thanks,
imran

Matei Zaharia

unread,
Jul 13, 2012, 1:24:17 PM7/13/12
to spark...@googlegroups.com
> The test case I added also has an example of doing SGD, but using accumulators. I find that version more intuitive -- and again, it has the advantage of being able to accumulate multiple different types of things simultaneously. Eg., I can imagine cases where you simultaneously want to compute a fit, get metrics on fit quality, and maybe even save outlier data points.
>
> However, doing this requires accessing accumulator.value. The only thing the api currently prohibits is assignment to value. To me, this is a pretty compelling reason to allow accessing the value. Other than perhaps being confusing to the developer (b/c its only the local value), is there a reason to not allow access to the local value? (eg., perhaps another implementation would not have any access to the local value at all) If the only concern is the clarity, I could add accumulator.localValue, and change accumulator.value to throw an exception when accessed by a task.

Ah, I see what you mean. Instead of doing this, in our examples so far we've had a separate variable for the actual parameter you're computing and the gradient. So something like this:

val param = new Vector(…)
for (i <- 0 until iterations) {
val gradient = spark.accumulator(…)
data.foreach(p => gradient += …)
param += gradient
}

I see where you're coming from though, in wanting to use the accumulator to represent both the current parameter and the changes you make. How would you want it to work though? If you do += do you see the updated value, or do you want an "oldValue" call that gives you the original value when the iteration started?

Matei

Imran Rashid

unread,
Jul 15, 2012, 1:00:57 AM7/15/12
to spark...@googlegroups.com

I thought that += should immediatley modify the variable, so that for the next data point (in the same node & thread), you see the updated value. You'd only see the local updates, but that is often OK.

SGD is a bit tricky, though, you can't have your accumulator just be the parameter value.  You don't want to sum up the parameter across all partitions -- you want to sum up the *updates* of the parameter from all the partitions, and add that to the parameter value from the start of the iteration.  So to do that, my accumulator is just the updates, and for each point, I have to add the update to the initial value of parameter.  That's why I added the plusDot method to Vector -- for every point, I need to take the sum of the initial value of the parameter plus the current updated value.

Instead of having to add the two vectors together for every point, it would be nice if you could just store the initial value and the sum, and then before the accumulator is serialized, there is a some function called which can then take the difference between the sum & the initial value, (eg, prepForSerialize, which would be a no-op by default).  It would be easy to add, but maybe an unnecessary optimization ...

I dunno if I have right the mental model, though, for accumulators.  I think of them as having these semantics:

* They have an initial value which is broadcast to all workers
* Workers have access to the *local* value of the accumulator, and can do anything they want with it: read it, modify it, etc.
* The local value of each worker is "saved" by the system.  The local values will always get saved when a worker has finished, but it may get saved at any time in the middle of a worker's execution as well, at the system's discretion.  If the value gets saved, the accumulator is then replaced by the initial value again.
* All of the saved values from all workers get merged together.  Merging must be a commutative & associative operation -- the exact order is up to the system.

This is pretty much the current behavior of accumulators.  But maybe I am abusing them -- should there really be another shared variable type with these properties?  Or is it a bad idea to have shared variables like this?  I feel like this is the exact set of properties I want for a lot of machine learning tasks.

thanks,
imran
Reply all
Reply to author
Forward
0 new messages