Using Confluent Platform 5.3.1.
Steps:
# 0. Start with no topics
confluent local destroy
confluent local start
# 1. Create a topic
kafka-topics --create --zookeeper localhost:2181 --topic sample --partitions 1 --replication-factor 1 --config retention.ms=3600000
# 2. Set the schema for the 'sample' topic via control-center:
{
  "type": "record",
  "name": "sample",
  "namespace": "test",
  "fields": [
    {"name": "countryCode", "type": "string"},
    {"name": "messagesDeliveryRate", "type": "double"}
  ]
}
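(For reference, this is how I sanity-check the schema body before pasting it into control-center; just a standard-library JSON check, not full Avro validation, which would need the avro package:)

```python
import json

# The record schema for the 'sample' topic, as registered in step 2
schema_str = """
{
  "type": "record",
  "name": "sample",
  "namespace": "test",
  "fields": [
    {"name": "countryCode", "type": "string"},
    {"name": "messagesDeliveryRate", "type": "double"}
  ]
}
"""

# json.loads raises ValueError if the JSON is malformed
schema = json.loads(schema_str)
print([f["name"] for f in schema["fields"]])  # ['countryCode', 'messagesDeliveryRate']
```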
# 3. Confirm the 'sample-value' schema exists:
curl localhost:8081/subjects/sample-value/versions/latest
{"subject":"sample-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"sample\",\"namespace\":\"test\",\"fields\":[{\"name\":\"countryCode\",\"type\":\"string\"},{\"name\":\"messagesDeliveryRate\",\"type\":\"double\"}]}"}
# 4. Load data from the database with Kafka Connect (sample.properties):
name=sample
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.enabled=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.enabled=true
connection.url=jdbc:mysql://dbserver/data?serverTimezone=UTC
query=select countryCode, cast(messagesDeliveryRate as double) messagesDeliveryRate from message_summaries where acceptedDate >= '2019-09-01 00' and acceptedDate < '2019-09-01 01'
validate.not.null=false
topic.prefix=sample
connection.user=USERNAME
connection.password=PASSWORD
auto.create.topics.enable=false
value.converter.auto.register.schemas=false
# 5. Load the Kafka connector: 'confluent local load sample -- -d sample.properties'
# 6. Error in connect/connect.stdout:
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic sample :
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:83)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:284)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 11 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema: {"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"countryCode","type":["null","string"],"default":null},{"name":"messagesDeliveryRate","type":["null","double"],"default":null}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
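Comparing the two schemas side by side (both copied verbatim from the outputs above), the schema the AvroConverter tries to look up is not the one I registered in step 2: the record name/namespace differ, and the generated fields are nullable unions. I assume that mismatch is why the registry answers 40403 when auto.register.schemas=false, but I'm not sure how to make the connector use my registered schema instead:

```python
import json

# Schema I registered (from the curl output in step 3)
registered = json.loads(
    '{"type":"record","name":"sample","namespace":"test","fields":'
    '[{"name":"countryCode","type":"string"},'
    '{"name":"messagesDeliveryRate","type":"double"}]}'
)

# Schema the converter tried to retrieve (from the SerializationException in step 6)
generated = json.loads(
    '{"type":"record","name":"ConnectDefault",'
    '"namespace":"io.confluent.connect.avro","fields":'
    '[{"name":"countryCode","type":["null","string"],"default":null},'
    '{"name":"messagesDeliveryRate","type":["null","double"],"default":null}]}'
)

print(registered["name"], "vs", generated["name"])  # sample vs ConnectDefault
for r, g in zip(registered["fields"], generated["fields"]):
    # registered fields are plain types; generated ones are ["null", <type>] unions
    print(r["name"], r["type"], "vs", g["type"])
```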
Any ideas?
Thanks,
-Ande