Recursive data structure support in Connect SchemaBuilder


John Hofman

Jun 23, 2016, 5:06:06 PM
to Confluent Platform

Kafka Connect consumers don't handle records that use an Avro schema with a recursive type. I have discovered this ticket that raises the same issue. It seems to be a Connect issue, since consumers that are independent of Connect don't fail. This is a blocking issue for me. I have an idea for a fix and am working on it; details below, advice appreciated.


Producing works fine:


bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type" : "record", "name":"list","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","list"]}]}'
{"value":1,"next":null}
{"value":1,"next":{"list":{"value":2,"next":null}}}


Consuming without Connect also works:


./bin/kafka-avro-console-consumer --topic test --zookeeper localhost:2181 --from-beginning
{"value":1,"next":null}
{"value":1,"next":{"list":{"value":2,"next":null}}}
^CProcessed a total of 2 messages


But consuming with Connect raises a StackOverflowError. It seems the cyclic Avro structure results in an endless loop of toConnectSchema calls.

name=local-console-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=test

./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties connect-console-sink.properties

(start-up logging omitted)
java.lang.StackOverflowError
 at org.apache.avro.JsonProperties.getJsonProp(JsonProperties.java:54)
 at org.apache.avro.JsonProperties.getProp(JsonProperties.java:45)
 at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1055)
 at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1103)
 at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1137)
 at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1103)
 at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1137)


Questions


- Why is the Avro being converted to ConnectSchema? Can that be avoided if I want to translate the Avro into something else (e.g. Parquet) without going to ConnectSchema first?

- Are there any fixes for this already in the plan?


Possible solution


The ConnectSchema builder pattern doesn't support recursive schemas. The SchemaBuilder.build() call generates a Schema object with an unmodifiable field list, so the schema cannot reference itself: the self-reference would have to be inserted after construction, but by then the list can no longer be modified.


My approach would be to add a .field(String name, String type) method to the SchemaBuilder that adds a placeholder, to be resolved to a real schema when .build() is called. If the symbols cannot be resolved, the Schema is marked as incomplete (but still constructed, so that other schemas can reference it to resolve their own placeholders). Any record schema that contains a field of an incomplete schema type is also incomplete. When resolving placeholders, the parent record can pass itself to its children so they can inspect it and its fields. This means the wrapping in an unmodifiable list would need to be delegated to the Schema constructor (it's currently done in the builder before calling the constructor).
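To make the placeholder idea concrete, here is a minimal self-contained sketch. These are toy Schema/registry classes, not the real Connect API: fields may hold either a resolved schema or a String placeholder naming a schema that doesn't exist yet, and each build() sweeps the registry to close any placeholders whose target has since appeared.

```java
import java.util.*;

public class PlaceholderSchemaDemo {
    // Toy stand-in for a Connect Schema: a field value is either a
    // Schema (resolved) or a String placeholder (unresolved name).
    static class Schema {
        final String name;
        final Map<String, Object> fields = new LinkedHashMap<>();
        Schema(String name) { this.name = name; }
        // A schema is complete once no String placeholders remain.
        boolean complete() {
            for (Object v : fields.values())
                if (v instanceof String) return false;
            return true;
        }
    }

    // Registry of every schema built so far, keyed by name, so a
    // later build can resolve an earlier schema's placeholder.
    private final Map<String, Schema> registry = new HashMap<>();

    Schema build(String name, Object... fieldPairs) {
        Schema s = new Schema(name);
        registry.put(name, s);
        for (int i = 0; i < fieldPairs.length; i += 2)
            s.fields.put((String) fieldPairs[i], fieldPairs[i + 1]);
        // Sweep all known schemas and swap in any placeholder target
        // that is now registered (this is what closes the cycle).
        for (Schema sch : registry.values())
            for (Map.Entry<String, Object> e : sch.fields.entrySet())
                if (e.getValue() instanceof String
                        && registry.containsKey((String) e.getValue()))
                    e.setValue(registry.get((String) e.getValue()));
        return s;
    }

    public static void main(String[] args) {
        PlaceholderSchemaDemo b = new PlaceholderSchemaDemo();
        // "B" refers to "A" by name before "A" exists -> incomplete.
        Schema schemaB = b.build("B", "next_a", "A");
        System.out.println(schemaB.complete());                       // false
        // Building "A" (which embeds B) resolves B's placeholder.
        Schema schemaA = b.build("A", "field_b", schemaB);
        System.out.println(schemaB.complete());                       // true
        // The cycle is closed: A -> B -> A.
        System.out.println(schemaB.fields.get("next_a") == schemaA);  // true
    }
}
```

The real change would of course live inside SchemaBuilder and ConnectSchema themselves; this only demonstrates the resolution order the proposal depends on.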


This would work internally for a directly recursive field (like the linked list), but would be more complicated for recursion with an intermediate record:


{"name":"A","type":"record","fields":[{"name":"field_b","type":{"name":"B","type":"record","fields":[{"name":"next_a","type":["null","A"]}]}}]}

Schema incompleteSchemaB = SchemaBuilder.struct().name("B").optional().field("next_a", "A").build(); // This won't be able to resolve type "A"
Schema schemaA = SchemaBuilder.struct().name("A").field("field_b", incompleteSchemaB).build(); // When build is called here "A" will be resolved, so the schema is complete


Once the SchemaBuilder supports recursion, the toConnectSchema() logic can be updated to parse Avro without a stack overflow by using a placeholder the second time a name is encountered.
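The conversion side of the idea can be sketched independently of Connect. This toy example (stand-in AvroLike/ConnectLike classes, not the real AvroData code) keeps a map of schemas currently being converted; the second time a name is seen, the in-progress schema is returned instead of recursing, which is exactly the point where toConnectSchema currently loops forever.

```java
import java.util.*;

public class CycleSafeConvert {
    // Toy stand-in for an Avro record schema; a field may point
    // back at an enclosing record, forming a cycle.
    static class AvroLike {
        final String name;
        final Map<String, AvroLike> fields = new LinkedHashMap<>();
        AvroLike(String name) { this.name = name; }
    }

    // Toy stand-in for the converted Connect-side schema.
    static class ConnectLike {
        final String name;
        final Map<String, ConnectLike> fields = new LinkedHashMap<>();
        ConnectLike(String name) { this.name = name; }
    }

    // Track records by name while they are being converted; on a
    // repeated name, return the in-progress schema as a
    // back-reference instead of recursing into it again.
    static ConnectLike toConnect(AvroLike avro, Map<String, ConnectLike> inProgress) {
        ConnectLike seen = inProgress.get(avro.name);
        if (seen != null) return seen;            // cycle: reuse, don't recurse
        ConnectLike out = new ConnectLike(avro.name);
        inProgress.put(avro.name, out);
        for (Map.Entry<String, AvroLike> f : avro.fields.entrySet())
            out.fields.put(f.getKey(), toConnect(f.getValue(), inProgress));
        return out;
    }

    public static void main(String[] args) {
        // The linked-list case from the console-producer example:
        // "list" has a field "next" that refers back to "list".
        AvroLike list = new AvroLike("list");
        list.fields.put("next", list);
        ConnectLike converted = toConnect(list, new HashMap<>());
        // The converted schema references itself instead of overflowing.
        System.out.println(converted.fields.get("next") == converted); // true
    }
}
```

The same bookkeeping would apply through intermediate records (the A -> B -> A case above), since the map is passed down the whole recursion.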


Thoughts?

John Hofman

Jun 24, 2016, 4:32:57 AM
to Confluent Platform
On further inspection, ConnectSchema is actually part of Kafka itself, so this goes deeper than I first thought. Was it a conscious decision not to support cyclic schemas?