[2016-06-20 18:05:11,823] ERROR Thread WorkerSinkTask-hdfs-sink-9 exiting with uncaught exception: (org.apache.kafka.connect.util.ShutdownableThread:84)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 41
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:157)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:174)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:202)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:193)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:187)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:67)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:134)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:148)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:191)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:130)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:99)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
This happens even though the Schema Registry shows that the schema ID is not 41 at all.
Is this intended behaviour? Is there a way to get around it?
Thanks,
George
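The `id 41` in the trace is read from the record itself. Confluent's Avro serializer prefixes each value with a magic byte and a 4-byte schema ID. The following is a minimal Python sketch of how to check which ID a record actually carries. It assumes that standard framing and assumes you can get a record's raw value bytes, for example by consuming with a byte-array deserializer. `read_schema_id` and the sample bytes are hypothetical, not part of any Confluent library.

```python
import struct

# Assumed framing (Confluent Avro serializer wire format):
#   byte 0    : magic byte, always 0
#   bytes 1-4 : schema ID, big-endian signed 32-bit int
#   bytes 5.. : Avro binary payload
MAGIC_BYTE = 0
HEADER = struct.Struct(">bi")  # 1 + 4 = 5 bytes, no padding with ">"


def read_schema_id(raw_value: bytes) -> int:
    """Return the schema ID embedded in a Confluent-framed record value."""
    if len(raw_value) < HEADER.size:
        raise ValueError("value too short to carry a schema ID header")
    magic, schema_id = HEADER.unpack_from(raw_value)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}; not Confluent-framed?")
    return schema_id


if __name__ == "__main__":
    # Hypothetical record value: header for schema ID 41 followed by a payload byte.
    sample = bytes([0, 0, 0, 0, 41]) + b"\x02"
    print(read_schema_id(sample))  # 41
```

If this prints the ID you expect, the ID itself is fine. The failure then lies in the later lookup shown further down the trace (`lookUpSubjectVersion`).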
Hi John,
Thanks for the helpful info!
It turns out it is caused by this line, i.e., it looks up the schema version by subject name, which IS coupled with the topic name. The comment there also suggests that this coupling affects only Kafka Connect, which explains why our processing jobs are fine even when the topic name does not match the schema/subject name.
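The coupling can be illustrated with a toy model of the failing call chain (`getVersion` → `lookUpSubjectVersion` → 40401). The sketch below is simplified. It assumes the default `<topic>-value` subject naming. It also assumes a registry that matches schemas by ID, whereas the real registry matches by schema definition. `ToyRegistry`, `value_subject`, and the topic/subject names are all hypothetical.

```python
# Toy model (not the real client) of the lookup that fails in the trace:
# after fetching schema 41 by ID, the deserializer asks the registry which
# *version* that schema is under a subject derived from the topic name.


class SubjectNotFound(Exception):
    """Stands in for RestClientException with error code 40401."""


class ToyRegistry:
    """Hypothetical in-memory registry: subject -> schema IDs in version order."""

    def __init__(self) -> None:
        self.subjects: dict[str, list[int]] = {}

    def register(self, subject: str, schema_id: int) -> int:
        """Add schema_id under subject (if new) and return its 1-based version."""
        versions = self.subjects.setdefault(subject, [])
        if schema_id not in versions:
            versions.append(schema_id)
        return versions.index(schema_id) + 1

    def lookup_version(self, subject: str, schema_id: int) -> int:
        """Return the version of schema_id under subject (cf. lookUpSubjectVersion)."""
        if subject not in self.subjects:
            raise SubjectNotFound(f"Subject not found: {subject}; error code: 40401")
        versions = self.subjects[subject]
        if schema_id not in versions:
            raise LookupError(f"schema {schema_id} is not registered under {subject}")
        return versions.index(schema_id) + 1


def value_subject(topic: str) -> str:
    """Subject for a topic's value schema, assuming default <topic>-value naming."""
    return f"{topic}-value"


if __name__ == "__main__":
    registry = ToyRegistry()
    # A producer registered schema 41 under a subject that does not match the topic.
    registry.register("events-value", 41)

    topic = "raw_events"
    try:
        registry.lookup_version(value_subject(topic), 41)
    except SubjectNotFound as err:
        print(err)  # same kind of 40401 failure as in the trace

    # In this model, registering the schema under the topic-derived subject
    # makes the version lookup succeed.
    registry.register(value_subject(topic), 41)
    print(registry.lookup_version(value_subject(topic), 41))
```

In this model, consumers that only fetch a schema by ID never touch the subject. That matches the observation that only the Connect path is affected.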