Hi Matt,
I know Nithin also responded, but I want to address your initial
questions directly and then see if I can add anything to the other
response. My answers are inline below.
On 02/26/2015 02:45 AM, Matt wrote:
> Hello,
>
> (this is a repost, because my previous post was not very concise and clear)
>
> I'm writing to a partitioned Dataset formatted as Parquet on a local file system and I'd like to know how to make sure that the written Parquet files come as close to, but do not exceed, 128MB. To write the dataset, I'm pretty much following the example at https://github.com/kite-sdk/kite-examples/tree/master/dataset-staging.
> A notable difference is that I use Shard.shard(staged_data, 1) in the Crunch pipeline, to make sure that all Avro files from a single day in the staged dataset are combined. If I don't shard, I end up with a Parquet file for every small-sized, i.e. <<128MB, Avro file on a given day, and that is not what I want.
Kite provides a utility to help with this in CrunchDatasets. I'm not
entirely familiar with Shard, but it looks like it does something
similar: it rebalances the data with a reduce phase.
The reason I recommend our method, CrunchDatasets.partition, is that it
understands the dataset's partition strategy and will shuffle the data
so that each output partition is handled by a single reducer. That
minimizes the number of files and combines the input the way you want.
The drawback is that this doesn't cut the files off at 128MB...
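For reference, here's roughly what that looks like in a Crunch
pipeline. This is just a sketch, assuming the staging layout from the
example you linked; the dataset URIs and the GenericRecord type are
placeholders for your own:

  import org.apache.avro.generic.GenericRecord;
  import org.apache.crunch.PCollection;
  import org.apache.crunch.Pipeline;
  import org.apache.crunch.impl.mr.MRPipeline;
  import org.kitesdk.data.Dataset;
  import org.kitesdk.data.Datasets;
  import org.kitesdk.data.crunch.CrunchDatasets;

  public class CompactStagedData {
    public static void main(String[] args) {
      // placeholder URIs: point these at your staged and target datasets
      Dataset<GenericRecord> staged =
          Datasets.load("dataset:file:/tmp/data/logs_staging", GenericRecord.class);
      Dataset<GenericRecord> persistent =
          Datasets.load("dataset:file:/tmp/data/logs", GenericRecord.class);

      Pipeline pipeline = new MRPipeline(CompactStagedData.class);
      PCollection<GenericRecord> stagedData =
          pipeline.read(CrunchDatasets.asSource(staged));

      // instead of Shard.shard(stagedData, 1), repartition by the target
      // dataset's partition strategy so each output partition is written
      // by a single reducer
      PCollection<GenericRecord> repartitioned =
          CrunchDatasets.partition(stagedData, persistent);

      pipeline.write(repartitioned, CrunchDatasets.asTarget(persistent));
      pipeline.done();
    }
  }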
> I thought Kite SDK would create 128MB Parquet files in the target dataset when the dataset descriptor property dfs.blocksize is set, but that appears to not be the case as a single, large Parquet file is written. Furthermore, the parquet.block.size setting, which I believe is 50 or 100MB by default, does not seem to have any effect either. I am not sure if my assumptions are wrong, or if the shard operation somehow interferes with Kite's workings.
This isn't entirely correct. The DFS block size is an independent
property that determines how the file will be split into HDFS blocks.
The parquet.block.size setting determines how large Parquet will make
row groups, but a file can (and should) have multiple row groups, as
well as multiple HDFS blocks.
You want each row group to fit entirely within its containing HDFS
block, so that a reader never has to pull pieces of a row group from
more than one block. That means the HDFS block size should be a
whole-number multiple of the Parquet block size.
You can find a full write-up of the Parquet row group option here:
http://ingest.tips/2015/01/31/parquet-row-group-size/
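As a concrete sketch, here is one way to set both on the dataset
descriptor so the HDFS block size is a whole-number multiple of the
row group size. The schema location and the exact sizes are
placeholders, not recommendations:

  import java.io.IOException;

  import org.kitesdk.data.DatasetDescriptor;
  import org.kitesdk.data.Formats;

  public class ParquetDescriptorExample {
    public static DatasetDescriptor descriptor() throws IOException {
      // dfs.blocksize (256MB) is set to twice parquet.block.size (128MB),
      // so two row groups fit exactly in each HDFS block
      return new DatasetDescriptor.Builder()
          .schemaUri("resource:logs.avsc")  // placeholder schema location
          .format(Formats.PARQUET)
          .property("parquet.block.size", Integer.toString(128 * 1024 * 1024))
          .property("dfs.blocksize", Integer.toString(256 * 1024 * 1024))
          .build();
    }
  }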
> I would greatly appreciate it if somebody can explain to me how to make sure the Kite SDK dataset is written as desired.
If you want to cut files off at a single row group, then you need to
calculate when to close the current writer (releasing the file) and
start a new one. That's difficult right now, but we could add an
option to Kite's writers for policy-based writes, for example "close
the file at X bytes" or "close files that have been open more than Y
seconds".
I'm happy to work with you if you need this option, though I think you
probably want to write the large Parquet files. (The only case I know of
where you don't want to write the large files is if you are reading them
with Impala, but that bug is going to be fixed.)
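If you do need to roll files yourself in the meantime, something along
these lines could approximate it. This is only a rough sketch, not how
Kite works today: it assumes you write with a DatasetWriter directly
and can supply a per-record size estimate, since the writer doesn't
expose how many bytes it has written. The class and field names are
hypothetical:

  import org.kitesdk.data.Dataset;
  import org.kitesdk.data.DatasetWriter;

  // hypothetical "close file at roughly X bytes" policy, driven by a
  // caller-supplied estimate of the serialized size of each record
  public class RollingWriter<E> {
    private static final long TARGET_BYTES = 128L * 1024 * 1024;

    private final Dataset<E> dataset;
    private final long estimatedBytesPerRecord;  // assumption: caller estimates this
    private DatasetWriter<E> writer = null;
    private long estimatedBytes = 0;

    public RollingWriter(Dataset<E> dataset, long estimatedBytesPerRecord) {
      this.dataset = dataset;
      this.estimatedBytesPerRecord = estimatedBytesPerRecord;
    }

    public void write(E record) {
      if (writer == null) {
        writer = dataset.newWriter();
        estimatedBytes = 0;
      }
      writer.write(record);
      estimatedBytes += estimatedBytesPerRecord;
      if (estimatedBytes >= TARGET_BYTES) {
        writer.close();  // roll to a new file once the estimate crosses the target
        writer = null;
      }
    }

    public void close() {
      if (writer != null) {
        writer.close();
      }
    }
  }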
> p.s.: I have found that setting "mapreduce.input.fileinputformat.split.minsize" allows me to split large Avro files into near 128MB Parquet files iff I remove the shard operation, but this will not work because: (1) as previously mentioned, I am not dealing with large Avro files; and (2) even if I were, this approach will not create as few Parquet files as possible since split remnants are not combined.
This is an interesting approach, but your Parquet and Avro files are
unlikely to be the same size. Because of Parquet's columnar encoding
and compression, I would expect the Parquet files to be smaller than
the Avro files they were built from.
rb
--
Ryan Blue
Software Engineer
Cloudera, Inc.