Kafka Connect - Nested Struct


Saravanan Tirugnanum

Jun 2, 2016, 5:37:45 PM
to Confluent Platform
Is a nested Struct supported in org.apache.kafka.connect.data.Struct? Please confirm.

rootStruct.getStruct("fieldName") returns null.

Regards
Saravanan


Saravanan Tirugnanum

Jun 2, 2016, 6:41:00 PM
to Confluent Platform
I dug through the APIs and found that this is possible. However, I want to extract the embedded nested schema from the top-level schema. Is that possible, or do we need to pass all the nested schemas separately when constructing the Struct?

Regards
Saravanan

Saravanan Tirugnanum

Jun 3, 2016, 7:26:09 AM
to Confluent Platform
I was able to figure this out as well. The nested schema needed to construct the struct can be extracted with schema.field("<name>").schema().

I then got stuck on the error below. Any clue why AvroData is calling SchemaBuilder.parameters when parameters are optional?

java.lang.NullPointerException
at java.util.Collections$UnmodifiableMap.<init>(Collections.java:1446)
at java.util.Collections.unmodifiableMap(Collections.java:1433)
at org.apache.kafka.connect.data.SchemaBuilder.parameters(SchemaBuilder.java:184)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:603)
at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:651)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:588)
at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:651)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:588)
at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:651)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:588)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:266)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:90)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:142)
at org.apache.kafka.connect.runtime.WorkerSourceTask.access$600(WorkerSourceTask.java:50)
at org.apache.kafka.connect.runtime.WorkerSourceTask$WorkerSourceTaskThread.execute(WorkerSourceTask.java:356)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
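
The top two frames show Collections.unmodifiableMap being handed a null map from SchemaBuilder.parameters, which suggests the schema object reaching AvroData was a SchemaBuilder whose parameters were never set. Below is a minimal JDK-only sketch of that failure mode. BuilderLike is a hypothetical stand-in, not the Kafka Connect source, and the guarded accessor is only one possible way to avoid the exception, not necessarily what the library does.

```java
import java.util.Collections;
import java.util.Map;

public class NullParametersSketch {

    // Hypothetical stand-in for a builder whose optional parameters were never set.
    static class BuilderLike {
        private Map<String, String> parameters; // stays null until a parameter is added

        // Unguarded accessor: wraps the field directly, so a null field throws.
        Map<String, String> parametersUnguarded() {
            return Collections.unmodifiableMap(parameters);
        }

        // Guarded accessor: returns null when no parameters were set.
        Map<String, String> parametersGuarded() {
            return parameters == null ? null : Collections.unmodifiableMap(parameters);
        }
    }

    // True if the unguarded accessor throws NullPointerException.
    static boolean unguardedThrowsNpe() {
        try {
            new BuilderLike().parametersUnguarded();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Result of the guarded accessor on a builder with no parameters.
    static Map<String, String> guardedResult() {
        return new BuilderLike().parametersGuarded();
    }

    public static void main(String[] args) {
        System.out.println("unguarded throws NPE: " + unguardedThrowsNpe());
        System.out.println("guarded returns: " + guardedResult());
    }
}
```

If this reading of the trace is right, it may be worth checking whether a nested field schema was passed as a SchemaBuilder without calling build() on it first.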

On Thursday, June 2, 2016 at 4:37:45 PM UTC-5, Saravanan Tirugnanum wrote:

Saravanan Tirugnanum

Jun 4, 2016, 3:08:40 PM
to Confluent Platform
Ah! I noticed this issue was fixed in the 0.10 release.
https://issues.apache.org/jira/browse/KAFKA-3690

However, the workaround given in the link doesn't seem to work. I have disabled schemas for the internal key converter (set to false) but still get this error.