Partitioning generating tiny parquet files


Chas Coppard

Mar 6, 2015, 10:01:11 AM
to cdk...@cloudera.org
Hi,

I have a situation where I'm importing a file which results in the creation of around 350 partitions.

I'm using the Kite SDK to write partitioned Parquet files.

When we run the import, each partition ends up with multiple Parquet files (10 to 20), each only about 1.6 KB in size.

Is this expected behaviour? Is it maybe because the SDK can only manage a certain number of write buffers and thus keeps closing the files?

Any ideas welcome.

Thanks
--

Chas Coppard
Strategic Architect
Experian GSD

Experian | 7 Old Town | Clapham | London | SW4 0JT

M: +44 (0) 759 500 3255 | E: chas.c...@experian.com 
W: www.experian.com |  Skype chascoppard


Information in this e-mail and any attachments is confidential, and may not be copied or used by anyone other than the addressee, nor disclosed to any third party without our permission. 
 There is no intention to create any legally binding contract or other binding commitment through the use of this electronic communication.  Although Experian has taken reasonable steps to ensure that this communication and any attachments are free from computer virus, you are advised to take your own steps to ensure that they are actually virus free.  Experian Limited (Registered No. 653331), with registered office at Landmark House, Experian Way, NG2 Business Park, Nottingham, NG80 1ZZ, United Kingdom.

Joey Echeverria

Mar 6, 2015, 10:18:22 AM
to Chas Coppard, cdk...@cloudera.org
Yes, the dataset writer keeps a cache of writers, one per partition with a default limit of 10. You can change the limit by setting the kite.writer.cache-size property on the dataset. This is done most easily with the Kite CLI update command[1]. 

If you can batch load your data, then you have more options. For example, when you copy from one dataset to another or import from HDFS using the CLI, the MapReduce job that performs the copy uses the MapReduce shuffle to pre-partition the data so that each reduce task writes to a single partition at a time.

You can get the same benefit from a custom bulk loading job that uses CrunchDatasets.partition() before writing your results. 
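
To make the two suggestions above concrete, here is a toy model of a bounded writer cache. It is only a sketch and not Kite's implementation: the class and method names are hypothetical, and it assumes least-recently-used eviction and one new file each time a writer has to be reopened.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a bounded writer cache. This is NOT Kite's implementation. */
public class WriterCacheModel {

    /**
     * Counts the files opened when records arrive in the given partition order
     * and at most cacheSize writers may stay open. Assumes (hypothetically)
     * least-recently-used eviction and a fresh file for every reopened writer.
     */
    public static int countFilesOpened(List<Integer> partitionPerRecord, final int cacheSize) {
        // accessOrder = true makes LinkedHashMap track least-recently-used entries
        Map<Integer, Boolean> openWriters =
                new LinkedHashMap<Integer, Boolean>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<Integer, Boolean> eldest) {
                        return size() > cacheSize; // evicting a writer closes its file
                    }
                };
        int filesOpened = 0;
        for (Integer partition : partitionPerRecord) {
            if (openWriters.get(partition) == null) { // no open writer: start a new file
                openWriters.put(partition, Boolean.TRUE);
                filesOpened++;
            }
        }
        return filesOpened;
    }

    /** Interleaved arrival order: 0, 1, ..., partitions-1, 0, 1, ... */
    public static List<Integer> interleaved(int partitions, int recordsPerPartition) {
        List<Integer> order = new ArrayList<>();
        for (int r = 0; r < recordsPerPartition; r++) {
            for (int p = 0; p < partitions; p++) {
                order.add(p);
            }
        }
        return order;
    }

    /** Pre-partitioned order, as after a shuffle: all of partition 0, then all of 1, ... */
    public static List<Integer> grouped(int partitions, int recordsPerPartition) {
        List<Integer> order = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            for (int r = 0; r < recordsPerPartition; r++) {
                order.add(p);
            }
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(countFilesOpened(interleaved(350, 15), 10));  // cache too small
        System.out.println(countFilesOpened(interleaved(350, 15), 350)); // cache covers every partition
        System.out.println(countFilesOpened(grouped(350, 15), 10));      // pre-partitioned input
    }
}
```

In this model, 350 partitions with 15 interleaved records each open 5250 files with a cache of 10 (15 per partition), but only 350 files when the cache covers every partition or when the input is grouped by partition first.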

-Joey

--
You received this message because you are subscribed to the Google Groups "CDK Development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cdk-dev+u...@cloudera.org.
For more options, visit https://groups.google.com/a/cloudera.org/d/optout.

Ryan Blue

Mar 6, 2015, 1:26:21 PM
to Joey Echeverria, Chas Coppard, cdk...@cloudera.org
On 03/06/2015 07:18 AM, Joey Echeverria wrote:
> Yes, the dataset writer keeps a cache of writers, one per partition with
> a default limit of 10. You can change the limit by setting the
> kite.writer.cache-size property on the dataset. This is done most easily
> with the Kite CLI update command[1].

There's an example and more discussion about the cache size here:
http://kitesdk.org/docs/1.0.0/using-kite-properties.html

> If you can batch load your data, then you have more options. For
> example, when you copy from one dataset to another or import from HDFS
> using the CLI, then the MapReduce job that performs the copy will use
> the MapReduce shuffle to pre-partition the data so that each reduce task
> writes to a single dataset at once.
>
> You can get the same benefit from a custom bulk loading job that uses
> CrunchDatasets.partition() before writing your results.

Docs are here:

http://kitesdk.org/docs/1.0.0/apidocs/org/kitesdk/data/crunch/CrunchDatasets.html

> -Joey
>
> [1] http://kitesdk.org/docs/1.0.0/cli-reference.html#update
--
Ryan Blue
Software Engineer
Cloudera, Inc.

Chas Coppard

Mar 11, 2015, 8:20:43 AM
to Ryan Blue, Joey Echeverria, cdk...@cloudera.org
Thanks guys, I've switched to doing the partitioning on the cluster using CrunchDatasets.partition() and the results are good. I thought I'd share my code for others and also to get any feedback to see if I'm doing it right. There are a couple of questions in the code below:

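import java.io.Serializable;

// Assumed: the post does not show which Record class is used
import org.apache.avro.generic.GenericData.Record;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.Target.WriteMode;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.util.CrunchTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Formats;
import org.kitesdk.data.PartitionStrategy;
import org.kitesdk.data.crunch.CrunchDatasets;
import org.kitesdk.data.spi.DefaultConfiguration;

public class TestPartitioning extends CrunchTool implements Serializable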
{
    public static void main(String... args) throws Exception
    {
        int rc = ToolRunner.run(new TestPartitioning(), args);
        System.exit(rc);
    }
      
    @Override
    public int run(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/etc/integrator/core-site.xml"));
        conf.addResource(new Path("/etc/integrator/hdfs-site.xml"));

        DefaultConfiguration.set(conf);

        Pipeline pipeline = new MRPipeline(TestPartitioning.class, conf);

        pipeline.enableDebug();
        pipeline.getConfiguration().set("crunch.log.job.progress", "true");
        
        Dataset<Record> from = Datasets.load(
                "dataset:hdfs:/user/chascoppard/unpartitioned_data",
                Record.class);
   
        PartitionStrategy ps = new PartitionStrategy.Builder()
                .year("timestamp")
                .month("timestamp")
                .day("timestamp")
                .hour("timestamp")
                .build();

        String hdfsPath = "/user/chascoppard/partitioned_data";

        DatasetDescriptor dsd = new DatasetDescriptor.Builder()
                .schema(from.getDescriptor().getSchema())
                .format(Formats.PARQUET)
                .partitionStrategy(ps)
                .location(
                        FileSystem.getDefaultUri(DefaultConfiguration.get()).toString()
                                + hdfsPath)
                .build();

I had to include the location of the file here, otherwise when I call Datasets.update() later I get the error:
org.kitesdk.data.ValidationException: Dataset location is not compatible with existing: hdfs://ubuntu1:8020/user/chascoppard/partitioned_data != null
Is this the correct way of dealing with this issue?


        Dataset<Record> to = null;
        String datasetPath = "dataset:hdfs:" + hdfsPath;
        if (!Datasets.exists(datasetPath))
        {
            to = Datasets.create(datasetPath, dsd, Record.class);
        }
        else
        {
            to = Datasets.update(datasetPath, dsd, Record.class);
        }
        
        PCollection<Record> recs = pipeline.read(CrunchDatasets.asSource(from));
        pipeline.run();

If I don't call pipeline.run() here then the pipeline.write() below will write an empty dataset. Is this the correct way of doing this? I thought I would only need to call run once at the end.

        recs = CrunchDatasets.partition(recs, to);

        pipeline.write(recs, CrunchDatasets.asTarget(to), WriteMode.APPEND);

        return pipeline.run().succeeded() ? 0 : 1;
    }
}
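
For readers wondering where the roughly 350 partitions come from: a year/month/day/hour strategy like the one above yields one partition per distinct hour in the data, so 350 partitions is about two weeks of hourly data. The sketch below derives such a key from a timestamp. The class name, the `name=value` directory layout, the zero padding, and the use of UTC are all assumptions made for illustration, not something confirmed in this thread.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Locale;

/** Hypothetical helper: maps a timestamp to an hourly partition key. */
public class HourlyPartitionKey {

    /** Returns a year/month/day/hour key for an epoch-millisecond timestamp, evaluated in UTC (assumed). */
    public static String keyFor(long epochMillis) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        // Locale.ROOT keeps the digits ASCII regardless of the JVM's default locale
        return String.format(Locale.ROOT, "year=%d/month=%02d/day=%02d/hour=%02d",
                t.getYear(), t.getMonthValue(), t.getDayOfMonth(), t.getHour());
    }

    public static void main(String[] args) {
        // 1425636071000 ms is 2015-03-06T10:01:11Z
        System.out.println(keyFor(1425636071000L));
    }
}
```

Every record whose timestamp falls within the same hour maps to the same key, which is why grouping by this key before writing keeps each partition's records together.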



Ryan Blue

Mar 11, 2015, 12:48:42 PM
to Chas Coppard, Joey Echeverria, cdk...@cloudera.org
On 03/11/2015 05:20 AM, Chas Coppard wrote:
> I had to include the location of the file here, otherwise when I
> call Datasets.update() later I get the error:
> org.kitesdk.data.ValidationException: Dataset location is not
> compatible with existing:
> hdfs://ubuntu1:8020/user/chascoppard/partitioned_data != null
> Is this the correct way of dealing with this issue?

When creating the descriptor for update, did you use the copy
constructor for the builder? Otherwise you aren't carrying the data
location forward, which isn't allowed.
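
To illustrate the copy-constructor point, here is a self-contained sketch using hypothetical stand-in classes rather than the Kite ones. With Kite itself, the equivalent would presumably be to start from `new DatasetDescriptor.Builder(existingDescriptor)`, where `existingDescriptor` comes from the dataset already on disk. The compatibility check below is a simplification assumed from the error message quoted above.

```java
import java.util.Objects;

/** Hypothetical, simplified stand-ins; these are not the Kite classes. */
public class DescriptorCopyDemo {

    public static final class Descriptor {
        public final String format;
        public final String location; // stays null unless set on the builder

        Descriptor(String format, String location) {
            this.format = format;
            this.location = location;
        }
    }

    public static final class Builder {
        private String format;
        private String location;

        public Builder() { }

        /** Copy constructor: start from every field of an existing descriptor. */
        public Builder(Descriptor existing) {
            this.format = existing.format;
            this.location = existing.location;
        }

        public Builder format(String format) { this.format = format; return this; }
        public Builder location(String location) { this.location = location; return this; }
        public Descriptor build() { return new Descriptor(format, location); }
    }

    /** Simplified version of an update check: the location must not change. */
    public static boolean locationCompatible(Descriptor existing, Descriptor updated) {
        return Objects.equals(existing.location, updated.location);
    }

    public static void main(String[] args) {
        Descriptor existing = new Builder()
                .format("parquet")
                .location("hdfs://ubuntu1:8020/user/chascoppard/partitioned_data")
                .build();

        Descriptor fromScratch = new Builder().format("parquet").build();
        Descriptor fromCopy = new Builder(existing).format("parquet").build();

        System.out.println(locationCompatible(existing, fromScratch)); // location was dropped
        System.out.println(locationCompatible(existing, fromCopy));    // location carried forward
    }
}
```

A descriptor built from scratch has a null location and fails the check, while one built from a copy keeps the existing location without it being set by hand.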

> If I don't call pipeline.run() here then the pipeline.write()
> below will write an empty dataset. Is this the correct way of doing
> this? I thought I would only need to call run once at the end.

In our version, we set up the pipeline, call write, and then call
`pipeline.done`.

According to the docs, `write` sets up the output for the next call to
`run`, so you definitely have to call run after write. `done` will run
anything needed to produce outputs, and `run` "constructs and executes
... in order to write data to the output targets". I'm not sure exactly
why Crunch doesn't write if you haven't already called run, but using
`done` instead should fix your problem.

I'm glad you have something working, and thanks for sharing what you did!

rb

Buntu Dev

Mar 11, 2015, 1:24:11 PM
to Ryan Blue, Chas Coppard, Joey Echeverria, cdk...@cloudera.org
Thanks for sharing the information.

I have a similar issue with small Avro files generated from a real-time event stream. I have Flume configured to write to a Kite dataset sink.

- If it's a real-time stream and the events are expected to arrive in order, does it still help to increase the cache size?
- Is the cache size set per dataset? I have Flume multiplexing and writing to 3 datasets; does that mean the cache size of 10 is shared across all the datasets, or is it 10 per dataset? Based on the example Ryan pointed out, it seems to be set per dataset, but I just want to confirm.
- Once in a while we reprocess the events for the past 30 days, and our partitioning is year/month/day. Is there any issue with bumping the cache size up to, say, 30 or more?


Thanks!



Ryan Blue

Mar 11, 2015, 1:33:15 PM
to Buntu Dev, Chas Coppard, Joey Echeverria, cdk...@cloudera.org
On 03/11/2015 10:24 AM, Buntu Dev wrote:
> Thanks for sharing the information.
>
> I have a similar issue with small Avro files generated from a real-time
> event stream. I have Flume configured to write to a Kite dataset sink.
>
> - If it's a real-time stream and the events are expected to arrive in
> order, does it still help to increase the cache size?

Not really. I would expect you to have one or two open files at a time,
depending on when records come in. If anything, I would decrease the
writer cache size so that only the current partition and the last
partition are open at once. There's probably no need to keep any more open.

The best way to increase file size for data coming from Flume is to
increase the "roll" interval that the sink waits before closing its
writers to release data. That defaults to 30 seconds, but if you are
seeing small files then longer would probably help.
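
As a rough sketch of that setting, the Flume agent configuration fragment below raises the roll interval on a Kite dataset sink. The agent name `a1`, the sink name `kite`, and the value of 300 seconds are placeholders chosen for illustration. The other settings the sink needs (channel, dataset address) are omitted.

```properties
# Hypothetical agent "a1" with a sink named "kite"; both names are placeholders
a1.sinks.kite.type = org.apache.flume.sink.kite.DatasetSink
# Seconds the sink waits before closing its writers and releasing files.
# The thread gives the default as 30; 300 is only an example value.
a1.sinks.kite.kite.rollInterval = 300
```

A longer interval means larger files, at the cost of data becoming visible to readers later.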

This is an area where Kite could definitely improve. Right now, the
DatasetSink will keep a writer open for the given timeout and then close
it, including all of the writers for individual partitions. But those
partition writers may have just been opened, so support for time-based
file rolling inside the Kite partitioned writer would be a big improvement.

> - Is the cache size set per dataset? I have Flume multiplexing and
> writing to 3 datasets; does that mean the cache size of 10 is shared
> across all the datasets, or is it 10 per dataset? Based on the example
> Ryan pointed out, it seems to be set per dataset, but I just want to
> confirm.

This is actually per partitioned writer (that is a writer for multiple
partitions).

> - Once in a while we reprocess the events for the past 30 days, and our
> partitioning is year/month/day. Is there any issue with bumping the
> cache size up to, say, 30 or more?

I don't see this being a problem for Avro. Parquet datasets require a
lot more memory, so I recommend reprocessing with a MR job rather than
through Flume. In that case, Kite's CopyTask (backing the copy command)
will automatically shuffle the data to minimize the number of open files
so you don't have to worry about the limit.

Buntu Dev

Mar 11, 2015, 1:38:18 PM
to Ryan Blue, Chas Coppard, Joey Echeverria, cdk...@cloudera.org
Thanks Ryan, I will experiment with the roll interval to find an acceptable value.