Connect JDBC Connector Schema Evolution possible?


Jim Malone

May 20, 2016, 4:51:32 PM
to Confluent Platform
Hi,

I am using the JDBC connector for Kafka Connect and feeding it a query from SQL Server using timestamp+incrementing mode. Most of the columns returned are nullable, and in the future I will need to add columns to the query, so it needs to handle schema evolution. The schema builder within the connector already correctly emits a union of "null" and the actual field type for each field in the query; however, it does NOT set a default value. I would assume it just needs to set a default value of null in order for backward compatibility to work if a new field gets added.
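
For illustration (the field name here is just a made-up example), the connector currently generates something like the first form below, whereas adding a field in a backward-compatible way under Avro's rules needs something like the second -- a null default, with "null" as the first branch of the union since a union's default must match its first branch:

    {"name": "middle_name", "type": ["null", "string"]}

    {"name": "middle_name", "type": ["null", "string"], "default": null}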

Has anyone tackled this problem before? Is there an easy update I can make to the connector to handle this, or a workaround? Surprisingly, I haven't seen any other posts discussing this issue, which I would think is fairly common.

Thanks
Jim

Ewen Cheslack-Postava

May 24, 2016, 8:35:28 PM
to Confluent Platform
It won't set null as the default. I think people would find that pretty confusing -- most would expect the column's default value to be used. However, there are challenges to making that work, since column default values can do more than default values in schemas usually can. For example, CURRENT_TIMESTAMP can be used for timestamp columns, but schemas generally only support constant values as defaults.
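
As a rough sketch (hypothetical field name), the closest an Avro schema can get to a column declared with DEFAULT CURRENT_TIMESTAMP is a fixed constant such as epoch zero -- there is no way to say "evaluate the current time" inside the schema itself:

    {"name": "created_at", "type": {"type": "long", "connect.version": 1, "connect.name": "org.apache.kafka.connect.data.Timestamp"}, "default": 0}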

Because of this, schema compatibility is currently a bit tricky. You can still do something like disabling compatibility checks in the schema registry, but then you need downstream applications to be able to handle the different formats of data. I'd also think about the kinds of changes you're looking to make -- even with the current setup, you may be able to keep schemas compatible as long as you restrict the types of schema changes you make.
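
If you do go the route of relaxing the checks, the registry exposes a /config endpoint for this, and depending on the version it can be scoped to a single subject rather than the whole registry (e.g. a PUT to /config/<your-topic>-value, where the subject name follows your topic's naming) with a body along these lines:

    {"compatibility": "NONE"}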

-Ewen


Jim Malone

May 25, 2016, 10:26:11 AM
to Confluent Platform
I appreciate the response.  I guess I'm confused though.  Is there no way to make Kafka Connect handle a newly added column to a table, even if all columns have defaults?  It doesn't look like Kafka Connect adds the "default" field to any schema regardless of what your actual database schema looks like.  I tested adding a new column (with a default) to a table (that already had defaults for all columns) and the Kafka Connect logs gave me the following error:

org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data:
 at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:92)
 at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:142)
 at org.apache.kafka.connect.runtime.WorkerSourceTask.access$600(WorkerSourceTask.java:50)
 at org.apache.kafka.connect.runtime.WorkerSourceTask$WorkerSourceTaskThread.execute(WorkerSourceTask.java:356)
 at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"jim_connect_test","fields":[{"name":"id","type":"long"},{"name":"ts","type":{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp"}},{"name":"name","type":["null","string"]},{"name":"age","type":["null","int"]},{"name":"country","type":["null","string"]},{"name":"active","type":{"type":"int","connect.type":"int8"}},{"name":"married","type":{"type":"int","connect.type":"int8"}}],"connect.name":"jim_connect_test"}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with the latest schema; error code: 409; error code: 409
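
As far as I can tell, the trailing int8 fields in that registered schema ("active" and "married") come through as non-optional and without defaults, which is presumably what trips the compatibility check. A sketch of a field shape that would pass a backward-compatibility check when added (not something the connector produces today):

    {"name": "active", "type": ["null", {"type": "int", "connect.type": "int8"}], "default": null}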


The only schema change scenario we care about right now is handling new columns getting added without having to send the data to a new Kafka topic (by creating a new connector). We'd prefer not to disable compatibility checks in the schema registry if possible, as we use it for more than just Kafka Connect. Is this possible?

Also, can you explain your last sentence a little more? How can we get compatible schemas by restricting the types of schema changes?

-Jim


leon johnson

Oct 27, 2017, 9:44:36 AM
to Confluent Platform
I'm having the exact same issue -- did this ever get addressed?