I'm reading a partitioned table of scores, with one partition per day:
base
base/20120628/
=> id1 score1a
id2 score2a
...
base/20120629
=> id1 score1b
id2 score2b
...
I want to assemble a time series of scores under each id, like this:
DList[(ID, List[(String, Double)])] = fromList((id1, List(("20120628", score1a), ("20120629", score1b), ...)), (id2, List(...)))
I write the following code, which goes through many steps:
/**
 * Scoobi job that assembles a per-id time series of scores from daily partitions.
 *
 * Arguments: the base directory, the two date strings handed to `ymdRange`,
 * and the output path. For every day returned by `ymdRange` the job reads
 * `base/day` as ctrlA-delimited text, tags each (id, score) row with its day,
 * unions all days, groups by id, and writes one record per id whose value is the
 * space-separated list of "day,score" pairs sorted by day.
 */
object FetchTS extends ScoobiApp {
  def run(): Unit = {
    // Fail with a usage message rather than an ArrayIndexOutOfBoundsException.
    require(args.length >= 4, "usage: FetchTS <base> <fromDate> <toDate> <out>")
    val (base, fromDate, toDate, out) = (args(0), args(1), args(2), args(3))
    println("generating timeseries in %s from %s to %s, saving to %s".format(base, fromDate, toDate, out))

    val days = ymdRange(fromDate, toDate)
    // `reduce` on an empty list throws an opaque "empty.reduceLeft"; report the
    // real cause before any input is touched.
    require(days.nonEmpty, "no dates to read between %s and %s".format(fromDate, toDate))

    // One DList per daily partition; each row is keyed by id and carries its day.
    // NOTE(review): ID is presumably an alias for Long, given the ALong extractor -- confirm.
    val tlist: List[DList[(ID, (String, Double))]] = days.map { d =>
      val path = "%s/%s".format(base, d)
      fromDelimitedTextFile(path, sep = ctrlA) {
        case ALong(kid) :: ADouble(score) :: _ => (kid, (d, score))
      }
    }

    // Union of all daily partitions.
    val ts: DList[(ID, (String, Double))] = tlist.reduce(_ ++ _)

    val pts = ts.groupByKey.map { case (kid, dscores) =>
      // The yyyymmdd day strings sort chronologically as plain strings.
      val ds = dscores.toList.sortBy(_._1)
      val pds = ds.map { case (x, y) => x + "," + y }.mkString(" ")
      (kid, pds)
    }

    persist(toDelimitedTextFile(pts, out))
  }
}
FYI, ymdRange is
import org.joda.time._
import org.joda.time.format._
/**
 * Iterator over consecutive calendar days from `fromDate` to `toDate`,
 * both ends inclusive.
 *
 * Yields nothing when `fromDate` is after `toDate`.
 *
 * @param fromDate first day to produce
 * @param toDate   last day to produce
 */
class Dates(fromDate: LocalDate, toDate: LocalDate) extends Iterator[LocalDate] {
  // The next day to hand out. Starting at fromDate (and advancing only after
  // returning it) ensures the first day of the range is not skipped.
  var d = fromDate

  /** True while the next day to hand out is not past `toDate`. */
  override def hasNext: Boolean = !d.isAfter(toDate)

  /**
   * Returns the current day and advances by one day.
   *
   * @throws NoSuchElementException if the range is exhausted
   */
  override def next(): LocalDate = {
    if (!hasNext) throw new NoSuchElementException("next on exhausted Dates iterator")
    val current = d
    d = d.plusDays(1)
    current
  }
}
/** Shared formatter and parser for compact day strings such as "20120628". */
object Format {
  /** Formatter built from the pattern "YYYYMMdd". */
  val ymd: DateTimeFormatter = DateTimeFormat.forPattern("YYYYMMdd")

  /**
   * Parses a day string with the `ymd` formatter.
   *
   * @param s day string in the `ymd` pattern
   * @return the parsed local date
   */
  def ymdDate(s: String): LocalDate = {
    ymd.parseLocalDate(s)
  }
}
import Format._
/** Helpers that expand a pair of day strings into the list of days between them. */
object Dates {
  /**
   * Parses both bounds with `ymdDate` and collects every day produced by a
   * `Dates` iterator over them.
   *
   * @param start  first bound, as a day string accepted by `ymdDate`
   * @param finish second bound, as a day string accepted by `ymdDate`
   * @return the days yielded by the iterator, in iteration order
   */
  def dateRange(start: String, finish: String): List[LocalDate] =
    new Dates(ymdDate(start), ymdDate(finish)).toList

  /**
   * Same days as `dateRange`, rendered back to strings with the `ymd` formatter.
   *
   * @param start  first bound, as a day string accepted by `ymdDate`
   * @param finish second bound, as a day string accepted by `ymdDate`
   * @return one formatted day string per day in the range
   */
  def ymdRange(start: String, finish: String): List[String] =
    dateRange(start, finish).map(day => ymd.print(day))
}
Is there a shorter and faster way to achieve this? Hive takes 3 hours on the sorting step, so clearly this is not trivial, but surely there is a simpler way.
A+