Sending records with schema using JsonConverter


Tushar Sudhakar Jee

Aug 19, 2017, 11:57:26 PM
to Confluent Platform
Hi,

I have the following in my connect-standalone.properties:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true


My input to the standalone connector is:

{"schema":{"type":"struct","fields":[{"type":"long","optional":false,"field":"Date"},{"type":"string","optional":false,"field":"Type"},{"type":"long","optional":false,"field":"SymbolID"},{"type":"double","optional":false,"field":"SequenceID"},{"type":"string","optional":false,"field":"BuySell"},{"type":"double","optional":false,"field":"Volume"},{"type":"string","optional":false,"field":"Symbol"},{"type":"long","optional":false,"field":"Durationms"},{"type":"string","optional":false,"field":"TAttribute"} ],"optional":false,"name":"htest2"},"payload":{"Date":20150925, "Type":"F", "SymbolID":34200000000000, "SequenceID":1.1, "BuySell":"B", "Volume":100.00, "Symbol":"HAPE", "Durationms":49990, "Attribute":"ABCD"}}


However, it still throws the following error:



[2017-08-19 20:51:22,888] ERROR Task con-0-21821-42 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
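[Editor's note: the DataException above comes from the check JsonConverter applies to every record (key and value) when schemas.enable=true: the raw JSON must be an object whose top-level keys are exactly "schema" and "payload". A minimal Python sketch of that check, with `is_valid_envelope` as a hypothetical helper (not part of Kafka), shows why a plain-string key fails:]

```python
import json

# Hypothetical helper mirroring the check JsonConverter performs when
# schemas.enable=true: the message must be a JSON object whose top-level
# keys are exactly "schema" and "payload" -- nothing more, nothing less.
def is_valid_envelope(raw: str) -> bool:
    try:
        obj = json.loads(raw)
    except json.JSONDecodeError:
        return False
    return isinstance(obj, dict) and set(obj) == {"schema", "payload"}

# A schema/payload envelope passes the check...
print(is_valid_envelope('{"schema": {"type": "string"}, "payload": "hi"}'))  # True
# ...but a bare key like "mykey" does not, which raises the DataException.
print(is_valid_envelope('"mykey"'))  # False
```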




What, then, is the correct format for sending data with key and value schemas enabled?



Thank you,
Tushar

Konstantine Karantasis

Aug 20, 2017, 6:08:11 PM
to confluent...@googlegroups.com
The issue is that you have enabled schemas for the keys. 

I assume it will work if you set: 

key.converter.schemas.enable=false
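
[Editor's note: assuming the keys are plain JSON values with no schema envelope, the converter section of connect-standalone.properties would then look like the sketch below; the value side keeps schemas enabled so the struct input above is still accepted:]

```properties
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Keys are plain JSON values, no {"schema": ..., "payload": ...} envelope:
key.converter.schemas.enable=false
# Values must still be wrapped in the schema/payload envelope:
value.converter.schemas.enable=true
```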

- Konstantine

