Hi all,
I have a Kafka/Confluent 3.3.0 stack made up of a Kafka broker (0.11, the Confluent build), the Schema Registry and a Kafka Streams application I wrote. All the services run on the same machine, with only one instance of each (one broker, one registry, one streams app), for test purposes.
I configured the broker, the registry and the streams app as Linux systemd services that I manage via systemctl.
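For completeness, the registry unit file looks roughly like this (paths are illustrative of my install, not exact):

[Unit]
Description=Confluent Schema Registry
After=network.target

[Service]
Type=simple
ExecStart=/opt/confluent-3.3.0/bin/schema-registry-start /opt/confluent-3.3.0/etc/schema-registry/schema-registry.properties

[Install]
WantedBy=multi-user.target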
I successfully wrote some data to the Kafka topic using a Java Kafka producer, and the Kafka Streams application correctly reads from a topic named TEST-TRACE, processes the data and writes it to a second topic named TEST-TRACE_STATISTICS.
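For reference, the streams topology is roughly the following (a simplified sketch; the real statistics logic is omitted and the ids/URLs are the ones from my config):

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;

public class TestKafkaStreams {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-kafka-streams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.101:9092");
        // The Avro serdes resolve schemas through the Schema Registry by numeric id
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        props.put("schema.registry.url", "http://10.17.36.101:8081");

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("TEST-TRACE")          // read Avro records
               .mapValues(value -> value)     // statistics computation elided
               .to("TEST-TRACE_STATISTICS");  // write results

        new KafkaStreams(builder, props).start();
    }
}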
The stack had been working for a while, but today I noticed that no data was being processed by the Kafka Streams application. Reading the streams logs (newest entries first) I found a sudden crash:
2017-11-10 09:51:33 INFO KafkaStreams:229 - stream-client [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9] State transition from PENDING_SHUTDOWN to NOT_RUNNING.
2017-11-10 09:51:33 INFO KafkaStreams:514 - stream-client [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9] Stopped Kafka Streams process.
2017-11-10 09:51:33 INFO StreamThread:980 - stream-thread [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD.
2017-11-10 09:51:33 INFO StreamThread:1072 - stream-thread [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9-StreamThread-1] Stream thread shutdown complete
2017-11-10 09:51:33 INFO StreamThread:1421 - stream-thread [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9-StreamThread-1] Removing all standby tasks []
2017-11-10 09:51:33 INFO StreamThread:1407 - stream-thread [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9-StreamThread-1] Removing all active tasks [0_0, 1_0, 2_0, 3_0]
2017-11-10 09:51:32 INFO KafkaProducer:972 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2017-11-10 09:51:28 INFO StreamThread:1040 - stream-thread [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9-StreamThread-1] Shutting down
2017-11-10 09:51:28 INFO StreamThread:571 - stream-thread [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9-StreamThread-1] Shutting down at user request
2017-11-10 09:51:23 INFO StreamThread:980 - stream-thread [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN.
2017-11-10 09:51:23 INFO StreamThread:900 - stream-thread [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9-StreamThread-1] Informed thread to shut down
2017-11-10 09:51:22 INFO KafkaStreams:229 - stream-client [test-kafka-streams-14530417-ea54-4457-831f-2dab900e32e9] State transition from RUNNING to PENDING_SHUTDOWN.
Stopping Test Kafka Streams...
2017-11-10 09:49:57 INFO AbstractCoordinator:597 - Discovered coordinator 192.168.0.101:9092 (id: 2147483647 rack: null) for group grp-kafka-streams.
2017-11-10 09:49:43 INFO AbstractCoordinator:642 - Marking the coordinator 192.168.0.101:9092 (id: 2147483647 rack: null) dead for group grp-kafka-streams
2017-10-17 00:35:35 WARN NetworkClient:844 - Error while fetching metadata with correlation id 9 : {TEST-TRACE_STATISTICS=LEADER_NOT_AVAILABLE}
current standby tasks: []
current active tasks: [0_0, 1_0, 2_0, 3_0]
As you can see, the last log entry before that was on the 17th of October, and then today the Streams app crashed.
Digging a bit, I found that at the same time there was a crash of the Schema Registry (again newest entries first):
Nov 10 09:51:17 192.168.0.101 systemd[1]: schema-registry.service failed.
Nov 10 09:51:17 192.168.0.101 systemd[1]: Unit schema-registry.service entered failed state.
Nov 10 09:51:16 192.168.0.101 systemd[1]: schema-registry.service: main process exited, code=killed, status=9/KILL
Nov 10 09:46:58 192.168.0.101 schema-registry-start[6668]: [2017-11-10 09:46:57,941] INFO 10.17.36.213 - - [10/Nov/2017:09:46:57 +0100] "POST /subjects/TEST-TRACE-value/versions HTTP/1.1" 200 10 2 (io.confluent.rest-utils.requests:77)
Nov 10 09:46:57 192.168.0.101 schema-registry-start[6668]: [2017-11-10 09:46:57,928] INFO 10.17.36.213 - - [10/Nov/2017:09:46:57 +0100] "POST /subjects/TEST-TRACE-value/versions HTTP/1.1" 200 10 185 (io.confluent.rest-utils.requests:77)
As you can see, at 09:46:58 I had the last successful operation with the registry; at 09:51:16 the Schema Registry process was killed (status=9/KILL, with no error logged beforehand); and at 09:51:22 the streams application started shutting down.
And here comes the problem...
After restarting the Schema Registry, I tried to restart the Streams application, but it fails with this exception (again newest log entries first):
2017-11-10 16:10:25 INFO KafkaStreams:229 - stream-client [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6] State transition from PENDING_SHUTDOWN to NOT_RUNNING.
2017-11-10 16:10:25 INFO KafkaStreams:514 - stream-client [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6] Stopped Kafka Streams process.
2017-11-10 16:10:25 WARN StreamThread:978 - stream-thread [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6-StreamThread-1] Unexpected state transition from DEAD to PENDING_SHUTDOWN.
2017-11-10 16:10:25 INFO StreamThread:900 - stream-thread [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6-StreamThread-1] Informed thread to shut down
2017-11-10 16:10:25 INFO KafkaStreams:229 - stream-client [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6] State transition from RUNNING to PENDING_SHUTDOWN.
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:36)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeKey(SourceNode.java:52)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:39)
at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:63)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:131)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:372)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:379)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:203)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:182)
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 221
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:38)
org.apache.kafka.streams.errors.StreamsException: Failed to deserialize key for record. topic=TEST-TRACE, partition=0, offset=2547800
2017-11-10 16:10:25 WARN StreamThread:978 - stream-thread [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6-StreamThread-1] Unexpected state transition from RUNNING to DEAD.
2017-11-10 16:10:25 INFO StreamThread:1072 - stream-thread [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6-StreamThread-1] Stream thread shutdown complete
2017-11-10 16:10:25 INFO StreamThread:1421 - stream-thread [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6-StreamThread-1] Removing all standby tasks []
2017-11-10 16:10:25 INFO StreamThread:1407 - stream-thread [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6-StreamThread-1] Removing all active tasks [0_0, 1_0, 2_0, 3_0]
2017-11-10 16:10:25 INFO KafkaProducer:972 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2017-11-10 16:10:25 INFO StreamThread:1040 - stream-thread [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6-StreamThread-1] Shutting down
current standby tasks: []
current active tasks: [0_0, 1_0, 2_0, 3_0]
2017-11-10 16:10:25 INFO StreamThread:193 - stream-thread [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6-StreamThread-1] partition assignment took 1813 ms.
2017-11-10 16:10:25 INFO KafkaStreams:229 - stream-client [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6] State transition from REBALANCING to RUNNING.
2017-11-10 16:10:25 INFO StreamThread:980 - stream-thread [test-kafka-streams-ee6a5440-a2dd-4cf3-9881-0e6be94652e6-StreamThread-1] State transition from ASSIGNING_PARTITIONS to RUNNING.
The key error is SerializationException: Error retrieving Avro schema for id 221.
It seems that the Schema Registry crash caused the schema with ID 221 not to be saved, or it is no longer available after the restart. I also checked the URL
http://10.17.36.101:8081/schemas/ids/221 directly, but I got the same response:
{
  "error_code": 40403,
  "message": "Schema not found"
}
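For what it's worth, probing the id from Java with the same client class that appears in the stack trace fails the same way (a minimal sketch, assuming the Confluent 3.3.0 client on the classpath):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

public class SchemaProbe {
    public static void main(String[] args) throws Exception {
        CachedSchemaRegistryClient client =
            new CachedSchemaRegistryClient("http://10.17.36.101:8081", 100);
        // Fetches GET /schemas/ids/221 under the hood; throws
        // RestClientException "Schema not found; error code: 40403"
        System.out.println(client.getBySubjectAndId("TEST-TRACE-value", 221));
    }
}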
Are there any known bugs in the registry that could explain this? And how can I recover my data now?
At the moment I'm stuck, as I cannot process the data already written to the Kafka topic.
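If the schema really is gone, the only workaround I can think of is to skip the records encoded with id 221 by committing the Streams group's offsets past them before restarting the app. A sketch of the idea (assuming the group id equals the application id, and accepting the data loss), to be run only while the Streams app is stopped:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SkipBadOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.101:9092");
        props.put("group.id", "test-kafka-streams");  // the Streams application.id
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("TEST-TRACE", 0);
            consumer.assign(Collections.singletonList(tp));
            // Commit the offset right after the record that cannot be deserialized
            consumer.commitSync(Collections.singletonMap(
                tp, new OffsetAndMetadata(2547801L)));
        }
    }
}

Of course any later record encoded with the same schema id would hit the same error, so I would much rather find a way to restore the schema itself.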
Regards
Giulio