Should the schema name always be the same as the topic name?

George @paytm.com

Jun 20, 2016, 5:05:42 PM
to Confluent Platform
Hi,

We find that if the schema/subject name is different from the topic name, Kafka Connect and Camus runs fail with the following stack trace:

[2016-06-20 18:05:11,823] ERROR Thread WorkerSinkTask-hdfs-sink-9 exiting with uncaught exception:  (org.apache.kafka.connect.util.ShutdownableThread:84)

org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:

    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)

    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:265)

    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)

    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)

    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)

    at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)

Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 41

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401

    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:157)

    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:174)

    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:202)

    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:193)

    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:187)

    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:67)

    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:134)

    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:148)

    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:191)

    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:130)

    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:99)

    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:265)

    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)

    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)

    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)

    at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)


Even though schema registry shows that the schema id is not 41 at all.


Is this intended behaviour? Is there a way to get around it?


Thanks,


George



joh...@hotmail.com

Jun 20, 2016, 11:35:32 PM
to Confluent Platform
Hi,
I encountered the same problem.
The source code gets the subject name from the method "getSubjectName(topic, isKey)",
so the subject name of the schema has to be "topicName-value" or "topicName-key".
Why not get the subject name from the schema, since the schema has already been retrieved by its global id?
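
To make the naming rule concrete, here is a minimal sketch of the convention described above. It is not copied from the Confluent source; the class name and the "payments" topic are made up for illustration, and only the method name and the "-key"/"-value" suffixes come from this thread.

```java
// Illustrative only: mirrors the naming convention described in this thread,
// not the actual Confluent implementation.
final class SubjectNameSketch {

    // Derives the subject that would be looked up for a given topic.
    static String getSubjectName(String topic, boolean isKey) {
        return isKey ? topic + "-key" : topic + "-value";
    }

    public static void main(String[] args) {
        // "payments" is a hypothetical topic name.
        System.out.println(getSubjectName("payments", false)); // payments-value
        System.out.println(getSubjectName("payments", true));  // payments-key
    }
}
```

So a schema registered under any other subject would not be found by a lookup that derives the subject from the topic this way.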

On Tuesday, June 21, 2016 at 5:05:42 AM UTC+8, George @paytm.com wrote:

George @paytm.com

Jun 21, 2016, 12:00:52 PM
to Confluent Platform

Hi John,


Thanks for the helpful info!


It turns out it is caused by this line


https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L148


That is, it looks up the schema version by subject name, which IS coupled to the topic name. The comment there also suggests that this coupling affects only Kafka Connect, which explains why our processing jobs are fine even when the topic name does not match the schema/subject name.


To bypass this problem, we will add a processing step that forwards our result topic to a topic whose name matches the schema/subject name, and then persist that "matching name" topic. Alternatively, we can persist directly from our processing framework.
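
Before wiring a topic into a sink, it may help to check whether the subject derived from the topic name is registered at all. The sketch below is only an assumption about how such a pre-flight check could look: the set of subjects is hardcoded here (in practice it could come from the Schema Registry's GET /subjects endpoint), and the class, method, and topic names are hypothetical.

```java
import java.util.Set;

// Hypothetical pre-flight check, not part of any Confluent API.
final class SubjectPreflightSketch {

    // True if the value subject derived from the topic name is registered.
    static boolean hasValueSubject(Set<String> registeredSubjects, String topic) {
        return registeredSubjects.contains(topic + "-value");
    }

    public static void main(String[] args) {
        // Hardcoded stand-in for the list of registered subjects.
        Set<String> subjects = Set.of("events-value", "events-key");

        // "results" holds data written with the "events" schema, but its
        // derived subject "results-value" is not registered.
        System.out.println(hasValueSubject(subjects, "results")); // false
        System.out.println(hasValueSubject(subjects, "events"));  // true
    }
}
```

A topic that fails this check is a candidate for the forwarding step described above.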

George @paytm.com

Jun 22, 2016, 10:02:51 AM
to Confluent Platform
Did more research. The issue is logged as

George @paytm.com

Jun 22, 2016, 11:41:37 AM
to Confluent Platform
For the record, our workaround is to use ReplayLogProducer to copy the data into a topic whose name matches the subject name, and then persist that topic into HDFS with Camus.