My requirement is to convert the JSON data in my Kafka topics to Parquet format in HDFS.
The configuration I am using, as suggested in various discussions, is as follows:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
The format.class I am using in my connector properties is ParquetFormat: io.confluent.connect.hdfs.parquet.ParquetFormat
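For completeness, a minimal sketch of the rest of my HDFS sink connector properties, assuming the standard HdfsSinkConnector property names from Confluent 3.2 (the HDFS URL and flush size here are placeholder values):

```properties
name=hdfs-parquet-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=json-parquet-test
hdfs.url=hdfs://localhost:8020
flush.size=3
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
```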
As suggested, the messages in my topic contain embedded schema information as well as the payload:
[ec2-user confluent-3.2.0]$ ./bin/kafka-console-producer --broker-list localhost:9092 --topic json-parquet-test
{"payload": {"f1":"teststring"}, "schema": {"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}}
{"payload": {"f1":"teststring1"}, "schema": {"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}}
But I am getting this in my Connect logs:
[2017-11-28 14:54:57,900] ERROR Task json-parquet-test-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:345)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
[2017-11-28 14:54:57,901] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
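Reading the error again, I wonder whether JsonConverter expects its own envelope schema (type "struct" with per-field "field"/"type"/"optional" keys) rather than the Avro-style record schema I am sending. A minimal sketch of the message shape I could try instead (the "optional" flags are my assumption):

```python
import json

# Kafka Connect JsonConverter envelope: note "struct" (not Avro's "record")
# and "field"/"type"/"optional" keys per field (not Avro's "name"/"type").
envelope = {
    "schema": {
        "type": "struct",
        "fields": [
            {"field": "f1", "type": "string", "optional": False},
        ],
        "optional": False,
        "name": "myrecord",
    },
    "payload": {"f1": "teststring"},
}

# One JSON document per line, suitable for piping into kafka-console-producer.
print(json.dumps(envelope))
```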
I am not sure what I am missing here. I would appreciate any pointers on this.