Writing nested JSON to S3 from Kafka


arsh...@gmail.com

Feb 21, 2018, 8:26:09 AM
to Confluent Platform
I have a JSON array in the following form:

[{"a":74,"b":1519202998533,"c":"Shipped","d":7318},{"a":11,"b":1519202998546,"c":"Shipped","d":40481}]


I have created the topic ord_avro_multiple, for which I have defined the Avro schema on the console producer as follows:

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic ord_avro_multiple --property value.schema='{"type":"array","name":"arrayjson","items":{"type":"record","name":"arraysjs","fields":[{"name":"a","type":"int"},{"name":"b","type":"long"},{"name":"c","type":"string"},{"name":"d","type":"int"}]}}'
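For readability, the value schema passed to the console producer above is, expanded:

{
  "type": "array",
  "name": "arrayjson",
  "items": {
    "type": "record",
    "name": "arraysjs",
    "fields": [
      {"name": "a", "type": "int"},
      {"name": "b", "type": "long"},
      {"name": "c", "type": "string"},
      {"name": "d", "type": "int"}
    ]
  }
}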

It gives me the following error when I try to write the data to an S3 bucket using the S3 sink connector in standalone mode:

[2018-02-21 09:02:51,700] ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
org.apache.kafka.connect.errors.ConnectException: com.fasterxml.jackson.databind.JsonMappingException: No serializer found for class org.apache.kafka.connect.data.Struct and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: java.util.ArrayList[0])
        at io.confluent.connect.s3.format.json.JsonRecordWriterProvider$1.write(JsonRecordWriterProvider.java:81)
        at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:402)
        at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:204)
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:180)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: No serializer found for class org.apache.kafka.connect.data.Struct and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: java.util.ArrayList[0])
        at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:284)
        at com.fasterxml.jackson.databind.SerializerProvider.mappingException(SerializerProvider.java:1110)
        at com.fasterxml.jackson.databind.SerializerProvider.reportMappingProblem(SerializerProvider.java:1135)
        at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.failForEmpty(UnknownSerializer.java:69)
        at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.serialize(UnknownSerializer.java:32)
        at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119)
        at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
        at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
        at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:292)
        at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:2493)
        at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:378)
        at io.confluent.connect.s3.format.json.JsonRecordWriterProvider$1.write(JsonRecordWriterProvider.java:77)
        ... 14 more
[2018-02-21 09:02:51,704] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:456)
[2018-02-21 09:02:51,705] ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:457)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

In both the Kafka Connect worker and the Kafka S3 connector I have left the serializer at the default, which is the JSON serializer. Kindly suggest which kind of serializer is needed to parse this data?

Miguel Silvestre

Nov 20, 2019, 3:55:14 AM
to Confluent Platform
Hi,

You are writing Avro to a Kafka topic and want to dump it to S3, right?
Try using just the format class:

format.class=io.confluent.connect.s3.format.avro.AvroFormat

without any value.converter specified in your connector properties file.
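For example, a minimal standalone sink properties file along these lines should do it (the bucket name, region and flush.size below are just placeholders, and I'm assuming the Connect worker itself is configured with the AvroConverter and a schema.registry.url):

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=ord_avro_multiple
# placeholder bucket and region -- use your own
s3.bucket.name=my-bucket
s3.region=us-east-1
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.avro.AvroFormat
flush.size=3
# note: no value.converter here, so the worker-level converter is used

With AvroFormat the records are written back out to S3 as Avro, so Connect never tries to JSON-serialize the Struct objects that caused the exception above.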