Moving window in Scalding


Dan Glaser

May 9, 2013, 4:24:28 PM
to cascadi...@googlegroups.com
My problem: I have tens of millions of rows of time series data that I need to run moving window calculations on (though the groupBy separates it into thousands of groups of about 10,000 rows each). In some cases EMA-like replacements work, but in other cases I need to do things like run multiple-variable regressions, and for those I really do need windows.

I've read the previous post on moving windows in Scalding: https://groups.google.com/forum/#!msg/cascading-user/CTYOjlHs6xE/A5TNIMFM_ikJ and found it somewhat confusing. I understand that I can use scanLeft to walk through the data and can use an object to accumulate the window. However, it seems like scanLeft will save a copy of this object for each value, so if my moving window needs, for example, 100 previous values, the naive implementation of an immutable object wrapping a queue would require 100x the space of the original dataset.

What I think I want is something like a reference to a mutable moving window object parameterized by the data type, so that I'm not storing a copy of it at each stage and it's generic enough to reuse for different types of data. So, for example (pseudocode):
.scanLeft( ('values, 'otherValues, 'moreValues) -> ('calculationResult, 'movingWindowObjectReference) ) ((Double.NaN, movingWindowObjectConstructor[(Double, Double, Double)]))
{ (previousCalcs : (Double, movingWindowObjectReferenceType[(Double, Double, Double)]), currentValues : (Double, Double, Double) ) =>
  {
    val movingWindowObjectReference = previousCalcs._2
    movingWindowObjectReference.enqueueValues(currentValues)
    val calculationResult = Math.doSomeCalculations(movingWindowObjectReference.getWindowedValues)
    (calculationResult, movingWindowObjectReference)
  }
}

Can anyone offer suggestions for how to get this done space-efficiently? I'm also very open to alternative solutions, or to comments that I'm completely off base with my proposal. It seems that the Scala concept of "sliding" is roughly what I want here, but that doesn't seem to be present in Scalding.

Thanks in advance for any help!
-Dan

Oscar Boykin

May 10, 2013, 2:21:15 PM
to cascadi...@googlegroups.com
There is an immutable queue in Scala that does not make a full copy when you add or remove (there are a few approaches here, using Vector or List for instance), and I doubt this would be a bottleneck anyway, so I wouldn't give up the benefit of immutability unless profiling really pointed to it.

BUT you can use a mutable queue with a scanLeft if you want. There is no problem with doing so as long as you know multiple threads won't touch it.

I'd start with the immutable queue and do what you described.
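
For readers following along, here is a minimal sketch of the immutable-queue approach on a plain Scala Iterator, not on a Scalding pipe. The names `ImmutableWindowSketch`, `step`, and `movingMean` are made up for illustration, and the mean stands in for whatever windowed calculation is actually needed. Each `enqueue`/`dequeue` returns a new queue that shares structure with the old one, so no full copy is made per row.

```scala
import scala.collection.immutable.Queue

object ImmutableWindowSketch {
  // Hypothetical step function: append the new value, then drop the
  // oldest value once the window holds more than windowSize elements.
  def step(windowSize: Int, window: Queue[Double], x: Double): Queue[Double] = {
    val grown = window.enqueue(x)
    if (grown.size > windowSize) grown.dequeue._2 else grown
  }

  // Emits one windowed mean per input value. Early rows use a partial window.
  def movingMean(values: Iterator[Double], windowSize: Int): Iterator[Double] =
    values
      .scanLeft(Queue.empty[Double])((w, x) => step(windowSize, w, x))
      .drop(1)                  // scanLeft emits the initial empty queue first
      .map(w => w.sum / w.size) // only the result is passed downstream
}
```

Because the Iterator is lazy, only the current queue is live at any time; earlier queues become garbage as soon as their result has been computed.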


--
Oscar Boykin :: @posco :: http://twitter.com/posco

Oscar Boykin

May 10, 2013, 2:24:49 PM
to cascadi...@googlegroups.com
Oh...

I get your concern, sorry.

You don't want to materialize that queue for each value.

You can follow up the scanLeft with a discard or project to delete the queue, and that will happen before it reaches disk.

Alternatively, you can use mapStream and supply a function from Iterator to Iterator (one that implements this scanLeft plus a flatMap to discard the data you don't want).
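
As a rough illustration of the Iterator-to-Iterator idea, the sketch below builds such a function in plain Scala. `WindowStream`, `windowed`, and `calc` are hypothetical names, and the wiring into mapStream is left out; the assumption is only that the per-group function has the shape `Iterator[T] => Iterator[R]`. It uses a single mutable queue per call, which is safe here because one iterator is consumed sequentially by one thread.

```scala
import scala.collection.mutable

object WindowStream {
  // Builds an Iterator => Iterator function that keeps the last
  // windowSize rows and emits only calc's result for each row.
  def windowed[T, R](windowSize: Int)(calc: Seq[T] => R): Iterator[T] => Iterator[R] =
    (rows: Iterator[T]) => {
      val window = mutable.Queue.empty[T] // one buffer per group, reused for every row
      rows.map { row =>
        window += row
        if (window.size > windowSize) window.dequeue()
        calc(window.toList) // pass an immutable snapshot; the buffer itself is never emitted
      }
    }
}
```

For example, `WindowStream.windowed[Double, Double](3)(_.sum)` yields a running three-row sum. The window never leaves the function, so nothing but the calculation result is written out.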

Weremook

May 12, 2013, 5:05:50 PM
to cascadi...@googlegroups.com
Remember that you will be doing the scanLeft on each group, so as each group finishes processing you can free its resources. Also, check whether there is an online version of the algorithm you need.
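
To make "online version" concrete, here is a small sketch for one simple case, a rolling mean. The object name `OnlineRollingMean` is made up, and this is only an example of the pattern, not a recipe for regressions. Instead of re-summing the window for every row, it keeps a running sum and adjusts it by the value entering and the value leaving the window.

```scala
import scala.collection.mutable

object OnlineRollingMean {
  // Rolling mean with constant work per row: add the incoming value to a
  // running sum and subtract the value that falls out of the window.
  def apply(values: Iterator[Double], windowSize: Int): Iterator[Double] = {
    val window = mutable.Queue.empty[Double]
    var sum = 0.0
    values.map { x =>
      window += x
      sum += x
      if (window.size > windowSize) sum -= window.dequeue()
      sum / window.size
    }
  }
}
```

With long windows or values of very different magnitudes, the running sum can accumulate floating-point error, so recomputing it from the window periodically may be worth considering.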

Yueyu Fu

May 9, 2016, 5:59:11 PM
to cascading-user
I ran into a similar problem and understood most of the solution. One question that remains for me is: how do you control the sliding window size? Thanks.
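
One possible answer, sketched in plain Scala: make the window size an ordinary parameter of whatever function walks the rows. The example below assumes the rows for a group are available as an Iterator and uses the standard library's `sliding`, which the original post mentions; `SlidingSketch` and `fullWindows` are hypothetical names.

```scala
object SlidingSketch {
  // windowSize is a plain argument. sliding(windowSize) yields consecutive
  // windows, and withPartial(false) drops a window shorter than windowSize.
  def fullWindows[T, R](rows: Iterator[T], windowSize: Int)(calc: Seq[T] => R): Iterator[R] =
    rows.sliding(windowSize).withPartial(false).map(calc)
}
```

Unlike a scanLeft-style window, this emits nothing for the first `windowSize - 1` rows, so the output has fewer rows than the input.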