Schema evolution support (parquet)

268 views
Skip to first unread message

Pavan Gavvala

unread,
Jul 10, 2017, 8:30:57 AM7/10/17
to Confluent Platform

I have built a data pipe line using

1. Kafka jdbc source connector

2. Kafka hdfs sink connector with Hive integration (I am defining property format.class=io.confluent.connect.hdfs.parquet.ParquetFormat for output in hdfs in parquet format)

When I try to add column in mysql at source, and add data, it is not updating the schema in hive. Does defining parquet as output format in for HDFS sink connector do no support schema evolution ?

Seeing below error in kafka after updating the schema in mysql and inserting the records.

Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"user_data","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"role","type":"string"},{"name":"modified_date","type":{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}},{"name":"skill","type":"string"}],"connect.name":"user_data"}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:188)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:245)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:237)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:232)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:59)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
        at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:103)
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:73)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:197)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:167)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)



Konstantine Karantasis

unread,
Jul 10, 2017, 4:21:13 PM7/10/17
to confluent...@googlegroups.com

Hi Pavan, 

have you set the property 'schema.compatibility' to match your requirements?

You can find the valid values here: 

and an explanation of the modes here: 


Also, since you mention Parquet formatter specifically, have you tried exporting records with the Avro formatter and it works? That's something you might want to do if you suspect an issue with Parquet.
However, Based on the log message above, the issue seems to be with the compatibility mode, which by default is set to NONE. If that's the case, Hive integration won't work. 

Konstantine



--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/8be351cf-aa84-4f1b-b0bd-a028f022b138%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Pavan Gavvala

unread,
Jul 10, 2017, 11:17:22 PM7/10/17
to Confluent Platform

This is my properties file for hdfs sink connector.

name=hdfs-sink-3
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test7_mysql_jdbc_avro_user_data
hdfs.url=hdfs://quickstart.cloudera:8020
flush.size=3

#Specify format and partitioner class
#format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
format.class=io.confluent.connect.hdfs.avro.AvroFormat

#Hive integration
hive.integration=true
hive.metastore.uris=thrift://localhost:9083
schema.compatibility=BACKWARD


Without changing the schema(adding new column), the data is going from rdbms to hive successfully but when I add a new column at mysql, I am getting the same error with both avro and parquet. I have tested it many times, I am not sure what is going wrong. I get the same error when I add a column at source(mysql) and insert data.

DDL at source:

create table user(

id INT NOT NULL AUTO_INCREMENT,

name VARCHAR(40) NOT NULL,

role VARCHAR(40) NOT NULL,

modified_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

PRIMARY KEY (id)

);


Adding a new column:


alter table user_data add column skill varchar(20) not null default "java";



Adding data:


INSERT INTO USER_DATA (name,role,skill) values("user3","support","hadoop");



Error in Kafka:

[2017-07-10 20:17:21,409] ERROR Task test-mysql-jdbc-autoincrement-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)

org.apache.kafka.connect.errors.DataException: test7_mysql_jdbc_avro_user_data

        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)

[2017-07-10 20:17:21,417] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)



To post to this group, send email to confluent...@googlegroups.com.

Pavan Gavvala

unread,
Jul 11, 2017, 4:54:04 AM7/11/17
to Confluent Platform
Schema evolution is working if I don't use not null constraint for the column.

Not working:

alter table user_data add column skill varchar(20) not null default "java";


working
:
alter table user_data add column skill varchar(20) default "java";



Reply all
Reply to author
Forward
0 new messages