Got "Error retrieving Avro schema for id" after the reboot of Schema registry

gvdm

Nov 10, 2017, 10:22:44 AM
to Confluent Platform
Hi all,

I have a Kafka/Confluent 3.3.0 stack made up of a Kafka broker (0.11, the Confluent one), Schema registry and a Kafka Streams application I wrote. I have all the agents running on the same machine and only one instance per type (one broker, one registry, one streams app) for test purposes.

I configured the broker, registry and streams apps as Linux systemd services that I manage via systemctl.

I have successfully written some data to the Kafka topic using a Java Kafka producer, and I use the Kafka Streams library to read from a topic named TEST-TRACE, process the data, and write it to a second topic named TEST-TRACE_STATISTICS.
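
The Streams application itself is nothing exotic. Purely for illustration (this is not my actual code: the class name and the processing step are placeholders, and the hosts/ports are simply the ones that appear in the logs below), it is roughly of this shape, using the 0.11.x KStreamBuilder API and Confluent's GenericAvroSerde:

import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;

public class TraceStatisticsApp {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "grp-kafka-streams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.101:9092");
        // The Avro serdes resolve writer schemas by ID against the Schema Registry
        props.put("schema.registry.url", "http://10.17.36.101:8081");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);

        KStreamBuilder builder = new KStreamBuilder();
        // Read the raw traces, derive the statistics (placeholder), write them to the output topic
        KStream<GenericRecord, GenericRecord> traces = builder.stream("TEST-TRACE");
        traces.mapValues(value -> value /* statistics computation goes here */)
              .to("TEST-TRACE_STATISTICS");

        new KafkaStreams(builder, props).start();
    }
}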

The stack has been working for a while, but suddenly I noticed that no data was being processed by the Kafka Streams application. Reading the streams logs I found a sudden crash:

2017-11-10 09:51:33 INFO  KafkaStreams:229 - stream-client [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9] State transition from PENDING_SHUTDOWN to NOT_RUNNING.
2017-11-10 09:51:33 INFO  KafkaStreams:514 - stream-client [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9] Stopped Kafka Streams process.
2017-11-10 09:51:33 INFO  StreamThread:980 - stream-thread [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD.
2017-11-10 09:51:33 INFO  StreamThread:1072 - stream-thread [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9-StreamThread-1] Stream thread shutdown complete
2017-11-10 09:51:33 INFO  StreamThread:1421 - stream-thread [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9-StreamThread-1] Removing all standby tasks []
2017-11-10 09:51:33 INFO  StreamThread:1407 - stream-thread [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9-StreamThread-1] Removing all active tasks [0_0, 1_0, 2_0, 3_0]
2017-11-10 09:51:32 INFO  KafkaProducer:972 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2017-11-10 09:51:28 INFO  StreamThread:1040 - stream-thread [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9-StreamThread-1] Shutting down
2017-11-10 09:51:28 INFO  StreamThread:571 - stream-thread [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9-StreamThread-1] Shutting down at user request
2017-11-10 09:51:23 INFO  StreamThread:980 - stream-thread [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN.
2017-11-10 09:51:23 INFO  StreamThread:900 - stream-thread [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9-StreamThread-1] Informed thread to shut down
2017-11-10 09:51:22 INFO  KafkaStreams:229 - stream-client [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9] State transition from RUNNING to PENDING_SHUTDOWN.
Stopping Test Kafka Streams...
2017-11-10 09:49:57 INFO  AbstractCoordinator:597 - Discovered coordinator 192.168.0.101:9092 (id: 2147483647 rack: null) for group grp-kafka-streams.
2017-11-10 09:49:43 INFO  AbstractCoordinator:642 - Marking the coordinator 192.168.0.101:9092 (id: 2147483647 rack: null) dead for group grp-kafka-streams
2017-10-17 00:35:35 WARN  NetworkClient:844 - Error while fetching metadata with correlation id 9 : {TEST-TRACE_STATISTICS=LEADER_NOT_AVAILABLE}
current standby tasks: []
current active tasks: [0_0, 1_0, 2_0, 3_0]


As you can see, the last log entry before today's events dates back to October (2017-10-17), and then today the Streams app crashed.
Digging a bit, I found that at the same time there was a crash of the Schema Registry:

Nov 10 09:51:17 192.168.0.101 systemd[1]: schema-registry.service failed.
Nov 10 09:51:17 192.168.0.101 systemd[1]: Unit schema-registry.service entered failed state.
Nov 10 09:51:16 192.168.0.101 systemd[1]: schema-registry.service: main process exited, code=killed, status=9/KILL
Nov 10 09:46:58 192.168.0.101 schema-registry-start[6668]: [2017-11-10 09:46:57,941] INFO 10.17.36.213 - - [10/Nov/2017:09:46:57 +0100] "POST /subjects/TEST-TRACE-value/versions HTTP/1.1" 200 10  2 (io.confluent.rest-utils.requests:77)
Nov 10 09:46:57 192.168.0.101 schema-registry-start[6668]: [2017-11-10 09:46:57,928] INFO 10.17.36.213 - - [10/Nov/2017:09:46:57 +0100] "POST /subjects/TEST-TRACE-value/versions HTTP/1.1" 200 10  185 (io.confluent.rest-utils.requests:77)

As you can see, at 09:46:58 I had the last successful operation with the registry, at 09:51:16 the Schema Registry crashed (without logging any error, moreover), and then at 09:51:22 the streams application stopped.

And here comes the problem...

After restarting the Schema Registry I tried to restart the Streams application, but it fails with this exception:

2017-11-10 16:10:25 INFO  KafkaStreams:229 - stream-client [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6] State transition from PENDING_SHUTDOWN to NOT_RUNNING.
2017-11-10 16:10:25 INFO  KafkaStreams:514 - stream-client [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6] Stopped Kafka Streams process.
2017-11-10 16:10:25 WARN  StreamThread:978 - stream-thread [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6-StreamThread-1] Unexpected state transition from DEAD to PENDING_SHUTDOWN.
2017-11-10 16:10:25 INFO  StreamThread:900 - stream-thread [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6-StreamThread-1] Informed thread to shut down
2017-11-10 16:10:25 INFO  KafkaStreams:229 - stream-client [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6] State transition from RUNNING to PENDING_SHUTDOWN.
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556)
        at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
        at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:36)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeKey(SourceNode.java:52)
        at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
        at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
        at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:39)
        at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:63)
        at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:131)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:372)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:379)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:203)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:182)
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 221
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556)
        at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
        at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:38)
org.apache.kafka.streams.errors.StreamsException: Failed to deserialize key for record. topic=TEST-TRACE, partition=0, offset=2547800
2017-11-10 16:10:25 WARN  StreamThread:978 - stream-thread [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6-StreamThread-1] Unexpected state transition from RUNNING to DEAD.
2017-11-10 16:10:25 INFO  StreamThread:1072 - stream-thread [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6-StreamThread-1] Stream thread shutdown complete
2017-11-10 16:10:25 INFO  StreamThread:1421 - stream-thread [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6-StreamThread-1] Removing all standby tasks []
2017-11-10 16:10:25 INFO  StreamThread:1407 - stream-thread [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6-StreamThread-1] Removing all active tasks [0_0, 1_0, 2_0, 3_0]
2017-11-10 16:10:25 INFO  KafkaProducer:972 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2017-11-10 16:10:25 INFO  StreamThread:1040 - stream-thread [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6-StreamThread-1] Shutting down
current standby tasks: []
current active tasks: [0_0, 1_0, 2_0, 3_0]
2017-11-10 16:10:25 INFO  StreamThread:193 - stream-thread [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6-StreamThread-1] partition assignment took 1813 ms.
2017-11-10 16:10:25 INFO  KafkaStreams:229 - stream-client [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6] State transition from REBALANCING to RUNNING.
2017-11-10 16:10:25 INFO  StreamThread:980 - stream-thread [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6-StreamThread-1] State transition from ASSIGNING_PARTITIONS to RUNNING.

The error is SerializationException: Error retrieving Avro schema for id 221.
It seems that the Schema Registry crash caused the schema with ID 221 not to be saved, or it is no longer available after the reboot. I also checked the URL http://10.17.36.101:8081/schemas/ids/221 but got the same response:

{
  "error_code": 40403,
  "message": "Schema not found"
}
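
For reference, the same check can be reproduced from the shell against the registry's REST API (same host and port as the URL above); listing the subject's versions would also show what, if anything, is still registered:

> curl http://10.17.36.101:8081/schemas/ids/221
{"error_code":40403,"message":"Schema not found"}

> curl http://10.17.36.101:8081/subjects/TEST-TRACE-value/versions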

Do you know of any known bugs in the registry for such a situation? And how can I fix my data now?
At the moment I'm stuck, as I cannot process the data written to the Kafka topic.

Regards
Giulio

gvdm

Nov 13, 2017, 5:50:28 AM
to Confluent Platform
Up

mag...@confluent.io

Nov 13, 2017, 5:31:31 PM
to Confluent Platform
Hi,

It would help if you provided some more details. How was the schema published? Was the schema published around the time the crash happened?

gvdm

Nov 14, 2017, 3:49:56 AM
to Confluent Platform
Hi,

the producer generates the schema dynamically according to the data structure. In fact, the data structure of the value written to Kafka may change, adding or removing fields, so I wrote a method which creates the JSON structure starting from the Class definition of the value.

// This is a cache map of schemas, used to avoid issuing a new schema for each write operation
// (which would otherwise lead to https://github.com/confluentinc/kafka-rest/issues/217)
Map<JSONObject, Schema> registeredSchemas = new HashMap<>();
// This is a map containing the data payload
Map<String, Object> payload = new HashMap<>();

// The JSON of the value Schema
JSONObject resultJsonObj = new JSONObject();
resultJsonObj.put("type", "record");
resultJsonObj.put("name", "KafkaTracerValue");
JSONArray fields = new JSONArray();
for (Entry<String, Object> payloadEntry : payload.entrySet()) {
    if (payloadEntry.getValue() != null) {
        Class<?> valueClass = payloadEntry.getValue().getClass();
        // This utility creates a JSON structure recursively (using reflection) starting from the Class definition of the valueClass
        fields.put(ObjectsTransformUtil.transformToAvroJSONObject(payloadEntry.getKey(), valueClass, "KafkaTracerValue"));
    }
}
resultJsonObj.put("fields", fields);

// Here we check whether the schema was already created and cached locally.
// If not, we add it; otherwise we reuse the previously registered schema
boolean foundSchema = false;
for (Entry<JSONObject, Schema> schemaEntry : registeredSchemas.entrySet()) {
    if (resultJsonObj.similar(schemaEntry.getKey())) {
        resultJsonObj = schemaEntry.getKey();
        foundSchema = true;
        break;
    }
}
if (!foundSchema) {
    Parser p = new Parser();
    registeredSchemas.put(resultJsonObj, p.parse(resultJsonObj.toString()));
}
Schema valueSchema = registeredSchemas.get(resultJsonObj);

The key schema, instead, is simply a two-field schema.
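
To be explicit, a minimal sketch of that key schema (the record name KafkaTracerKey is hypothetical; the field names match the ones used below and are assumed to be strings):

// Hypothetical key schema matching the two fields used in the send code below
Schema keySchema = new Schema.Parser().parse(
    "{\"type\": \"record\", \"name\": \"KafkaTracerKey\", \"fields\": ["
    + "{\"name\": \"correlationId\", \"type\": \"string\"},"
    + "{\"name\": \"traceId\", \"type\": \"string\"}]}");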

Then we send the data using the generated schemas

GenericRecord key = new GenericData.Record(keySchema);
key.put("correlationId", correlationId);
key.put("traceId", traceId);

// This method will populate the GenericRecord schema with the current data of the payload
GenericRecord value = toValueGenericRecord(valueSchema, payload);

// Create the ProducerRecord and send it
ProducerRecord<GenericRecord, GenericRecord> record = new ProducerRecord<>(topic, key, value);
getProducer().send(record, (RecordMetadata metadata, Exception e) -> {
    if (e != null) {
        LOG.error(e.getMessage(), e);
    }
});


So, the schema is not recreated each time we send data; it is cached in a Map in the producer application.
Does this help? Is this implementation correct?

gvdm

Nov 14, 2017, 4:01:51 AM
to Confluent Platform
In addition, the data can change, but not very frequently, so I think that no new schema was being created at the time the Schema Registry went down.



mag...@confluent.io

Nov 15, 2017, 2:03:37 AM
to Confluent Platform
All the schemas are stored in a topic in Kafka, so I see no reason for a schema to go away unless someone manually published a message to the topic with a null value (and a key equivalent to the schema that was lost) and it was compacted. The other thing I can think of is that you requested the schema before the Schema Registry had read all the schemas from the topic. Have you tried to get the schema again?
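
Roughly speaking (the exact field layout can vary by version, and the version number here is just an example; the id 221 is the one from your error), each schema lives in that topic as a JSON message keyed by subject and version, something like:

key:   {"keytype":"SCHEMA","subject":"TEST-TRACE-value","version":3,...}
value: {"subject":"TEST-TRACE-value","version":3,"id":221,"schema":"..."}

A tombstone (null value) published with that key, followed by compaction, is the kind of thing that would make the schema disappear.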

gvdm

Nov 15, 2017, 3:10:32 AM
to Confluent Platform
The only publisher at the moment is the producer, so I don't think this is a case of a null value being published.

Yes, I tried requesting the schema from the registry (and so does the Kafka Streams app every time I reboot it) but nothing is found.

gvdm

Nov 15, 2017, 4:19:49 AM
to Confluent Platform
Let me add a configuration detail: the _schemas topic has cleanup.policy set to compact.

> kafka-configs  --zookeeper localhost:2181  --entity-type topics --entity-name _schemas --describe
Configs for topic '_schemas' are cleanup.policy=compact

Is this right?

gvdm

Nov 15, 2017, 4:23:33 AM
to Confluent Platform
...and if I try to perform a read operation via the kafka-avro-console-consumer, I get the same error:

> kafka-avro-console-consumer  --bootstrap-server localhost:9092 --from-beginning --topic TEST-TRACE

org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 222
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:182)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:203)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:379)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:372)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:131)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
        at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:122)
        at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:114)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:140)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
[2017-11-15 10:21:57,077] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:105)
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 222
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:182)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:203)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:379)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:372)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:131)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
        at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:122)
        at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:114)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:140)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

Magesh Nandakumar

Nov 15, 2017, 11:52:41 AM
to confluent...@googlegroups.com
You can try inspecting the contents of the schemas topic using a console consumer.
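
For example, something along these lines (use the plain kafka-console-consumer rather than the Avro one, since the _schemas records are plain JSON; printing the keys makes it easier to see which subject/version each record belongs to):

> kafka-console-consumer --bootstrap-server localhost:9092 --topic _schemas --from-beginning --property print.key=true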


gvdm

Nov 15, 2017, 3:17:24 PM
to Confluent Platform
I tried with the kafka-avro-console-consumer. This is the output I got:


> kafka-avro-console-consumer  --bootstrap-server localhost:9092 --from-beginning --topic _schemas
null
null
Processed a total of 3 messages
[2017-11-15 20:20:18,089] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:105)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2017-11-15 20:20:18,089] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:105)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

gvdm

Nov 16, 2017, 3:18:14 AM
to Confluent Platform
I was stuck on this error for so long that I had to fix the problem in the only (though wrong) way I know.
This is what I did:
  1. Stopped all producers
  2. Deleted all the topics on the Kafka broker with the script

    for i in $(kafka-topics --zookeeper localhost:2181 --list); do  kafka-topics --zookeeper localhost:2181 --delete --topic $i; done

  3. Stopped the Zookeeper/Kafka/Confluent stack and deleted all the temporary files and log files:

    rm -rf /tmp/zookeeper/
    rm -rf /var/lib/kafka/
    rm -rf /tmp/kafka-streams/

    This deleted the Kafka log files, the zookeeper logs and the streams data store

  4. Restarted the Confluent stack and the producers. 
Obviously everything now works as expected, but this is the third time I have hit this same exception in the same circumstances, and the only fix I could apply was/is this "hard reset" of the stack.

I think this issue will come back in the near future, and I don't know whether I will be allowed to hard-reset Kafka again, because we will not be the ones managing the producers going forward (they will be installed for some customers).

I hope that someone at Confluent will take care of this issue, as it is the main obstacle to using the Confluent stack widely.

Thanks