Kafka Connect JSON to Parquet Conversion Issue

Ashish Sachdeva

Nov 28, 2017, 1:36:15 PM
to Confluent Platform
My requirement is to convert the JSON data in my Kafka topics to Parquet format in HDFS.

The configuration I am using, as suggested in various discussions, is as follows:

Connect distributed worker config:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

The format.class I am using in my connector properties is ParquetFormat: io.confluent.connect.hdfs.parquet.ParquetFormat
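
For completeness, a minimal version of the sink properties I am posting (hdfs.url and flush.size here are placeholder values, not my exact settings):

name=hdfs-sink-json-parquet
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=json-parquet-test
hdfs.url=hdfs://localhost:9000
flush.size=3
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat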


As suggested, the data in my topic contains embedded schema information as well as the payload:

[ec2-user confluent-3.2.0]$ ./bin/kafka-console-producer --broker-list localhost:9092 --topic json-parquet-test 
{"payload": {"f1":"teststring"}, "schema": {"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}}
{"payload": {"f1":"teststring1"}, "schema": {"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}}



But I am getting this in my Connect logs:

[2017-11-28 14:54:57,900] ERROR Task json-parquet-test-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:345)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
[2017-11-28 14:54:57,901] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
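
Reading the error again, I wonder whether JsonConverter expects Connect's own schema notation (a struct with "field" entries) rather than an Avro-style record schema. If my understanding is right, the envelope for the same record would have to look something like this (untested sketch):

{"schema": {"type": "struct", "fields": [{"field": "f1", "type": "string", "optional": false}], "optional": false, "name": "myrecord"}, "payload": {"f1": "teststring"}}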


I am not sure what I am missing here. I would appreciate any pointers on this.

- Ashish