Should the schema name always be the same as the topic name?


George @paytm.com

Jun 20, 2016, 5:05:42 PM
to Confluent Platform
Hi,

We found that if the schema/subject name differs from the topic name, Kafka Connect and Camus runs fail with the following stack trace:

[2016-06-20 18:05:11,823] ERROR Thread WorkerSinkTask-hdfs-sink-9 exiting with uncaught exception:  (org.apache.kafka.connect.util.ShutdownableThread:84)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
    at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 41
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:157)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:174)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:202)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:193)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:187)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:67)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:134)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:148)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:191)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:130)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:99)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
    at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)


This happens even though the Schema Registry shows that the schema's id is not 41 at all.
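For context, the id in "Error retrieving Avro schema for id 41" is not taken from the registry's subject listing: the deserializer reads it from the header of each message. A minimal sketch of that header parsing, assuming the standard Confluent wire format (one magic byte 0, then a four-byte big-endian schema id, then the Avro payload); the class and method names are illustrative only:

```java
import java.nio.ByteBuffer;

public class WireFormat {
    // Confluent Avro wire format: magic byte 0, 4-byte big-endian schema id, Avro-encoded data.
    static final byte MAGIC_BYTE = 0x0;

    // Returns the schema id embedded in a serialized message value.
    static int schemaIdOf(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        if (message.length < 5 || buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte or message too short");
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        // Hypothetical message: header carrying schema id 41, followed by one dummy payload byte.
        byte[] message = ByteBuffer.allocate(6).put(MAGIC_BYTE).putInt(41).put((byte) 2).array();
        System.out.println(schemaIdOf(message)); // 41
    }
}
```

Decoding the first five bytes of a few records from the failing topic this way shows which id the producer actually embedded.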


Is this intended behaviour? Is there way to get around it?


Thanks,


George



joh...@hotmail.com

Jun 20, 2016, 11:35:32 PM
Hi,
I have encountered the same problem.
The source code gets the subject name via the method "getSubjectName(topic, isKey)",
so the schema's subject name has to be "topicName-value" or "topicName-key".
Why not get the subject name from the schema, since the schema has already been retrieved by its global id?
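A minimal sketch of the naming convention described above, assuming the default behavior where the subject is derived purely from the topic name; this illustrates the rule and is not the serializer's actual source. The topic "orders" is a made-up example:

```java
public class SubjectNaming {
    // Subject derived from the topic: "<topic>-key" for keys, "<topic>-value" for values.
    static String getSubjectName(String topic, boolean isKey) {
        return isKey ? topic + "-key" : topic + "-value";
    }

    public static void main(String[] args) {
        System.out.println(getSubjectName("orders", false)); // orders-value
        System.out.println(getSubjectName("orders", true));  // orders-key
    }
}
```

Note that the schema's own record name plays no part here, which is why a schema registered under any other subject is not found.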

On Tuesday, June 21, 2016 at 5:05:42 AM UTC+8, George @paytm.com wrote:
Message has been deleted

George @paytm.com

Jun 21, 2016, 12:00:52 PM

Hi John,


Thanks for the helpful info!


It turns out the failure is caused by this line:


https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L148


i.e., it looks up the schema version by subject name, which IS coupled to the topic name. The comment there also suggests that this coupling affects only Kafka Connect, which explains why our processing jobs work fine even when the topic name does not match the schema/subject name.
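To illustrate the two-step lookup, here is a hypothetical in-memory model (not the real CachedSchemaRegistryClient API): fetching the schema by its global id succeeds regardless of subject, while the follow-up version lookup under "<topic>-value" fails when the schema was registered under a different subject. The subject "MyRecord" and the topic "results" are made-up examples:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class RegistryLookupSketch {
    // Hypothetical stand-in for the Schema Registry's state.
    private final Map<Integer, String> schemasById = new HashMap<>();
    private final Map<String, Map<String, Integer>> versionsBySubject = new HashMap<>();

    void register(String subject, int id, String schema, int version) {
        schemasById.put(id, schema);
        versionsBySubject.computeIfAbsent(subject, s -> new HashMap<>()).put(schema, version);
    }

    // Step 1: fetch the schema by the global id from the message; no subject is involved.
    Optional<String> schemaById(int id) {
        return Optional.ofNullable(schemasById.get(id));
    }

    // Step 2: look up the schema's version under the subject derived from the topic name.
    int versionOf(String topic, String schema) {
        String subject = topic + "-value";
        Map<String, Integer> versions = versionsBySubject.get(subject);
        if (versions == null) {
            throw new IllegalStateException("Subject not found.; error code: 40401");
        }
        Integer version = versions.get(schema);
        if (version == null) {
            throw new IllegalStateException("Schema not registered under subject " + subject);
        }
        return version;
    }

    public static void main(String[] args) {
        RegistryLookupSketch registry = new RegistryLookupSketch();
        // Schema registered under a subject that does not follow "<topic>-value".
        registry.register("MyRecord", 41, "{\"type\":\"string\"}", 1);
        System.out.println(registry.schemaById(41).isPresent()); // true
        try {
            registry.versionOf("results", "{\"type\":\"string\"}");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Subject not found.; error code: 40401
        }
    }
}
```

Plain consumers that only need step 1 never hit the subject lookup, which matches the observation that the processing jobs are unaffected.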


To work around this problem, we will add a processing step that forwards our result topic to a topic whose name matches the schema name, and then persist that "matching name" topic. Alternatively, we could persist the data directly from our processing framework.
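A minimal sketch of how the target topic name could be chosen for that forwarding step, assuming the schema is registered under a "<name>-value" subject and the default "<topic>-value" convention applies; the helper name and the example subjects are hypothetical:

```java
import java.util.Optional;

public class TargetTopicSketch {
    private static final String VALUE_SUFFIX = "-value";

    // Returns the topic whose default value subject ("<topic>-value") equals the given subject.
    static Optional<String> topicForValueSubject(String subject) {
        if (subject.endsWith(VALUE_SUFFIX) && subject.length() > VALUE_SUFFIX.length()) {
            return Optional.of(subject.substring(0, subject.length() - VALUE_SUFFIX.length()));
        }
        // A subject without the suffix (e.g. a bare record name) maps to no topic under this convention.
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(topicForValueSubject("payments-value")); // Optional[payments]
        System.out.println(topicForValueSubject("PaymentRecord"));  // Optional.empty
    }
}
```

If the forwarding step copies record bytes unchanged, the schema id embedded in each message stays the same; only the subject that Kafka Connect derives from the new topic name differs.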

George @paytm.com

Jun 22, 2016, 10:02:51 AM
Did more research. The issue is logged as

George @paytm.com

Jun 22, 2016, 11:41:37 AM
For the record, our workaround is to use ReplayLogProducer to copy the data into a topic with the matching subject name, and then persist that topic into HDFS with Camus.