I started two sink connectors for two different topics using the same schema(same schema id) and caught the following exception. I verified the schema is registered under the versions under both subjects. Any one experienced the same issue and has some workaround? This is issue 343 posted in Github.
This issue really restricts our capability to use kafka connect with schema registry. There are many cases we will have different topics using the same schema.
[2016-05-31 13:54:00,780] ERROR Thread WorkerSinkTask-t531t-connector-1 exiting with uncaught exception: (org.apache.kafka.connect.util.Shutdown
ableThread:84)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:266)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 601
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:157)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:174)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:202)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:193)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:187)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:67)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:134)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:148)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:191)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:130)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:99)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:266)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)