Exception while consuming bottledwater messages from Sink connector

45 views
Skip to first unread message

Unmesh Joshi

unread,
May 7, 2016, 6:03:02 AM5/7/16
to Confluent Platform
Hi,

I was trying to write a sink connector to consume bottledwater produced messages. I have configured worker with value.converter=io.confluent.connect.avro.AvroConverter and value.converter.schema.registry.url=http://localhost:8081
I can correctly see messages using ./kafka-avro-console-consumer. as following 

{"address_id":{"int":53},"address":{"string":"125 city center"},"address2":{"string":"main street"},"district":{"string":"ma"},"city":{"string":"waltham"},"postal_code":{"string":"211002"},"phone":{"string":"781-989-9999"},"last_update":{"com.martinkl.bottledwater.datatypes.DateTime":{"year":2016,"month":5,"day":7,"hour":14,"minute":56,"second":45,"micro":473555}}}

I get following exception from AvroConverter.

org.apache.kafka.connect.errors.DataException: Avro union types containing null are only supported as optional fields and should have exactly two entries.
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1133)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1103)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1015)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:781)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:346)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:744)


any clues?

Thanks,
Unmesh

Unmesh Joshi

unread,
May 7, 2016, 8:20:06 AM5/7/16
to Confluent Platform
It looks like its failing for timestamp field in my schema, which is mapped to com.martinkl.bottledwater.datatypes.DateTime in Avro schema definition and failing check for schema.getTypes().size() == 2 in AvroData. For now I will remove timestamp field from my schema. 
Reply all
Reply to author
Forward
0 new messages