Question: how to ensure that files in a Kite dataset partition do not exceed dfs.blocksize?

Matt

Feb 26, 2015, 5:45:50 AM
to cdk...@cloudera.org
Hello,

(this is a repost, because my previous post was not very concise and clear)

I'm writing to a partitioned Dataset formatted as Parquet on a local file system, and I'd like to know how to make sure that the written Parquet files come as close as possible to, but do not exceed, 128MB. To write the dataset, I'm largely following the example at https://github.com/kite-sdk/kite-examples/tree/master/dataset-staging. A notable difference is that I call Shard.shard(staged_data, 1) in the Crunch pipeline to make sure that all Avro files from a single day in the staged dataset are combined. If I don't shard, I end up with a Parquet file for every small (i.e. <<128MB) Avro file on a given day, which is not what I want.
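For reference, a simplified sketch of what my pipeline does (the dataset URIs and class name here are placeholders, not my actual code):

import org.apache.avro.generic.GenericRecord;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Shard;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.crunch.CrunchDatasets;

public class CompactJob {
  public static void main(String[] args) throws Exception {
    // Placeholder URIs; mine point at datasets on the local file system.
    Dataset<GenericRecord> staged = Datasets.load("dataset:file:/data/staged_events");
    Dataset<GenericRecord> persistent = Datasets.load("dataset:file:/data/persistent_events");

    Pipeline pipeline = new MRPipeline(CompactJob.class);
    PCollection<GenericRecord> stagedData = pipeline.read(CrunchDatasets.asSource(staged));

    // Force all records through a single reducer so the small Avro files are combined.
    PCollection<GenericRecord> combined = Shard.shard(stagedData, 1);

    pipeline.write(combined, CrunchDatasets.asTarget(persistent));
    pipeline.done();
  }
}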

I thought the Kite SDK would create 128MB Parquet files in the target dataset when the dataset descriptor property dfs.blocksize is set, but that does not appear to be the case, as a single, large Parquet file is written instead. Furthermore, the parquet.block.size setting, which I believe defaults to 50 or 100MB, does not seem to have any effect either. I am not sure whether my assumptions are wrong, or whether the shard operation somehow interferes with Kite's workings.
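This is roughly how I create the target dataset and set those properties (again simplified; the schema URI and the "timestamp" partition field are placeholders, and the sizes are in bytes):

import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Formats;
import org.kitesdk.data.PartitionStrategy;

public class CreateTarget {
  public static void main(String[] args) throws Exception {
    // Day-based partitioning, as in the dataset-staging example.
    PartitionStrategy byDay = new PartitionStrategy.Builder()
        .year("timestamp").month("timestamp").day("timestamp")
        .build();

    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
        .schemaUri("resource:event.avsc")   // placeholder for my real schema
        .format(Formats.PARQUET)
        .partitionStrategy(byDay)
        .property("dfs.blocksize", String.valueOf(128L * 1024 * 1024))
        .property("parquet.block.size", String.valueOf(128L * 1024 * 1024))
        .build();

    Datasets.create("dataset:file:/data/persistent_events", descriptor);
  }
}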

I would greatly appreciate it if somebody could explain to me how to make sure the Kite SDK dataset is written as desired.

p.s.: I have found that setting "mapreduce.input.fileinputformat.split.minsize" allows me to split large Avro files into Parquet files of close to 128MB, but only if I remove the shard operation. This will not work for me because: (1) as previously mentioned, I am not dealing with large Avro files; and (2) even if I were, this approach would not create as few Parquet files as possible, since split remnants are not combined.

Nithin Asokan

Feb 26, 2015, 10:57:58 AM
to cdk...@cloudera.org
A notable difference is that I call Shard.shard(staged_data, 1) in the Crunch pipeline to make sure that all Avro files from a single day in the staged dataset are combined.

I believe Shard#shard(PCollection, int) will introduce a GBK (reduce), and since the number of partitions you specify is 1, all data will flow through a single reducer. This will create a single file in the filesystem. Removing the Shard may enable the planner to use more reducers. There is a config (-D crunch.bytes.per.reduce.task) that controls the amount of data that passes through each reduce task. If you specify a value of 134217728 for that config, you will end up with files that are about 128MB in size. However, this does not help with aggregating each day's data into a separate file; all the data in your dataset will be split into 128MB files irrespective of the day.


I thought the Kite SDK would create 128MB Parquet files in the target dataset when the dataset descriptor property dfs.blocksize is set, but that does not appear to be the case, as a single, large Parquet file is written instead.

The single large file is written because the number of partitions in your shard is 1. If you remove the Shard operation, it becomes a map-only job where "mapreduce.input.fileinputformat.split.minsize" or "mapred.max.split.size" controls the amount of data read by each map task. That's probably why you see files of about 128MB.
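For example, if you set up the pipeline's Configuration in code, something along these lines (untested sketch; the class name is a placeholder and the values are in bytes):

import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.hadoop.conf.Configuration;

public class PipelineConfig {
  static Pipeline createPipeline() {
    Configuration conf = new Configuration();

    // With a reduce phase (e.g. Shard/GBK): aim for ~128MB of data per reduce task.
    conf.setLong("crunch.bytes.per.reduce.task", 128L * 1024 * 1024);

    // Map-only case (no Shard): controls how much input each map task reads.
    conf.setLong("mapreduce.input.fileinputformat.split.minsize", 128L * 1024 * 1024);

    return new MRPipeline(PipelineConfig.class, conf);
  }
}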

Not sure if it helps, but that's what I know.

Matt

Feb 26, 2015, 1:05:26 PM
to cdk...@cloudera.org


On Thursday, February 26, 2015 at 4:57:58 PM UTC+1, Nithin Asokan wrote:

Thank you for your response, Nithin.

I believe Shard#shard(PCollection, int) will introduce a GBK (reduce), and since the number of partitions you specify is 1, all data will flow through a single reducer. This will create a single file in the filesystem.

Not entirely: not one single file, but a single file per day partition is created. So the Kite dataset target at the end of the Crunch pipeline does take care of the daily partitioning correctly, but the dataset descriptor property dfs.blocksize does not lead to 128MB splits, as I'd expect and like it to. Perhaps I should have been more specific about the day partitioning when I said I followed the dataset staging example.
 
Removing the Shard may enable the planner to use more reducers. There is a config (-D crunch.bytes.per.reduce.task) that controls the amount of data that passes through each reduce task. If you specify a value of 134217728 for that config, you will end up with files that are about 128MB in size. However, this does not help with aggregating each day's data into a separate file; all the data in your dataset will be split into 128MB files irrespective of the day.

Setting crunch.bytes.per.reduce.task, with the shard operation removed, leaves me with as many Parquet files in the "warehouse" dataset as there are Avro files in the staged dataset, I guess because the Avro files are <<128MB. If I test this setting without shard and with large test Avro files, it doesn't seem to have a 1:1 effect on the size of the resulting Parquet files.
 
I thought the Kite SDK would create 128MB Parquet files in the target dataset when the dataset descriptor property dfs.blocksize is set, but that does not appear to be the case, as a single, large Parquet file is written instead.

The single large file is written because the number of partitions in your shard is 1. If you remove the Shard operation, it becomes a map-only job where "mapreduce.input.fileinputformat.split.minsize" or "mapred.max.split.size" controls the amount of data read by each map task. That's probably why you see files of about 128MB.

As explained above, I meant that a single, large Parquet file per partition is written in the warehouse Kite dataset. What I guess I could do is shard and write daily aggregates into an intermediate dataset, and then use this intermediate dataset, without shard and in combination with mapreduce.input.fileinputformat.split.minsize, to create 128MB Parquet files. That does not feel right, though.
 

Ryan Blue

Feb 26, 2015, 1:30:26 PM
to Matt, cdk...@cloudera.org
Hi Matt,

I know Nithin also responded, but I want to address your initial
questions directly and then see if I can add anything to the other
response. My answers are inline below.

On 02/26/2015 02:45 AM, Matt wrote:
> Hello,
>
> (this is a repost, because my previous post was not very concise and clear)
>
> I'm writing to a partitioned Dataset formatted as Parquet on a local file system, and I'd like to know how to make sure that the written Parquet files come as close as possible to, but do not exceed, 128MB. To write the dataset, I'm largely following the example at https://github.com/kite-sdk/kite-examples/tree/master/dataset-staging. A notable difference is that I call Shard.shard(staged_data, 1) in the Crunch pipeline to make sure that all Avro files from a single day in the staged dataset are combined. If I don't shard, I end up with a Parquet file for every small (i.e. <<128MB) Avro file on a given day, which is not what I want.

Kite provides a utility to help you out with this in CrunchDatasets. I'm
not entirely familiar with Shard, but it looks like it does something
similar and rebalances the data with a reduce phase.

The reason I recommend using our method, CrunchDatasets.partition, is
that it understands the partition strategy for the dataset and will
shuffle the data so that each partition is handled by a single reducer.
That minimizes the number of files and combines the input as you want.
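Roughly, in place of the Shard call, something like this (just a sketch; the dataset URIs are placeholders):

import org.apache.avro.generic.GenericRecord;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.crunch.CrunchDatasets;

public class PartitionedWrite {
  public static void main(String[] args) throws Exception {
    Dataset<GenericRecord> staged = Datasets.load("dataset:file:/data/staged_events");
    Dataset<GenericRecord> target = Datasets.load("dataset:file:/data/persistent_events");

    Pipeline pipeline = new MRPipeline(PartitionedWrite.class);
    PCollection<GenericRecord> records = pipeline.read(CrunchDatasets.asSource(staged));

    // Repartition by the target dataset's partition strategy so that each
    // output partition is handled by a single reducer.
    PCollection<GenericRecord> partitioned = CrunchDatasets.partition(records, target);

    pipeline.write(partitioned, CrunchDatasets.asTarget(target));
    pipeline.done();
  }
}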

The drawback is that this doesn't cut the files off at 128MB...

> I thought the Kite SDK would create 128MB Parquet files in the target dataset when the dataset descriptor property dfs.blocksize is set, but that does not appear to be the case, as a single, large Parquet file is written instead. Furthermore, the parquet.block.size setting, which I believe defaults to 50 or 100MB, does not seem to have any effect either. I am not sure whether my assumptions are wrong, or whether the shard operation somehow interferes with Kite's workings.

This isn't entirely correct. The DFS block size is an independent
property that determines how the file will be split into HDFS blocks.
The parquet.block.size setting determines how large Parquet will make
row groups, but a file can (and should) have multiple row groups, as
well as multiple HDFS blocks.

You want row groups to fit entirely in the containing HDFS block for
reassembly, so the HDFS block size should be a whole-number multiple of
the Parquet block size.
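To make that concrete (illustrative numbers only):

public class BlockSizes {
  // Illustrative values: choose dfs.blocksize as a whole-number multiple of
  // parquet.block.size so that each row group fits entirely in one HDFS block.
  static final long PARQUET_BLOCK_SIZE = 128L * 1024 * 1024;   // one row group
  static final long DFS_BLOCK_SIZE = 2 * PARQUET_BLOCK_SIZE;   // two row groups per block
}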

You can find a full write-up of the Parquet row group option here:

http://ingest.tips/2015/01/31/parquet-row-group-size/

> I would greatly appreciate it if somebody could explain to me how to make sure the Kite SDK dataset is written as desired.

If you want to cut off files at a single row group, then you need to
calculate when to close the current writer (releasing the files) and
start a new writer. That's difficult right now, but we could add an
option to Kite's writers to support policy-based writes, for example
"close the file at X bytes" or "close files open for more than Y seconds".

I'm happy to work with you if you need this option, though I think you
probably want to write the large Parquet files. (The only case I know of
where you don't want to write the large files is if you are reading them
with Impala, but that bug is going to be fixed.)

> p.s.: I have found that setting "mapreduce.input.fileinputformat.split.minsize" allows me to split large Avro files into Parquet files of close to 128MB, but only if I remove the shard operation. This will not work for me because: (1) as previously mentioned, I am not dealing with large Avro files; and (2) even if I were, this approach would not create as few Parquet files as possible, since split remnants are not combined.

This is an interesting approach, but your Parquet and Avro files might
not be the same size. I would expect Parquet files to be smaller than
the Avro files.

rb


--
Ryan Blue
Software Engineer
Cloudera, Inc.

Ryan Blue

Feb 26, 2015, 1:52:38 PM
to Matt, cdk...@cloudera.org
Hi Matt and Nithin,

I don't know that I have much to add here since it is mostly about
configuring Crunch. I'll leave it to you two to find an alternative
there, but do check out the CrunchDatasets.partition methods [1].

rb

[1]:
https://github.com/kite-sdk/kite/blob/master/kite-data/kite-data-crunch/src/main/java/org/kitesdk/data/crunch/CrunchDatasets.java#L191

Matt

Feb 26, 2015, 3:01:22 PM
to cdk...@cloudera.org

On Thursday, February 26, 2015 at 7:30:26 PM UTC+1, Ryan Blue wrote:
Hi Matt,  


Kite provides a utility to help you out with this in CrunchDatasets. I'm
not entirely familiar with Shard, but it looks like it does something
similar and rebalances the data with a reduce phase.

The reason I recommend using our method, CrunchDatasets.partition, is
that it understands the partition strategy for the dataset and will
shuffle the data so that each partition is handled by a single reducer.
That minimizes the number of files and combines the input as you want.

Based on your explanation I think CrunchDatasets.partition will perform better than Shard.shard, so I'll try it out.
 
This isn't entirely correct. The DFS block size is an independent
property that determines how the file will be split into HDFS blocks.

Got it.
 
I'm happy to work with you if you need this option, though I think you
probably want to write the large Parquet files. (The only case I know of
where you don't want to write the large files is if you are reading them
with Impala, but that bug is going to be fixed.)

The files are indeed meant to be used with Impala, but based on what you said, the large files are what I actually wanted.
 
This is an interesting approach, but your Parquet and Avro files might
not be the same size. I would expect Parquet files to be smaller than
the Avro files.

You are right, but this approach allowed me to get close.

Thank you very much for your detailed answer; I can now move on.


Ryan Blue

Feb 26, 2015, 3:04:04 PM
to Matt, cdk...@cloudera.org
On 02/26/2015 12:01 PM, Matt wrote:
> Thank you very much for your detailed answer; I can now move on.

Glad to hear it. Thanks for using Kite!