Hi,
I want to use the HDFS connector with the org.apache.kafka.connect.json.JsonConverter converter, but I've run into an issue with both the Parquet and Avro formats.
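For context, the sink connector is configured roughly along these lines (the topic name, hdfs.url and flush.size below are placeholders rather than my exact values; format.class starts as Avro and is switched to Parquet further down):
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs
hdfs.url=hdfs://localhost:9000
flush.size=3
format.class=io.confluent.connect.hdfs.avro.AvroFormat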
Here are the worker configs:
bootstrap.servers=localhost:9092
rest.port=4002
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

When I set the schemas.enable settings to true:
key.converter.schemas.enable=true
value.converter.schemas.enable=true
and produce records using the console connector with default settings in the following format:
{"schema":{"type":"string","optional":false},"payload":"hello"}
{"schema":{"type":"string","optional":false},"payload":"halo"}
{"schema":{"type":"string","optional":false},"payload":"salut"}
then I get the following exception:
org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: io.confluent.kafka.serializers.NonRecordContainer cannot be cast to java.lang.CharSequence
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:64)
at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:59)
at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:487)
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:264)
...
Caused by: java.lang.ClassCastException: io.confluent.kafka.serializers.NonRecordContainer cannot be cast to java.lang.CharSequence
at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:213)
at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:208)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:76)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
With format.class=io.confluent.connect.hdfs.parquet.ParquetFormat there is still no success:
[2016-08-19 18:43:15,400] ERROR Task is being killed and will not recover until manually restarted: (org.apache.kafka.connect.runtime.WorkerSinkTask:303)
java.lang.IllegalArgumentException: Avro schema must be a record.
at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:89)
at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:103)
at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:47)
at io.confluent.connect.hdfs.parquet.ParquetRecordWriterProvider.getRecordWriter(ParquetRecordWriterProvider.java:51)
at io.confluent.connect.hdfs.TopicPartitionWriter.getWriter(TopicPartitionWriter.java:415)
at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:486)
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:264)
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234)
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:280)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Even if I try to produce records like this:
{"schema":{"type":"record","optional":false, "fields":[{"name": "name", "type": "string"}]},"payload":"hello"}
What am I doing wrong?
Thanks,
Grigoriy