Moving Window Calculation (via Scalding)

Nathan Stults

Mar 13, 2012, 1:48:52 PM
to cascading-user
Hello,

I'm pretty new to Cascading, Hadoop and Scala, and I'm having trouble
imagining how to accomplish a sliding window calculation such as a
simple moving average. My data is not huge.

What I want to do is take a time series like this, where the rows have
already been aggregated down to one entry per day per category:

Date Category Val
1/1/1 A 3
1/1/2 B 2
1/1/2 A 4
...

And accumulate a moving average using the previous 30 days of data

Date Category Val Avg
1/1/1 A 3 x.x
1/1/2 A 4 x.x
1/1/2 B 2 x.x
...

I think foldLeft of the GroupBuilder will help me, but I'm not sure
how to structure the code. What seems like it should work is to have a
MovingAverage class as the accumulator that manages the sliding window
and produces an average for each successive row, but

1. I'm not sure I can use a regular object as the accumulator instead
of a primitive?
2. Will the full set of data be presented to the foldLeft operation
(for a given category) at one time?
3. I'm struggling with how to structure the call

I'm sure #1 and #2 have easy answers, but my Scala is still pretty
weak, so if anyone can afford the time I'd love to see a simple sample
call to foldLeft so that I can understand the syntax. I'm guessing
something like this:

mypipe.groupBy('category) { g =>
  g.foldLeft( ('date, 'val) -> 'avg )( new MovingAverage ) {
    args: ((String, Int), MovingAverage) =>
      args._2.addPoint( args._1._1, args._1._2 )
  }
}

Any help would be greatly appreciated.

Thanks,

Nathan




Oscar Boykin

Mar 13, 2012, 2:18:57 PM
to cascadi...@googlegroups.com
Glad to see the question.

0) I'm not a huge fan of windowed moving averages for exactly the
memory nature they present. In many cases, a memoryless averaging
(exponential moving average) is as useful, but much easier to
implement.

1) Yes you can use any object as an accumulator. If Kryo (the library
used to serialize) has problems, you might have to supply a custom
serializer. This is possible but very rarely needed. If your object
doesn't do IO, it should be fine.

2) The foldLeft (or the scanLeft, which is what I think you really
want) is not handed all the data at once. It is given one sample at a
time. If you want to accumulate in memory, your code must do that. If
you know the result is going to be small enough to fit in RAM, you can
use .toList in GroupBuilder, and then use a map function afterwards to
operate on the whole list, but this approach does not scale to
billions or trillions of data points.

3) Let me give it a try below. Note, scanLeft is just like a foldLeft
except you keep all the intermediate outputs. I think this is what
you want to do. After, you can clean up with a map/filter/flatMap,
which since Chris is such a bad-ass coder, will be glued automatically
into the scanLeft and no disk will be harmed (unless absolutely
needed).

// This class is immutable. The bodies are left as ??? in this outline.
class WindowMemory(memorySize : Int) {
  // The statistic (e.g. the average) over the current window
  def currentValue : Double = ???
  // Returns a NEW window memory for the next position
  def update(date : String, sample : Double) : WindowMemory = ???
}

myPipe.groupBy('category) { grouping =>
  grouping
    .sortBy('date)
    // scanLeft takes an initial state (30 matches the 30-day window in the
    // question), and the function receives (state, input)
    .scanLeft(('date, 'val) -> 'state)(new WindowMemory(30)) {
      (oldState : WindowMemory, input : (String, Double)) =>
        val (dateStr, thisVal) = input
        oldState.update(dateStr, thisVal)
    }
}
// Now map to get all the values out:
.map('state -> 'movingStat) { (state : WindowMemory) => state.currentValue }
// We don't need the state object anymore:
.discard('state)
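
For anyone less familiar with the distinction drawn in point 3, here is a minimal sketch on plain Scala collections rather than on a Scalding pipe. The object name FoldVsScan and the sample values are made up for illustration; only the standard library's foldLeft and scanLeft are used.

```scala
object FoldVsScan {
  val samples = List(3.0, 4.0, 5.0)

  // foldLeft keeps only the final accumulator value.
  val total: Double = samples.foldLeft(0.0) { (acc, x) => acc + x }

  // scanLeft keeps every intermediate accumulator, starting with the initial value.
  val runningTotals: List[Double] = samples.scanLeft(0.0) { (acc, x) => acc + x }
}
```

In both calls the accumulator is the first argument of the function and the new element is the second.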

Does this help?

> --
> You received this message because you are subscribed to the Google Groups "cascading-user" group.
> To post to this group, send email to cascadi...@googlegroups.com.
> To unsubscribe from this group, send email to cascading-use...@googlegroups.com.
> For more options, visit this group at http://groups.google.com/group/cascading-user?hl=en.
>

--
Oscar Boykin :: @posco :: https://twitter.com/intent/user?screen_name=posco

Nathan Stults

Mar 13, 2012, 3:13:51 PM
to cascadi...@googlegroups.com
Wow, that is tremendously helpful, thank you. Since you mention it, though, I'd just as soon use an EWMA as a windowed version, regardless of memory or scalability; I'm just not sure what the implementation would look like. I guess if the scaling factor is calculated directly from the date, you could just map and do a normal average?

def calculateWeight(date : Long)  = ...

mydata.map(('date, 'value) -> 'weighted_val){ args:(Long, Int) =>
    val (date, value) = args
    value * calculateWeight(date)
}.groupBy('category){ g => g.average('weighted_val) }

Is it that simple?

Thanks again for your help, I really appreciate it.

Nathan 



Nathan Stults

Mar 13, 2012, 3:16:01 PM
to cascadi...@googlegroups.com
Doh - that obviously just outputs an average by group, not by day - so there is still an important piece I am missing :) 

Ted Dunning

Mar 13, 2012, 4:14:16 PM
to cascadi...@googlegroups.com
On Tue, Mar 13, 2012 at 1:18 PM, Oscar Boykin <os...@twitter.com> wrote:
> Glad to see the question.
>
> 0) I'm not a huge fan of windowed moving averages for exactly the
> memory nature they present. In many cases, a memoryless averaging
> (exponential moving average) is as useful, but much easier to
> implement.

You can also do exact online mean computations or approximate online medians.  These can be for specific time periods as well.

Mahout has an implementation of this that should be quite suitable.
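
To make the idea of an exact online mean concrete, here is a minimal sketch in plain Scala. It is not Mahout's implementation; the names RunningMean and RunningMeanDemo are hypothetical, and the only state carried between samples is a count and the current mean.

```scala
// Immutable running mean: a count and the current mean are the only state,
// so no window of past samples has to be stored.
final case class RunningMean(count: Long = 0L, mean: Double = 0.0) {
  // Incremental update: newMean = mean + (x - mean) / n
  def add(x: Double): RunningMean = {
    val n = count + 1
    RunningMean(n, mean + (x - mean) / n)
  }
}

object RunningMeanDemo {
  // Running mean after each sample, dropping the empty initial state.
  def means(xs: Seq[Double]): Seq[Double] =
    xs.scanLeft(RunningMean()) { (state, x) => state.add(x) }.drop(1).map(_.mean)
}
```

Because the state is a small immutable value, the same shape should fit a scanLeft accumulator of the kind discussed earlier in the thread.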
 

Oscar Boykin

Mar 13, 2012, 4:17:24 PM
to cascadi...@googlegroups.com
You'd still need a scanLeft (buffer) to do an exponential moving average. Here's how:

We want to weight data at time t in the past with weight exp(- a t). Assume we have data that looks like: (category, time, value). Let's assume time is a double (seconds since epoch or something):

// a is the decay rate in exp(-a t); 0.1 is only a placeholder, pick a value to suit your time units
val a = 0.1

pipe.groupBy('category) { grouping =>
  grouping.sortBy('time)
    .scanLeft(('time, 'value) -> ('time, 'expMA))((0.0, 0.0)) {
      // the accumulated state comes first, then the input
      (oldState : (Double, Double), timeAndValue : (Double, Double)) =>
        val (oldTime, oldAve) = oldState
        val (thisTime, thisValue) = timeAndValue
        val delta = thisTime - oldTime
        val weight = scala.math.exp(-a * delta)
        val newAve = weight * oldAve + thisValue
        (thisTime, newAve)
    }
}
// Now you have (category, time, expMA) as the data.
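
The recurrence can be checked outside of Hadoop with plain Scala collections. This is only a sketch: the object DecayedAverage and its step and run helpers are hypothetical names, and the (0.0, 0.0) seed mirrors the snippet above.

```scala
object DecayedAverage {
  // One step of the recurrence: decay the old value by exp(-a * delta),
  // then add the new sample. State and sample are both (time, value) pairs.
  def step(a: Double)(state: (Double, Double), sample: (Double, Double)): (Double, Double) = {
    val (oldTime, oldAve) = state
    val (thisTime, thisValue) = sample
    val weight = math.exp(-a * (thisTime - oldTime))
    (thisTime, weight * oldAve + thisValue)
  }

  // Run over (time, value) pairs in time order; drop the (0.0, 0.0) seed.
  def run(a: Double, samples: Seq[(Double, Double)]): Seq[(Double, Double)] =
    samples.sortBy(_._1).scanLeft((0.0, 0.0)) { (s, x) => step(a)(s, x) }.drop(1)
}
```

With a = 0 the weight is always 1, so the output reduces to a running sum. As written, the recurrence yields an exponentially decayed sum; dividing by a count decayed in the same way would give a normalized average.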


Ted Dunning

Mar 13, 2012, 4:23:49 PM
to cascadi...@googlegroups.com

Nathan Stults

Mar 13, 2012, 5:21:20 PM
to cascadi...@googlegroups.com
Thanks guys - incredibly helpful.

Nathan Stults

Mar 16, 2012, 2:12:48 AM
to cascadi...@googlegroups.com
This worked well for me. If others want to try it out, note the args to the scanLeft function are reversed - they are (output, input) => ...

Thanks again.

Dan Glaser

May 10, 2013, 11:21:34 AM
to cascadi...@googlegroups.com
I've posted a follow up question where I believe that I really need to do a moving window rather than an EMA: https://groups.google.com/forum/?fromgroups=#!topic/cascading-user/18LZVSNrRvo

I think that I mostly understand what you propose, but some of the details seem to be eluding me, such as how to implement the object that acts as an accumulator.  Immutable class wrapping immutable queue? How is update implemented and why is currentValue a single value rather than the whole window returned?  Is this because you are doing the calculation in the WindowMemory?

Thanks for any help that anyone can offer!
-Dan
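
One possible reading of the accumulator, as a sketch only: an immutable class wrapping an immutable Queue, where update returns a new instance and currentValue is a single number because the average is computed inside the class. The thread does not give the actual implementation. The window here is count-based, which assumes one row per day per category as in the original question, so the date argument goes unused.

```scala
import scala.collection.immutable.Queue

// Immutable window over the last memorySize samples.
final class WindowMemory(memorySize: Int, window: Queue[Double] = Queue.empty) {
  // Mean of the samples currently held (0.0 for an empty window).
  def currentValue: Double =
    if (window.isEmpty) 0.0 else window.sum / window.size

  // Returns a NEW WindowMemory holding at most memorySize samples.
  // The date is unused in this count-based sketch.
  def update(date: String, sample: Double): WindowMemory =
    new WindowMemory(memorySize, window.enqueue(sample).takeRight(memorySize))
}
```

For example, new WindowMemory(2).update("d1", 3.0).update("d2", 4.0).update("d3", 8.0) holds only 4.0 and 8.0, so its currentValue is their mean. Returning the whole window instead of one value would also work, at the cost of moving the averaging into a later map step.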