Schema Name and Compatibility


Allan Shoup

Mar 23, 2015, 4:54:53 PM
to cdk...@cloudera.org
From my testing, it looks like changes to the record name of a schema are considered passive from Kite's perspective but the same change is considered non-passive by Avro. Has anybody else seen the same thing and if so, was this the intended behavior?

Joey Echeverria

Mar 23, 2015, 5:09:58 PM
to Allan Shoup, cdk...@cloudera.org
What was the context? Kite probably won't care about record names when
determining if schemas are compatible because Avro has schema
resolution rules based on field name and type. If you're referencing a
record schema by name in another schema or to create a recursive
schema, that might matter.
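
For illustration (my sketch, with a hypothetical Node record): a recursive schema has to refer to its record by name, so renaming the record also changes that reference:

import org.apache.avro.Schema;

public class RecursiveSchemaExample {
  public static void main(String[] args) {
    // A linked-list node whose "next" field references the record by its
    // name, "Node". Renaming the record breaks this self-reference.
    Schema node = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"Node\", \"fields\": ["
        + "{\"name\": \"value\", \"type\": \"int\"},"
        + "{\"name\": \"next\", \"type\": [\"null\", \"Node\"], \"default\": null}]}");
    System.out.println(node.toString(true));
  }
}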

Can you talk more about your use case?

-Joey

--
Joey Echeverria
Senior Infrastructure Engineer

Andrew Stevenson

Mar 23, 2015, 5:28:34 PM
to Joey Echeverria, Allan Shoup, cdk...@cloudera.org
Basically I'm Sqooping data into HDFS. I can't import as Parquet, which would create a dataset behind the scenes, because I need to use the direct import, especially for Netezza. Since the tables on the RDBMS can change, I want to use Avro to handle the schema evolution and Kite to manage the datasets and the conversion to Parquet. I need to expose the final datasets via Hive/Impala, so I need both the Hive tblproperties pointing to the Kite schema and the actual columns to be in sync.

My process is:

1. Sqoop as text into HDFS
2. Use Sqoop's Avro generator to get the Avro schema of the target table on the RDBMS
3. Create the dataset if it doesn't exist; otherwise merge and update the schema
4. Use CSVImport to load the Sqoop’d text files into the repo created in step 3. 

Step 3 updates the Hive columns and tblproperties correctly if I add table.getSd().setCols(HiveSchemaConverter.convertSchema(descriptor.getSchema())); to the updateDescriptor method.

Joey Echeverria

Mar 23, 2015, 6:00:52 PM
to Allan Shoup, cdk...@cloudera.org
This is expected. Kite does schema evolution by field compatibility,
not record equality. The full details are in our schema evolution
guide[1]. I'd also expect the same to work if you wrote data using
MyClass.getClassSchema() and then read it with Avro using
MyDifferentClass.getClassSchema(); Avro would handle it the same way.
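
In code, that expectation looks something like this (my sketch; MyClass and MyDifferentClass stand in for any two generated classes whose schemas differ only by record name, and data.avro is a hypothetical file written with MyClass's schema):

import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class ReadWithDifferentSchema {
  public static void main(String[] args) throws IOException {
    // Use MyDifferentClass's schema as the reader schema; DataFileReader
    // picks up the writer schema (MyClass's) from the file header and
    // resolves the two field by field.
    Schema readerSchema = MyDifferentClass.getClassSchema();
    GenericDatumReader<GenericRecord> datumReader =
        new GenericDatumReader<GenericRecord>(readerSchema);
    DataFileReader<GenericRecord> fileReader =
        new DataFileReader<GenericRecord>(new File("data.avro"), datumReader);
    for (GenericRecord record : fileReader) {
      System.out.println(record);
    }
    fileReader.close();
  }
}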

-Joey

On Mon, Mar 23, 2015 at 2:47 PM, Allan Shoup <allan...@gmail.com> wrote:
> Joey, here's what my test looked like:
>
> DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
>     .schema(MyClass.getClassSchema()).build();
> Dataset<MyClass> dataset1 = Datasets.create(uri, descriptor, MyClass.class);
>
> DatasetDescriptor existingDescriptor = Datasets.load(uri, MyDifferentClass.class)
>     .getDataset().getDescriptor();
> descriptor = new DatasetDescriptor.Builder(existingDescriptor)
>     .schema(MyDifferentClass.getClassSchema()).build();
> // I expected this to fail because Avro considers MyClass -> MyDifferentClass
> // an incompatible change
> Dataset<MyDifferentClass> dataset2 = Datasets.update(uri, descriptor,
>     MyDifferentClass.class);

Allan Shoup

Mar 23, 2015, 6:20:01 PM
to Joey Echeverria, cdk...@cloudera.org
Good to know. I see an implementation note in org.kitesdk.data.spi.SchemaValidationUtil to replace that with AVRO-1315 (presumably org.apache.avro.SchemaCompatibility) when generally available. I believe SchemaCompatibility will flag this as an incompatible change, however.

Ryan Blue

Mar 23, 2015, 7:19:21 PM
to Allan Shoup, Joey Echeverria, cdk...@cloudera.org
On 03/23/2015 03:20 PM, Allan Shoup wrote:
> Good to know. I see an implementation note
> in org.kitesdk.data.spi.SchemaValidationUtil to replace that
> with AVRO-1315 (presumably org.apache.avro.SchemaCompatibility) when
> generally available. I believe SchemaCompatibility will flag this as an
> incompatible change, however.

I think that the compatibility determination is identical between the
one in Avro and the one in Kite. In fact, the one in Kite was used to
fix a recursive bug in the one in Avro.

rb


--
Ryan Blue
Software Engineer
Cloudera, Inc.

Andrew Stevenson

Mar 24, 2015, 3:09:38 AM
to Ryan Blue, Allan Shoup, Joey Echeverria, cdk...@cloudera.org
I'm not sure this solves my issue. The dataset schema is in sync, and the Avro schema evolution handles the addition and deletion of columns, but if you do a "describe table" in Hive there's no change in the columns. This is what the end user sees. I'll send an example later.

Regards

Andrew


Andrew Stevenson

Mar 24, 2015, 9:38:05 AM
to Ryan Blue, Allan Shoup, Joey Echeverria, cdk...@cloudera.org
Here’s an example. 

It shows the columns are not in sync with the schema pointed to by avro.schema.url: they are missing the new column “my_int_col1”. Since most end users at my current client use Hive, they only see the col_name list.

Adding this call to updateTableSchema in HiveUtils fixes the issue for me.

table.getSd().setCols(HiveSchemaConverter.convertSchema(descriptor.getSchema()));
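
In context, the patched method would look something like this (a sketch only; the package for HiveSchemaConverter and the exact updateTableSchema signature may differ from what I show here):

import java.util.List;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.spi.hive.HiveSchemaConverter;

// Sketch of the fix: derive the Hive column list from the descriptor's
// Avro schema and store it in the table's StorageDescriptor, so that
// DESCRIBE shows the evolved columns.
class HiveColumnSync {
  static void syncColumns(Table table, DatasetDescriptor descriptor) {
    List<FieldSchema> cols =
        HiveSchemaConverter.convertSchema(descriptor.getSchema());
    table.getSd().setCols(cols);
  }
}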


>hive --database retail_db -e "describe formatted categories"

Logging initialized using configuration in jar:file:/usr/lib/hive/lib/hive-common-0.13.1-cdh5.3.0.jar!/hive-log4j.properties
OK
Time taken: 0.508 seconds
OK
# col_name             data_type           comment             
 
category_id         int                                    
category_department_id int                                    
category_name       string                                  
my_test_varchar     string                                  
my_test_varchar2     string                         
 
# Detailed Table Information  
Database:           retail_db            
Owner:               null                  
CreateTime:         Tue Mar 24 05:01:07 PDT 2015  
LastAccessTime:     UNKNOWN              
Protect Mode:       None                  
Retention:           0                    
Table Type:         EXTERNAL_TABLE        
Table Parameters:  
COLUMN_STATS_ACCURATE false               
EXTERNAL             TRUE                
kite.compression.type snappy              
numFiles             0                   
numRows             -1                  
rawDataSize         -1                  
totalSize           0                   
transient_lastDdlTime 1427198467          
 
# Storage Information  
SerDe Library:       org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe  
InputFormat:         org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat  
OutputFormat:       org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat  
Compressed:         No                    
Num Buckets:         -1                    
Bucket Columns:     []                    
Sort Columns:       []                    
Time taken: 0.39 seconds, Fetched: 37 row(s)

{
  "type" : "record",
  "name" : "categories",
  "doc" : "Sqoop import of categories",
  "fields" : [ {
    "name" : "category_id",
    "type" : [ "null", "int" ],
    "default" : null
  }, {
    "name" : "category_department_id",
    "type" : [ "null", "int" ],
    "default" : null
  }, {
    "name" : "category_name",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "my_test_varchar",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "my_test_varchar2",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "my_int_col1",
    "type" : [ "null", "int" ],
    "default" : null
  } ]
}

Allan Shoup

Mar 24, 2015, 12:18:29 PM
to Ryan Blue, Joey Echeverria, cdk...@cloudera.org
Ryan, I'm not sure I follow. Given these two schemas
         myClassSchema: {"type":"record","name":"MyClass",         "namespace":"com.example","fields":[{"name":"i","type":"int"}]}
myDifferentClassSchema: {"type":"record","name":"MyDifferentClass","namespace":"com.example","fields":[{"name":"i","type":"int"}]}

Running 
SchemaCompatibility.checkReaderWriterCompatibility(myDifferentClassSchema, myClassSchema);

with avro 1.7.7 returns the type as 
SchemaCompatibilityType.INCOMPATIBLE
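
For reference, here's a self-contained version of that check (my sketch; assumes Avro 1.7.7 on the classpath):

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;

public class NameCompatCheck {
  public static void main(String[] args) {
    Schema writer = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"MyClass\",\"namespace\":\"com.example\","
        + "\"fields\":[{\"name\":\"i\",\"type\":\"int\"}]}");
    Schema reader = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"MyDifferentClass\",\"namespace\":\"com.example\","
        + "\"fields\":[{\"name\":\"i\",\"type\":\"int\"}]}");
    SchemaCompatibilityType type = SchemaCompatibility
        .checkReaderWriterCompatibility(reader, writer)
        .getType();
    System.out.println(type); // INCOMPATIBLE, per the result above
  }
}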

Ryan Blue

Mar 24, 2015, 1:25:36 PM
to Andrew Stevenson, Allan Shoup, Joey Echeverria, cdk...@cloudera.org
How was this table created? It looks like it was done with STORED AS
AVRO and then later updated by Kite, which added the schema pointer.
Maybe Kite should be removing the table's column DDL in that case so that
Hive always uses the schema at avro.schema.url. That should override
since it has more information, so we might need to file a Hive bug, too.

rb

Andrew Stevenson

Mar 24, 2015, 1:34:33 PM
to Ryan Blue, Allan Shoup, Joey Echeverria, cdk...@cloudera.org
Here's my method, Scala I'm afraid. It's all via Kite.

Personally I'd prefer no table DDL and let Hive and Impala read the definition from the schema. It seems easier to me and keeps the DDL cleaner, but I'm not sure how Impala would handle this, i.e. can it read the schema for a Parquet table from avro.schema.url?


def create(database: String, name: String): Dataset[Record] = {
  if (!repo.exists(database, name)) {
    log.info("Creating dataset at %s with schema %s".format(path, schema.toString(true)))
    repo.create(database, name,
      new DatasetDescriptor.Builder().format(Formats.PARQUET).schema(schema).build)
    repo.load(database, name).asInstanceOf[Dataset[Record]]
  } else {
    log.info("Dataset at %s already exists.".format(dataset_path))
    repo.load(database, name).asInstanceOf[Dataset[Record]]
  }
}

Ryan Blue

Mar 24, 2015, 2:08:53 PM
to Allan Shoup, Joey Echeverria, cdk...@cloudera.org
On 03/24/2015 09:18 AM, Allan Shoup wrote:
> Ryan, I'm not sure I follow. Given these two schemas
> myClassSchema: {"type":"record","name":"MyClass",
> "namespace":"com.example","fields":[{"name":"i","type":"int"}]}
> myDifferentClassSchema:
> {"type":"record","name":"MyDifferentClass","namespace":"com.example","fields":[{"name":"i","type":"int"}]}
>
> Running
> SchemaCompatibility.checkReaderWriterCompatibility(myDifferentClassSchema,
> myClassSchema);
>
> with avro 1.7.7 returns the type as
> SchemaCompatibilityType.INCOMPATIBLE

You're right. I just took a look at the Avro version and it's not what I
remembered at all. Kite checks whether the data can be read, which
effectively assumes the names match, while Avro appears to implement the
spec's compatibility rules explicitly. I think we need to be a little
more forgiving there, since there are situations, like replacing a schema
generated from DDL with a real one, that require this.

So to answer your original question, I think they are different and for
a good reason. The Avro one is much more strict, but we can take a look
at changing that if you think there's a good argument against doing it
that way.
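
To make the contrast concrete, the Kite-side check from earlier in the thread would look something like this (a sketch; I'm assuming SchemaValidationUtil.canRead(writer, reader) is the entry point Allan mentioned, and the exact signature may differ):

import org.apache.avro.Schema;
import org.kitesdk.data.spi.SchemaValidationUtil;

public class KiteNameCheck {
  public static void main(String[] args) {
    Schema writer = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"MyClass\",\"namespace\":\"com.example\","
        + "\"fields\":[{\"name\":\"i\",\"type\":\"int\"}]}");
    Schema reader = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"MyDifferentClass\",\"namespace\":\"com.example\","
        + "\"fields\":[{\"name\":\"i\",\"type\":\"int\"}]}");
    // Kite's field-based check passes here, where Avro's name-aware
    // SchemaCompatibility check reports INCOMPATIBLE.
    System.out.println(SchemaValidationUtil.canRead(writer, reader));
  }
}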

Allan Shoup

Mar 24, 2015, 2:47:11 PM
to Ryan Blue, Joey Echeverria, cdk...@cloudera.org
The main reason I brought this up is that I found it surprising - especially given how Kite's schema evolution documentation is worded today. If the documentation can be made more explicit, then that would definitely improve the situation.

The only reason I might argue that Kite should fail in this situation is that it would be a good safety check to prevent accidentally updating the wrong dataset (for example if you had URIs switched around).
