Kafka connect can't load data into a topic with a schema? (Schema not found; error code: 40403)

1,029 views
Skip to first unread message

Ande Kastanis

unread,
Oct 31, 2019, 6:13:36 PM10/31/19
to Confluent Platform
Using confluent 5.3.1

Steps:
# 0. Start with no topics
confluent local destroy
confluent local start

# 1. Create a topic
kafka-topics --create --zookeeper localhost:2181 --topic sample --partitions 1 --replication-factor 1 --config retention.ms=3600000

# 2. Set the schema for the 'sample' topic via control-center:
{
 "type": "record",
 "name": "sample",
 "namespace": "test",
 "fields": [
{"name": "countryCode", "type": "string"},
{"name": "messagesDeliveryRate", "type": "double"}
 ]
}

# 3. Confirm the 'sample-value' schema exists: 
curl localhost:8081/subjects/sample-value/versions/latest
{"subject":"sample-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"sample\",\"namespace\":\"test\",\"fields\":[{\"name\":\"countryCode\",\"type\":\"string\"},{\"name\":\"messagesDeliveryRate\",\"type\":\"double\"}]}"}

# 4. Load data from the database with kafka connect (sample.properties):
name=sample
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.schema.enabled=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.schema.enabled=true

connection.url=jdbc:mysql://dbserver/data?serverTimezone=UTC
query=select countryCode, cast(messagesDeliveryRate as double) messagesDeliveryRate from message_summaries where acceptedDate >= '2019-09-01 00' and acceptedDate < '2019-09-01 01'
validate.not.null=false
topic.prefix=sample

connection.user=USERNAME
connection.password=PASSWORD

auto.create.topics.enable=false
value.converter.auto.register.schemas=false

# 5. Load the kafka connector: 'confluent local load sample -- -d sample.properties'

# 6. Error in connect/connect.stdout:

Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic sample :
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:83)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:284)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 11 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema: {"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"countryCode","type":["null","string"],"default":null},{"name":"messagesDeliveryRate","type":["null","double"],"default":null}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403

Any ideas?

Thanks,
-Ande

Robin Moffatt

unread,
Nov 4, 2019, 10:10:12 AM11/4/19
to confluent...@googlegroups.com
Is there a reason you're pre-creating the schema, and not just letting Kafka Connect create it when it ingests the data? 


-- 

Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff



--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/206daf2b-514f-4518-8bb8-c7a1102c4592%40googlegroups.com.
Reply all
Reply to author
Forward
0 new messages