reading a partitioned table

27 views
Skip to first unread message

Alexy Khrabrov

unread,
Jul 9, 2012, 11:44:27 PM7/9/12
to scoobi...@googlegroups.com
I'm reading a partitioned table of scores, one partition per day

base
base/20120628/ 
=> id1 score1a
    id2 score 2a
    ...
base/20120629
=> id1 score1b
    id2 score2b
    ...

I want to assemble a time series of scores under each id, like this:

DList[(ID, (String, Souble))] = fromList((id1, List(("20120628",score1a), ("20120629", score1b)...), (id2, List(...)))

I write the following code, which goes through many steps:


object FetchTS extends ScoobiApp {
  def run() {

    val (base, fromDate, toDate, out) = (args(0), args(1), args(2), args(3))

    println("generating timeseries in %s from %s to %s, saving to %s".format(base, fromDate, toDate, out))

    val tlist: List[DList[(ID, (String, Double))]] = ymdRange(fromDate, toDate).map {
      case d => val path = "%s/%s".format(base, d)
      fromDelimitedTextFile(path, sep = ctrlA) {
        case ALong(kid) :: ADouble(score) :: _ => (kid, (d, score))
      }
    }

    val ts: DList[(ID, (String, Double))] = tlist.reduce(_++_)

    val pts = ts.groupByKey.map { case (kid, dscores) =>
      val ds = dscores.toList.sortBy(_._1)
      val pds = ds.map {case (x,y) => x + "," + y}.mkString(" ")
      (kid, pds)
    }

    persist(toDelimitedTextFile(pts, out))
  }
}


FYI, ymdRange is

import org.joda.time._
import org.joda.time.format._

class Dates(fromDate: LocalDate, toDate: LocalDate) extends Iterator[LocalDate] {
  var d = fromDate
  override def hasNext = d.isBefore(toDate)
  override def next = { d = d.plusDays(1); d}
}

object Format {
  val ymd = DateTimeFormat.forPattern("YYYYMMdd")
  def ymdDate(s: String) = ymd.parseLocalDate(s)
}
import Format._

object Dates {
  def dateRange(start: String, finish: String): List[LocalDate] = {
    val fromDate = ymdDate(start)
    val toDate   = ymdDate(finish)
    val d = new Dates(fromDate, toDate)
    d.toList
  }

  def ymdRange(start: String, finish: String): List[String] = {
    dateRange(start, finish) map ymd.print
  }
}

-- is there a shorter and faster way to achieve this?  Hive takes 3 hours on sorting so clearly this is not trivial, but surely there is a simpler way...

A+

Eric Springer

unread,
Jul 10, 2012, 12:46:34 AM7/10/12
to scoobi...@googlegroups.com
On Tue, Jul 10, 2012 at 1:44 PM, Alexy Khrabrov <al...@scalable.pro> wrote:
>
> -- is there a shorter and faster way to achieve this? Hive takes 3 hours on
> sorting so clearly this is not trivial, but surely there is a simpler way...
>
> A+


G'day,

How long is the scoobi job taking?

You might want to try using a grouping:

http://nicta.github.com/scoobi/api/master/index.html#com.nicta.scoobi.Grouping

Which the .groupByKey will implicitly pick up (we really need to make
this explicit, but that's on the TODO). This will allow you to make
sure that the values arrive to each reducer in proper order, and you
don't need to do that .toList.sort


More info here:
http://nicta.github.com/scoobi/guide-SNAPSHOT/guide/Grouping.html#Secondary+sort

I'm not sure how much faster this will make things, but it would be
interesting to hear the numbers.


Regards,
Eric

Alexy Khrabrov

unread,
Jul 10, 2012, 4:24:18 PM7/10/12
to scoobi...@googlegroups.com
Alas, the job never finished because of a jobconf limit exceeded.  But I replaced it with a rolling daily, where the new score is simply concatenated onto the combined string of scores.

A+

On Monday, July 9, 2012 9:46:34 PM UTC-7, Eric Springer wrote:
Reply all
Reply to author
Forward
0 new messages