"recover partitions" in spark


Stephen Haberman

Dec 12, 2012, 1:30:50 AM
to spark...@googlegroups.com
Hi,

We have some Hive code that I'd like to port to just Spark (not Shark) as, being a Scala shop, we like the idea of using the Scala DSL directly.

(If this seems like a horribly naive idea, like we're giving up amazing query optimizations in Shark that we could never hope to do by hand, please let me know.)

So, I have two questions, both about how to load data out of S3:

1) We store our files partitioned in S3 like bucket/path/year=XXXX/month=XX/day=XX, and then use Hive's "partitioned by" + "recover partitions" commands to load all of the log files.

Is there something similar in Spark's S3 integration (specifying S3-side partitions, not the regular RDD/HDFS partitions)?

Looking at Spark S3 docs, it says "you can also use a directory of files as input by simply giving the path to the directory"--so, if I just use:

    sc.textFile("s3n://bucket/path/year=2012")

Will I get all of the 2012 log files that match that prefix? Perhaps this is good enough, and I won't miss the lack of "recover partitions"?

2) Our log lines are formatted as key/value pairs, e.g. specified in Hive like:

row format delimited
  fields terminated by '\004'
  collection items terminated by '\001'
  map keys terminated by '\002'

It looks like instead of sc.textFile, I could use:

    sc.hadoopFile[..., Map[String, String], OurMapInputFormat]("s3n://...")

And assuming I implement the Hadoop InputFormat correctly, I'll be able to create a Map for each line.

Apologies for the newbie questions--I think I have a good idea of how this will work, but wanted to ask in case I'm missing anything.

Thanks!

- Stephen

Matei Zaharia

Dec 12, 2012, 1:49:35 AM
to spark...@googlegroups.com
Hi Stephen,

> We have some Hive code that I'd like to port to just Spark (not Shark) as, being a Scala shop, we like the idea of using the Scala DSL directly.

> (If this seems like a horribly naive idea, like we're giving up amazing query optimizations in Shark that we could never hope to do by hand, please let me know.)

It's not horrible at all, a lot of people have done this.

> 1) We store our files partitioned in S3 like bucket/path/year=XXXX/month=XX/day=XX, and then use Hive's "partitioned by" + "recover partitions" commands to load all of the log files.

> Is there something similar in Spark's S3 integration (specifying S3-side partitions, not the regular RDD/HDFS partitions)?

> Looking at Spark S3 docs, it says "you can also use a directory of files as input by simply giving the path to the directory"--so, if I just use:

>     sc.textFile("s3n://bucket/path/year=2012")

> Will I get all of the 2012 log files that match that prefix? Perhaps this is good enough, and I won't miss the lack of "recover partitions"?


Actually I'm not sure what recover partitions does -- does it just mean "parse the folder name to figure out the partition ID"? In that case you can do the folder method you mentioned, and you can also use comma-separated lists, such as:

    s3n://bucket/path/year=2012,s3n://bucket/path/year=2011

You might also be able to do wildcards, such as year=201*. This works with HDFS but I'm not positive that it works with s3n. (We use Hadoop's S3 access library so it will support whatever that supports). But even if wildcards don't work, maybe you can make a method that creates a comma-separated list like this.
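
For example, something along these lines would work (an untested sketch -- the helper name is made up, and it assumes comma-separated paths behave against s3n as described above):

    // Hypothetical helper: build a comma-separated list of year partitions
    // and hand the whole list to sc.textFile in one call.
    def yearPaths(base: String, years: Seq[Int]): String =
      years.map(y => base + "/year=" + y).mkString(",")

    val logs = sc.textFile(yearPaths("s3n://bucket/path", 2011 to 2012))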

> 2) Our log lines are formatted as key/value pairs, e.g. specified in Hive like:

> row format delimited
>   fields terminated by '\004'
>   collection items terminated by '\001'
>   map keys terminated by '\002'

> It looks like instead of sc.textFile, I could use:

>     sc.hadoopFile[..., Map[String, String], OurMapInputFormat]("s3n://...")

> And assuming I implement the Hadoop InputFormat correctly, I'll be able to create a Map for each line.

You can do that, but I believe the format you posted still has lines terminated by newlines (you can check -- just load it as a textFile and do file.first in Spark, for example). If that's so, you can split the strings on '\001', etc. Otherwise you would indeed have to write an InputFormat similar to TextInputFormat but for this stuff.
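
As a rough sketch of the split-it-yourself approach (untested, and assuming, per the table definition above, that map entries are separated by '\001' and keys from values by '\002' -- adjust if the lines also carry other '\004'-separated fields):

    val lines = sc.textFile("s3n://bucket/path/year=2012")

    // Turn each line into a Map[String, String]; malformed entries are dropped.
    val records = lines.map { line =>
      line.split('\u0001').flatMap { entry =>
        entry.split('\u0002') match {
          case Array(k, v) => Some(k -> v)
          case _           => None
        }
      }.toMap
    }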

Matei

Reynold Xin

Dec 12, 2012, 2:27:38 AM
to spark...@googlegroups.com
Regarding the first point - no, it is not crazy. Just be careful about memory usage. You might want to consider a similar columnar format for caching the tabular data; otherwise your in-memory footprint might be bigger than you expect.
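
A crude illustration of the idea (an untested sketch -- records is assumed to be an RDD[Map[String, String]] of parsed log lines, and the field names are made up):

    // Pack each column into its own array per partition, rather than caching
    // one Map object per record. Shark's in-memory columnar store is far more
    // sophisticated, but even this cuts per-record object overhead.
    val columnar = records.mapPartitions { iter =>
      val rows  = iter.toArray
      val years = rows.map(_("year").toInt)
      val users = rows.map(_.getOrElse("user_id", ""))
      Iterator((years, users))
    }.cache()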

--
Reynold Xin | rxin.org | @rxin

Stephen Boesch

Dec 12, 2012, 2:31:51 AM
to spark...@googlegroups.com
Hi Stephen,
  This post is interesting - which specific "Scala DSL" are you referring to in the OP?

stephenb



Stephen Haberman

Dec 12, 2012, 3:41:37 AM
to spark...@googlegroups.com

> It's not horrible at all, a lot of people have done this.

Good, I'm glad to hear that.

> Actually I'm not sure what recover partitions does -- does it just
> mean "parse the folder name to figure out the partition ID"?

Yeah--it looks like it's actually an Amazon extension:

http://docs.amazonwebservices.com/ElasticMapReduce/latest/DeveloperGuide/emr-hive-additional-features.html#emr-hive-recovering-partitions

Before this, our Hive scripts had to have a separate line for each
partition (alter table add partition X) they wanted to read.

The good thing was that, since our partition hints were year/month/day,
if our query included a date in its where clause (which was very
common), Hive could avoid pulling the other partitions from S3.

I don't think Spark works entirely the same way (?), because if I do
sc.textFile(s3n://bucket/path/), then a .filter { _.year >= 2012 },
Spark would have to pull in all of the data from S3 just to evaluate
"year >= 2012" (since it's unaware of the S3 partitioning scheme
encoded in the S3 path) and just end up throwing most of the data out.

AFAIK? I dunno, since Spark's RDDs know about partitions, can .filter
eval certain conditions without even loading the non-matching
partitions?

> s3n://bucket/path/year=2012,s3n://bucket/path/year=2011

Ah, cool, okay, I think this is what I want for now.

> If that's so, you can split the strings on '\001', etc.

Ha, oh right, that would be a lot easier. Thanks!

Thanks for your help, Matei, I appreciate it.

- Stephen


Patrick Wendell

Dec 12, 2012, 1:08:42 PM
to spark...@googlegroups.com
On Wed, Dec 12, 2012 at 12:41 AM, Stephen Haberman
<stephen....@gmail.com> wrote:
> I don't think Spark works entirely the same way (?), because if I do
> sc.textFile(s3n://bucket/path/), then a .filter { _.year >= 2012 },
> Spark would have to pull in all of the data from S3 just to evaluate
> "year >= 2012" (since it's unaware of the S3 partitioning scheme
> encoded in the S3 path) and just end up throwing most of the data out.

Hey Stephen,

Spark is aware of the block-based partitioning of HDFS files, but it
doesn't have information about the higher level date-partitioning you
are doing. That's why if you load the text file and then do a filter,
it can't "push down" the filter to determine what partition to read.

To exploit these partitions directly in Spark you'd need to add your own
partitioning logic, such as loading multiple, more specific sc.textFile
paths and then taking the union of them. E.g.:

val jan = sc.textFile("s3n://bucket/path/year=2012/month=01")
val feb = sc.textFile("s3n://bucket/path/year=2012/month=02")

val workingSet = jan.union(feb).cache() // or whatever
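
Or, to generalize that over a range of months (a hypothetical example along the same lines):

    // Build one RDD per month and fold them together with union.
    val months = (1 to 6).map { m =>
      sc.textFile("s3n://bucket/path/year=2012/month=%02d".format(m))
    }
    val workingSet = months.reduce(_ union _).cache()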

What Shark does is look at your Hive metastore (the metastore
contains the partitioning information) and do this under the covers
for you automatically, i.e. it understands that a filter based on a
date should exploit the partitions.

One option you have is to load the data using Shark, then process it
using custom Spark jobs. There is a function sqlToRDD that will let
you enter a SQL query and load the data set into an RDD. This would
give you the partitioning "for free" and then you can still do
whatever custom processing you want.

There is an example here:
http://shark.cs.berkeley.edu/

- Patrick

Stephen Haberman

Dec 13, 2012, 1:34:07 PM
to spark...@googlegroups.com

> val jan = sc.textFile("s3n://bucket/path/year=2012/month=01")
> val feb = sc.textFile("s3n://bucket/path/year=2012/month=02")
> val workingSet = jan.union(feb).cache() // or whatever

Okay, that makes sense.

> What Shark does is look at your Hive metastore (the metastore
> contains the partitioning information) and do this under the covers
> for you automatically, i.e. it understands that a filter based on a
> date should exploit the partitions.

Cool, I was wondering if using Spark directly was going to give up
features like this. Which is fine, it's a trade-off.

Handling date-level partitioning by hand is pretty easy, in that most
of our reports have a fairly static set of dates ("last month"/etc.)
that we could script together, without having to deal with, say,
dynamically entered user queries. So I think we'll be okay for now.
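
(For what it's worth, the "script it together" part is only a few lines -- an untested sketch, using the path layout from earlier in the thread:)

    import java.util.Calendar

    // Compute last month's partition prefix, e.g. s3n://bucket/path/year=2012/month=11.
    val cal = Calendar.getInstance()
    cal.add(Calendar.MONTH, -1)
    val lastMonthPath = "s3n://bucket/path/year=%d/month=%02d".format(
      cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1)

    val lastMonth = sc.textFile(lastMonthPath)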

> There is a function sqlToRDD that will let you enter a SQL query and load the
> data set into an RDD. This would give you the partitioning "for free" and then
> you can still do whatever custom processing you want.

Wow, that looks pretty nifty. I'll keep that in mind if our by-hand
partitioning gets too unwieldy.

Thanks!

- Stephen

Stephen Haberman

Dec 12, 2012, 11:14:12 AM
to spark...@googlegroups.com

> This post is interesting - which specific "Scala DSL" are you
> referring to in the OP?

Hi, sorry, I just meant the regular Spark Scala syntax:

http://www.spark-project.org/docs/0.6.0/scala-programming-guide.html

- Stephen
