Hive DDL and schemas

Ryan Blue

Mar 24, 2015, 5:06:21 PM
to cdk...@cloudera.org
Allan pointed out we have separate discussions, so I'm moving this to a
new thread.

> Here’s my method, in Scala I’m afraid. It’s all via Kite.
>
> Personally I’d prefer no table DDL and let Hive and Impala read the definition from the schema. It seems easier to me and keeps the DDL cleaner, but I’m not sure how Impala would handle this, i.e. can it read a schema for a Parquet table from the avro.schema.url?
>
>
> def create(database: String, name: String): Dataset[Record] = {
>   // Create the dataset only if it does not exist yet; load it either way.
>   if (!repo.exists(database, name)) {
>     log.info("Creating dataset %s.%s with schema %s".format(database, name, schema.toString(true)))
>     repo.create(database, name, new DatasetDescriptor.Builder().format(Formats.PARQUET).schema(schema).build)
>   } else {
>     log.info("Dataset %s.%s already exists.".format(database, name))
>   }
>   repo.load(database, name).asInstanceOf[Dataset[Record]]
> }

I don't know where the DDL would come from then. I don't think Kite sets
the DDL, though I'll have to go check now.

rb

--
Ryan Blue
Software Engineer
Cloudera, Inc.

Andrew Stevenson

Mar 25, 2015, 5:17:26 AM
to Ryan Blue, cdk...@cloudera.org
tableForDescriptor in HiveUtils I think.

// convert the schema to Hive columns
table.getSd().setCols(HiveSchemaConverter.convertSchema(descriptor.getSchema()));

Ryan Blue

Mar 25, 2015, 1:16:53 PM
to Andrew Stevenson, cdk...@cloudera.org
On 03/25/2015 02:17 AM, Andrew Stevenson wrote:
> tableForDescriptor in HiveUtils I think.
>
> // convert the schema to Hive columns
> table.getSd().setCols(HiveSchemaConverter.convertSchema(descriptor.getSchema()));

You're right. Looks like a bug and that we should be doing the same when
we update the schema. In fact, we should just rewrite the columns
whenever we run an update. Could you open an issue? We'll get this fix
in the next release. Thanks for your patience with me on this one!
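
The idea of rewriting the Hive columns from the schema on every update can be sketched in plain Java. This is a hypothetical illustration, not Kite's HiveUtils or HiveSchemaConverter: the class HiveColumnsSketch, its method names, and the simplified type mapping are assumptions, and each field is reduced to a name plus the non-null branch of its Avro type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; names and mapping are illustrative, not Kite's API.
final class HiveColumnsSketch {

  // Map an Avro primitive type name to a Hive column type (simplified).
  static String hiveType(String avroType) {
    switch (avroType) {
      case "int": return "int";
      case "long": return "bigint";
      case "float": return "float";
      case "double": return "double";
      case "boolean": return "boolean";
      case "string": return "string";
      case "bytes": return "binary";
      default:
        throw new IllegalArgumentException("Unsupported type: " + avroType);
    }
  }

  // Rebuild the complete column list from the schema fields. Calling this on
  // both create and update keeps the table columns in step with the schema.
  static List<String> columnsFor(Map<String, String> fields) {
    List<String> columns = new ArrayList<>();
    for (Map.Entry<String, String> field : fields.entrySet()) {
      columns.add(field.getKey() + " " + hiveType(field.getValue()));
    }
    return columns;
  }
}
```

Because the list is always derived from the whole schema, an added field shows up as a new column without any separate bookkeeping of what changed.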

Andrew Stevenson

Mar 25, 2015, 1:39:36 PM
to Ryan Blue, cdk...@cloudera.org
Sure. One last question.

How does the csv-import handle missing values? For example

Col1,Col2,Col3
Val1,Val2,

I've triggered this programmatically and, while the command succeeds, the files can't be read back as no footer is found.


Regards

Andrew

Ryan Blue

Mar 25, 2015, 1:47:13 PM
to Andrew Stevenson, cdk...@cloudera.org
On 03/25/2015 10:39 AM, Andrew Stevenson wrote:
> Sure. One last question.
>
> How does the csv-import handle missing values? For example
>
> Col1,Col2,Col3
> Val1,Val2,
>
> I've triggered this programmatically and, while the command succeeds, the
> files can't be read back as no footer is found.
>
>
> Regards
>
> Andrew

The Avro field names are used to look up the column values, if there is
a header. Any missing values are assumed to be null. Values that appear
to be present but empty, like Col3 above, have special handling. For
numbers, an empty string is null. For strings, an empty string is an
empty string.

Can you give me an example of what is failing? That sounds like it might
be a bug.
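
The rules described above (look up by header name, missing means null, empty number means null, empty string stays empty) can be sketched in plain Java. This is a minimal illustration under assumptions, not Kite's csv-import code: the class CsvValueSketch and its methods are hypothetical, and the naive comma split ignores quoting.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the described rules; not Kite's implementation.
final class CsvValueSketch {

  // Split a line on commas, keeping trailing empty values ("Val1,Val2," has 3).
  static List<String> split(String line) {
    return Arrays.asList(line.split(",", -1));
  }

  // Look up a value by field name using the header row.
  // A column absent from the header, or past the end of the row, is null.
  static String lookup(List<String> header, List<String> row, String field) {
    int index = header.indexOf(field);
    return (index < 0 || index >= row.size()) ? null : row.get(index);
  }

  // For numeric fields, a present-but-empty value is treated as null.
  static Integer asInt(String raw) {
    return (raw == null || raw.isEmpty()) ? null : Integer.valueOf(raw);
  }

  // For string fields, an empty value stays an empty string.
  static String asString(String raw) {
    return raw;
  }
}
```

With the header Col1,Col2,Col3 and the row Val1,Val2, from the question, Col3 is present but empty, so it reads as null for a numeric field and as an empty string for a string field.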

Andrew Stevenson

Mar 25, 2015, 1:50:22 PM
to Ryan Blue, cdk...@cloudera.org
I'll send my code across.

Regards

Andrew

Andrew Stevenson

Mar 26, 2015, 12:58:44 PM
to Ryan Blue, cdk...@cloudera.org
I didn’t have much time today, but it looks like there’s an issue reading null values in Parquet. This is easily reproducible by running a Sqoop import with --as-parquetfile against a table with a nullable column and no data in the column. Sqoop succeeds, but checking the files with parquet-tools throws an error. I was able to produce this stack trace via the copy task to move the data to a second dataset.

I also changed Sqoop’s Avro schema generator to add a null as default, but it didn’t help.

INFO jobcontrol.CrunchControlledJob: java.io.IOException: Could not read footer: java.lang.NullPointerException
at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:193)
at parquet.hadoop.ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(ParquetFileReader.java:148)
at parquet.hadoop.ParquetInputFormat.getFooters(ParquetInputFormat.java:597)
at parquet.hadoop.ParquetInputFormat.getFooters(ParquetInputFormat.java:573)
at parquet.hadoop.ParquetInputFormat.getSplits(ParquetInputFormat.java:412)
at org.kitesdk.data.spi.filesystem.FileSystemViewKeyInputFormat.getSplits(FileSystemViewKeyInputFormat.java:123)
at org.kitesdk.data.mapreduce.DatasetKeyInputFormat.getSplits(DatasetKeyInputFormat.java:254)
at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:589)
at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:606)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:490)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1295)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1292)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1292)
at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(CrunchControlledJob.java:329)
at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(CrunchJobControl.java:204)
at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.pollJobStatusAndStartNewOnes(CrunchJobControl.java:238)
at org.apache.crunch.impl.mr.exec.MRExecutor.monitorLoop(MRExecutor.java:112)
at org.apache.crunch.impl.mr.exec.MRExecutor.access$000(MRExecutor.java:55)
at org.apache.crunch.impl.mr.exec.MRExecutor$1.run(MRExecutor.java:83)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:248)
at parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:425)
at parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:403)
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:299)
at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:183)
at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:179)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
... 1 more

1 job failure(s) occurred:
org.kitesdk.tools.CopyTask: Kite(dataset:hdfs://quickstart.cloudera:8020/data/lz/quic... ID=1 (1/1)(1): java.io.IOException: Could not read footer: java.lang.NullPointerException
at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:193)
at parquet.hadoop.ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(ParquetFileReader.java:148)
at parquet.hadoop.ParquetInputFormat.getFooters(ParquetInputFormat.java:597)
at parquet.hadoop.ParquetInputFormat.getFooters(ParquetInputFormat.java:573)
at parquet.hadoop.ParquetInputFormat.getSplits(ParquetInputFormat.java:412)
at org.kitesdk.data.spi.filesystem.FileSystemViewKeyInputFormat.getSplits(FileSystemViewKeyInputFormat.java:123)
at org.kitesdk.data.mapreduce.DatasetKeyInputFormat.getSplits(DatasetKeyInputFormat.java:254)
at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:589)
at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:606)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:490)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1295)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1292)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1292)
at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(CrunchControlledJob.java:329)
at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(CrunchJobControl.java:204)
at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.pollJobStatusAndStartNewOnes(CrunchJobControl.java:238)
at org.apache.crunch.impl.mr.exec.MRExecutor.monitorLoop(MRExecutor.java:112)
at org.apache.crunch.impl.mr.exec.MRExecutor.access$000(MRExecutor.java:55)
at org.apache.crunch.impl.mr.exec.MRExecutor$1.run(MRExecutor.java:83)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:248)
at parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:425)
at parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:403)
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:299)
at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:183)
at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:179)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
... 1 more

Andrew Stevenson

Mar 27, 2015, 4:17:41 AM
to Ryan Blue, cdk...@cloudera.org

Andrew Stevenson

Apr 1, 2015, 9:21:30 AM
to Ryan Blue, cdk...@cloudera.org
Hi Ryan,

To follow up on this: I’m past the previous error, but I’m having problems when columns are deleted on my source system. I have used Sqoop to import a table as Parquet, which creates a dataset that is missing a column, my_int1, that still exists in my target dataset.

This is my source dataset produced by Sqoop

>./kite-dataset show dataset:hdfs:/data/lz/localhost/retail_db/categories

{"category_id": 139, "category_department_id": 100, "category_name": "test delete no compact", "my_int2": 102}

>./kite-dataset info dataset:hdfs:/data/lz/localhost/retail_db/categories

Dataset "categories":
URI: "dataset:hdfs://localhost:8020/data/lz/localhost/retail_db/categories"
Schema: {
  "type" : "record",
  "name" : "sqoop_import_categories",
  "doc" : "Sqoop import of categories",
  "fields" : [ {
    "name" : "category_id",
    "type" : [ "null", "int" ],
    "default" : null,
    "columnName" : "category_id",
    "sqlType" : "4"
  }, {
    "name" : "category_department_id",
    "type" : [ "null", "int" ],
    "default" : null,
    "columnName" : "category_department_id",
    "sqlType" : "4"
  }, {
    "name" : "category_name",
    "type" : [ "null", "string" ],
    "default" : null,
    "columnName" : "category_name",
    "sqlType" : "12"
  }, {
    "name" : "my_int2",
    "type" : [ "null", "int" ],
    "default" : null,
    "columnName" : "my_int2",
    "sqlType" : "4"
  } ],
  "tableName" : "categories"
}
Not partitioned

This is the target dataset. Its schema is a merge of the source schema and the target's current schema.

./kite-dataset info categories --namespace retail_db

Dataset "categories":
URI: "dataset:hive://localhost:9083/retail_db/categories"
Schema: {
  "type" : "record",
  "name" : "categories",
  "doc" : "Sqoop import of categories",
  "fields" : [ {
    "name" : "category_id",
    "type" : [ "null", "int" ],
    "default" : null
  }, {
    "name" : "category_department_id",
    "type" : [ "null", "int" ],
    "default" : null
  }, {
    "name" : "category_name",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "my_int2",
    "type" : [ "null", "int" ],
    "default" : null
  }, {
    "name" : "my_int1",
    "type" : [ "null", "int" ],
    "default" : null
  } ]
}
Not partitioned

This is the method I use to copy the source to the target.

/**
 * Load an existing dataset into a target dataset.
 *
 * @param input_dataset The input dataset
 * @param target_dataset The dataset to load
 * @param conf The Hadoop configuration used to run the copy job
 * @return 0 if the copy succeeded, 1 otherwise
 */
def load_dataset(input_dataset: Dataset[GenericData.Record],
                 target_dataset: Dataset[GenericData.Record],
                 conf: Configuration): Int = {
  val source: View[GenericData.Record] = input_dataset
  val dest: View[GenericData.Record] = target_dataset
  // Log Crunch job progress and enable Crunch debug output
  conf.set("crunch.log.job.progress", "true")
  conf.set("crunch.debug", "true")

  val task = new CopyTask[GenericData.Record](source, dest)
  task.setConf(conf)
  task.noCompaction() // skip the compaction step of the copy
  val result: PipelineResult = task.run()

  if (result.succeeded) {
    log.info("Added {} records to \"{}\"", task.getCount, target_dataset.getUri)
    0
  } else {
    1
  }
}

The Crunch job runs successfully, but no files are generated in the final target dataset.

I’ve also tried via the CLI, but this time I get an error.

./kite-dataset copy dataset:hdfs:/data/lz/localhost/retail_db/categories categories --namespace retail_db

1 job failure(s) occurred:
org.kitesdk.tools.CopyTask: Kite(dataset:hdfs://localhost:8020/data/lz/... ID=1 (1/1)(1): Job failed!

YARN keeps removing the log files on me, so I don’t have more info on this failure at the moment.

My understanding was that Kite would handle the column mappings. Effectively, the CSVImportCommand calls the CopyTask after creating a temporary dataset and maps on column names, or have I missed something?

I want to use Kite to handle the schema evolution. It would be great if this worked, as I would no longer have to rewrite files and mess about creating views in Hive or scripting changes to the Hive DDL.

Thanks

Andrew

Joey Echeverria

Apr 1, 2015, 11:05:22 AM
to Andrew Stevenson, Ryan Blue, cdk...@cloudera.org
This is probably related to this issue:

https://issues.cloudera.org/browse/CDK-973

The short version is that we need to set the reader schema to that of
the target dataset during a copy.

Joey Echeverria
Senior Infrastructure Engineer
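
To illustrate what reading with the target's schema means, here is a minimal plain-Java sketch of name-based schema resolution. It is a simplification under assumptions, not Kite's or Avro's code: records are modeled as maps, ReaderSchemaSketch and resolve are hypothetical names, and the reader schema is reduced to field names with their default values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of name-based schema resolution; not Kite or Avro code.
final class ReaderSchemaSketch {

  // Project a written record onto the reader (target) schema.
  // readerDefaults maps each reader field name to its default value.
  static Map<String, Object> resolve(Map<String, Object> written,
                                     Map<String, Object> readerDefaults) {
    Map<String, Object> resolved = new LinkedHashMap<>();
    for (Map.Entry<String, Object> field : readerDefaults.entrySet()) {
      String name = field.getKey();
      // Use the written value when the field exists in the source record;
      // otherwise fall back to the reader schema's default.
      resolved.put(name, written.containsKey(name) ? written.get(name) : field.getValue());
    }
    // Written fields the reader schema does not declare are dropped.
    return resolved;
  }
}
```

Applied to the datasets in this thread, a source record without my_int1 would resolve against the target schema with my_int1 set to its default, null, while the four shared fields are matched by name.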

Andrew Stevenson

Apr 1, 2015, 11:20:15 AM
to Joey Echeverria, Ryan Blue, cdk...@cloudera.org
I saw that issue and did wonder if it was related. Thanks!

Regards

Andrew

Andrew Stevenson

Apr 1, 2015, 2:01:43 PM
to Ryan Blue, cdk...@cloudera.org