Hi ,
So I have in my connect-standalone.properties
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
My input to the Standalone connector is as :
{"schema":{"type":"struct","fields":[{"type":"long","optional":false,"field":"Date"},{"type":"string","optional":false,"field":"Type"},{"type":"long","optional":false,"field":"SymbolID"},{"type":"double","optional":false,"field":"SequenceID"},{"type":"string","optional":false,"field":"BuySell"},{"type":"double","optional":false,"field":"Volume"},{"type":"string","optional":false,"field":"Symbol"},{"type":"long","optional":false,"field":"Durationms"},{"type":"string","optional":false,"field":"TAttribute"} ],"optional":false,"name":"htest2"},"payload":{"Date":20150925, "Type":"F", "SymbolID":34200000000000, "SequenceID":1.1, "BuySell":"B", "Volume":100.00, "Symbol":"HAPE", "Durationms":49990, "Attribute":"ABCD"}}
However it still throws the following error as below:
[2017-08-19 20:51:22,888] ERROR Task con-0-21821-42 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
What is the correct format to send data then with key and value schema enabled?
Thank you,
Tushar