0) I'm not a huge fan of windowed moving averages, precisely because
of the memory they require. In many cases, a memoryless average (an
exponential moving average) is just as useful and much easier to
implement.
1) Yes, you can use any object as an accumulator. If Kryo (the library
used for serialization) has problems with it, you might have to supply
a custom serializer. This is possible but very rarely needed: if your
object doesn't do IO, it should be fine.
2) The foldLeft (or the scanLeft, which is what I think you really
want) is not handed all the data at once; it receives one sample at a
time. If you want to accumulate in memory, your code must do that. If
you know the result is going to be small enough to fit in RAM, you can
use .toList in the GroupBuilder and then apply a map function afterwards
to operate on the whole list, but this approach does not scale to
billions or trillions of data points.
3) Let me give it a try below. Note that scanLeft is just like foldLeft
except that you keep all the intermediate outputs; I think this is what
you want. Afterwards you can clean up with a map/filter/flatMap, which,
since Chris is such a bad-ass coder, will be glued automatically into
the scanLeft, and no disk will be harmed (unless absolutely needed).
// This class is immutable:
abstract class WindowMemory(val memorySize: Int) extends Serializable {
  def currentValue: Double
  // Returns a NEW window memory for the next position
  def update(date: String, sample: Double): WindowMemory
}

// Hypothetical starting state: an empty window of the size you need
val emptyWindow: WindowMemory = ???

myPipe.groupBy('category) { grouping =>
  grouping
    .sortBy('date)
    // scanLeft takes the initial state, then a function (oldState, input) => newState
    .scanLeft(('date, 'val) -> 'state)(emptyWindow) {
      (oldState: WindowMemory, input: (String, Double)) =>
        val (dateStr, thisVal) = input
        oldState.update(dateStr, thisVal)
    }
}
// Now map to get all the values out:
.map('state -> 'movingStat) { (state: WindowMemory) => state.currentValue }
// We don't need the state object anymore:
.discard('state)
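The WindowMemory above is only an interface. A minimal concrete version might look like the following sketch, which assumes the statistic you want is the plain mean of the last memorySize samples. SimpleWindowMemory, its samples field and lastDate are hypothetical names, not part of Scalding. It is a case class, so it is immutable and Serializable, and it remembers the last date because in the pipeline above 'date goes into the scan and only 'state comes out. To keep the sketch self-contained it does not extend WindowMemory; in a real job you would declare it as a subclass.

```scala
// Hypothetical concrete window: the simple mean of the last `memorySize` samples.
// Immutable: update returns a new instance and never mutates this one.
final case class SimpleWindowMemory(
    memorySize: Int,
    samples: Vector[Double] = Vector.empty,
    lastDate: Option[String] = None
) {
  require(memorySize > 0, "memorySize must be positive")

  // Mean of the samples currently in the window (NaN while the window is empty).
  def currentValue: Double =
    if (samples.isEmpty) Double.NaN else samples.sum / samples.size

  // Returns a NEW window memory for the next position, evicting the oldest sample when full.
  def update(date: String, sample: Double): SimpleWindowMemory =
    copy(samples = (samples :+ sample).takeRight(memorySize), lastDate = Some(date))
}
```

For example, with memorySize = 2, feeding 1.0, 2.0 and 6.0 leaves the window holding 2.0 and 6.0, so currentValue is 4.0.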
Does this help?
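For points 2) and 3), the same ideas can be tried with ordinary Scala collections. This is only an analogy for what happens within one group on the reducer, not Scalding code, and the object and method names are made up for illustration. foldLeft returns just the final accumulator; scanLeft returns every intermediate one (Scala's own version also includes the initial value as the first element); scanLeft over an Iterator consumes one sample at a time; and something like an exact median needs the whole group in memory, which is the .toList case:

```scala
object ScanVsFoldSketch {
  // foldLeft keeps only the final accumulator.
  def total(samples: Seq[Double]): Double =
    samples.foldLeft(0.0)((acc, x) => acc + x)

  // scanLeft keeps every intermediate accumulator, starting with the initial value.
  def runningTotals(samples: Seq[Double]): Seq[Double] =
    samples.scanLeft(0.0)((acc, x) => acc + x)

  // Streaming: on an Iterator, scanLeft pulls one sample at a time and keeps only
  // the running state (here, the largest value seen so far).
  def runningMax(samples: Iterator[Double]): Iterator[Double] =
    samples.scanLeft(Double.NegativeInfinity)((best, x) => math.max(best, x)).drop(1)

  // Materializing: the whole group is loaded first (compare .toList followed by a map).
  // An exact median needs all the values at once, so memory grows with the group size.
  def median(samples: Iterator[Double]): Double = {
    val sorted = samples.toVector.sorted
    require(sorted.nonEmpty, "median of an empty group")
    val n = sorted.size
    if (n % 2 == 1) sorted(n / 2) else (sorted(n / 2 - 1) + sorted(n / 2)) / 2.0
  }
}
```

Since the initial value shows up in the scan output here, it is worth checking the GroupBuilder scaladoc for your Scalding version to see whether its scanLeft emits the initial state too; if it does, a filter after the scan is a natural place to drop it.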
> --
> You received this message because you are subscribed to the Google Groups "cascading-user" group.
> To post to this group, send email to cascadi...@googlegroups.com.
> To unsubscribe from this group, send email to cascading-use...@googlegroups.com.
> For more options, visit this group at http://groups.google.com/group/cascading-user?hl=en.
>
--
Oscar Boykin :: @posco :: https://twitter.com/intent/user?screen_name=posco
Glad to see the question.
To expand on point 0: we want to weight data that is t seconds in the past by exp(-a t). Assume we have data that looks like (category, time, value), and assume time is a Double (seconds since the epoch or something similar):
// Decay rate per unit of time; this value is only an example (a one-hour time scale)
val a = 1.0 / 3600.0

pipe.groupBy('category) { grouping =>
  grouping.sortBy('time)
    // scanLeft takes the initial state, then a function (oldState, input) => newState
    .scanLeft(('time, 'value) -> ('time, 'expMA))((0.0, 0.0)) {
      (oldState: (Double, Double), timeAndValue: (Double, Double)) =>
        val (oldTime, oldAve) = oldState
        val (thisTime, thisValue) = timeAndValue
        val delta = thisTime - oldTime
        val weight = scala.math.exp(-a * delta)
        val newAve = weight * oldAve + thisValue
        (thisTime, newAve)
    }
}
//Now you have (category, time, expMA) as the data.
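One detail worth spelling out: the recurrence newAve = exp(-a * delta) * oldAve + thisValue unrolls to S_n = sum over i of exp(-a (t_n - t_i)) x_i, which is an exponentially weighted sum rather than an average. If you need an average, one option is to decay a running weight total the same way, W_n = exp(-a (t_n - t_{n-1})) W_{n-1} + 1, and report S_n / W_n. Below is a minimal plain-Scala sketch of that state, assuming the samples arrive sorted by time. DecayedAverage and its members are hypothetical names; something like it could serve as the scanLeft state in a pipeline like the one above.

```scala
// Hypothetical state for a time-decayed average: last time seen, decayed sum, decayed weight.
final case class DecayedAverage(time: Double, sum: Double, weight: Double) {
  // The exponentially weighted average so far (NaN before any sample has been seen).
  def average: Double = if (weight == 0.0) Double.NaN else sum / weight

  // Decay both the sum and the weight by exp(-a * delta), then add the new sample.
  def update(a: Double, thisTime: Double, thisValue: Double): DecayedAverage = {
    val decay = math.exp(-a * (thisTime - time))
    DecayedAverage(thisTime, decay * sum + thisValue, decay * weight + 1.0)
  }
}

object DecayedAverage {
  // Starting from all zeros is safe: whatever the first delta is, the decayed
  // sum and weight are still zero, so the first average is the first value.
  val empty: DecayedAverage = DecayedAverage(0.0, 0.0, 0.0)

  // Runs over time-sorted (time, value) samples, keeping every intermediate state.
  def scan(a: Double, samples: Seq[(Double, Double)]): Seq[DecayedAverage] =
    samples.scanLeft(empty)((state, sample) => state.update(a, sample._1, sample._2)).tail
}
```

With a = 0 nothing decays and this reduces to the ordinary running mean; with a = ln 2 and samples 10.0 at t = 0 and 0.0 at t = 1, the older sample gets half the weight of the newer one, giving an average of 10/3.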
Thanks again.