Kafka Connect consumers don’t handle records that use an Avro schema with a recursive type. I have discovered this ticket, which raises the same issue. It seems to be a Connect issue, since consumers that are independent of Connect don’t fail. This is a blocking issue (I have an idea for a fix and am working on it; details below, advice appreciated).
Producing works fine:
bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type" : "record", "name":"list","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","list"]}]}'
{"value":1,"next":null}
{"value":1,"next":{"list":{"value":2,"next":null}}}Consuming without Connect also works:
./bin/kafka-avro-console-consumer --topic test --zookeeper localhost:2181 --from-beginning
{"value":1,"next":null}
{"value":1,"next":{"list":{"value":2,"next":null}}}
^CProcessed a total of 2 messages

But consuming with Connect raises a StackOverflowError. It seems the cyclic Avro structure results in an endless loop of toConnectSchema calls.

Sink config (connect-console-sink.properties):
name=local-console-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=test

./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties connect-console-sink.properties
… start up logging …
java.lang.StackOverflowError
at org.apache.avro.JsonProperties.getJsonProp(JsonProperties.java:54)
at org.apache.avro.JsonProperties.getProp(JsonProperties.java:45)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1055)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1103)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1137)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1103)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1137)
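The failure mode can be modeled in miniature without Connect or Avro at all. The sketch below is a toy (none of these names are from the real AvroData code): a structural converter that recurses into fields without tracking visited schemas overflows the stack as soon as the schema graph contains a cycle.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a schema with fields; not the real Avro or Connect types.
class ToySchema {
    String name;
    List<ToySchema> fields = new ArrayList<>();
}

public class RecursionDemo {
    // Mirrors the recursive toConnectSchema calls: no visited set, so any
    // cycle in the schema graph recurses until the stack is exhausted.
    static int naiveDepth(ToySchema s) {
        int depth = 1;
        for (ToySchema f : s.fields) {
            depth = Math.max(depth, 1 + naiveDepth(f));
        }
        return depth;
    }

    public static void main(String[] args) {
        ToySchema list = new ToySchema();
        list.name = "list";
        list.fields.add(list); // "next" points back at the record itself

        try {
            naiveDepth(list);
            System.out.println("no overflow");
        } catch (StackOverflowError e) {
            System.out.println("StackOverflowError, as in the Connect sink");
        }
    }
}
```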
Questions
- Why is the Avro schema being converted to a ConnectSchema at all? Can that step be avoided if I want to translate the Avro into something else (e.g. Parquet) without going through ConnectSchema first?
- Are there any fixes for this already planned?
Possible solution
The ConnectSchema builder pattern doesn’t support recursive schemas. The SchemaBuilder.build() call generates a Schema object with unmodifiable fields, so a recursive schema cannot add a reference to itself after construction, because its field list is unmodifiable.
My approach would be to add a .field(String name, String type) method to the SchemaBuilder that adds a placeholder, to be resolved to a real schema when .build() is called. If the symbols cannot be resolved, the Schema is marked as incomplete (but is still constructed, so that other schemas can reference it to resolve their own placeholders). Any record schema that contains a field of an incomplete schema type is also incomplete. When resolving placeholders, the parent record can pass itself to its children so they can inspect it and its fields. This means the wrapping in an unmodifiable collection would need to be delegated to the Schema constructor (it’s currently done in the builder before the constructor is called).
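As a rough sketch of the by-name placeholder idea (all names here are hypothetical, not the real Connect SchemaBuilder API): a field declared by type name is kept as a pending entry and resolved against a registry of built schemas at build() time, with the schema registering itself before resolution so direct self-references work.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical mini-schema; not the real org.apache.kafka.connect.data.Schema.
final class MiniSchema {
    final String name;
    final Map<String, MiniSchema> fields = new LinkedHashMap<>();
    boolean complete = true; // false while a placeholder is unresolved
    MiniSchema(String name) { this.name = name; }
}

final class MiniBuilder {
    private final String name;
    private final Map<String, MiniSchema> registry;                    // schemas built so far
    private final Map<String, Object> pending = new LinkedHashMap<>(); // MiniSchema or String

    MiniBuilder(String name, Map<String, MiniSchema> registry) {
        this.name = name;
        this.registry = registry;
    }

    MiniBuilder field(String fieldName, MiniSchema schema) { pending.put(fieldName, schema); return this; }

    // The proposed addition: declare a field by type name only.
    MiniBuilder field(String fieldName, String typeName) { pending.put(fieldName, typeName); return this; }

    MiniSchema build() {
        MiniSchema built = new MiniSchema(name);
        registry.put(name, built); // register first, so self-references resolve
        for (Map.Entry<String, Object> e : pending.entrySet()) {
            Object v = e.getValue();
            MiniSchema resolved = (v instanceof MiniSchema) ? (MiniSchema) v : registry.get(v);
            if (resolved == null) {
                built.complete = false; // unresolved placeholder: mark incomplete
                continue;
            }
            built.fields.put(e.getKey(), resolved);
        }
        return built;
    }
}

public class PlaceholderDemo {
    public static void main(String[] args) {
        Map<String, MiniSchema> registry = new HashMap<>();
        MiniSchema list = new MiniBuilder("list", registry)
                .field("value", new MiniSchema("int"))
                .field("next", "list") // by-name placeholder for the schema itself
                .build();
        System.out.println(list.fields.get("next") == list); // true: direct recursion works
    }
}
```

In the real SchemaBuilder the registry would presumably live wherever the conversion is driven from (e.g. per toConnectSchema invocation), rather than being passed in by hand.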
This would work directly for a directly recursive field (like the linked list), but would be more complicated for recursion through an intermediate record:
{"type":"record","name":"A","fields":[{"name":"field_b","type":{"type":"record","name":"B","fields":[{"name":"next_a","type":["null","A"]}]}}]}
Schema incompleteSchemaB = SchemaBuilder.struct().name("B").optional().field("next_a", "A").build(); // This won't be able to resolve type "A"
Schema schemaA = SchemaBuilder.struct().name("A").field("field_b", incompleteSchemaB).build(); // When build is called here, "A" is resolved, so the schema is complete

Once the SchemaBuilder supports recursion, the toConnectSchema() logic can be updated to parse Avro without a stack overflow by using a placeholder the second time a name is encountered.
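The "placeholder the second time a name is seen" rule can be sketched independently of Connect. This toy walk (names are mine, not AvroData’s) tracks the record names on the current path and emits a by-name reference instead of descending again when a name recurs:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy node in a possibly cyclic schema graph.
final class Node {
    final String name;
    final List<Node> children = new ArrayList<>();
    Node(String name) { this.name = name; }
}

public class CycleSafeWalk {
    static String describe(Node n, Set<String> inProgress) {
        if (!inProgress.add(n.name)) {
            return "<ref " + n.name + ">"; // placeholder instead of recursing
        }
        StringBuilder sb = new StringBuilder(n.name).append("(");
        for (int i = 0; i < n.children.size(); i++) {
            if (i > 0) sb.append(",");
            sb.append(describe(n.children.get(i), inProgress));
        }
        inProgress.remove(n.name); // only guard names on the current path
        return sb.append(")").toString();
    }

    public static void main(String[] args) {
        Node list = new Node("list");
        list.children.add(new Node("int"));
        list.children.add(list); // direct recursion, like the linked-list schema
        System.out.println(describe(list, new HashSet<>()));
        // prints: list(int(),<ref list>)
    }
}
```

In toConnectSchema the equivalent would be to build the incomplete placeholder schema at that point and let the enclosing record’s build() resolve it.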
Thoughts?