NonRecordContainer cannot be cast to org.apache.avro.generic.IndexedRecord


David

Jun 13, 2017, 9:47:50 AM
to Confluent Platform
Hi,

so I'm a Confluent newbie and this is what I'm trying to do:

Kafka topic on remote server encoded as schema-less JSON* => Confluent Kafka S3 Connector => convert to Avro => save to S3

*example: {"timestamp":"1497347711"}

So, basically I'm reading plain JSON from Kafka and I want to store the data as Avro in S3.
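From what I can tell (this is just my understanding, please correct me if it's wrong), the AvroConverter expects each Kafka message to be in Confluent's wire format: a magic byte, then a 4-byte big-endian schema ID pointing at a schema registered in the Schema Registry, then the Avro-encoded payload. My plain-JSON messages have none of that. A minimal Python sketch of the framing (the schema ID 1 and the payload are made-up illustration values):

```python
import struct

def confluent_frame(schema_id, avro_payload):
    """Wrap an Avro-encoded payload in Confluent's wire format:
    magic byte 0x00, then the 4-byte big-endian Schema Registry ID."""
    return b"\x00" + struct.pack(">I", schema_id) + avro_payload

# Avro encodes a string as a zigzag-varint length followed by the UTF-8
# bytes; for the 10-byte string "1497347711" the length prefix is 0x14.
payload = b"\x14" + b"1497347711"
msg = confluent_frame(1, payload)
print(msg[:5].hex())  # magic byte + schema ID -> 0000000001
```

So a raw JSON message starting with `{` would never deserialize as Avro here.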

These are my configs:

etc/kafka-connect-s3/quickstart-s3.properties:

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=json-timestamp
s3.region=eu-west-1
s3.bucket.name=ccp-confluent-kafka-connect-s3-testing
s3.part.size=5242880
flush.size=3
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.avro.AvroFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
key.converter.schemas.enable=false
value.converter.schemas.enable=false
schema.ignore=true
schema.compatibility=NONE

etc/schema-registry/connect-avro-standalone.properties:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets

However, I'm getting an error (io.confluent.kafka.serializers.NonRecordContainer cannot be cast to org.apache.avro.generic.IndexedRecord):

bin/connect-standalone etc/schema-registry/connect-json-standalone.properties etc/kafka-connect-s3/quickstart-s3.properties:

[2017-06-13 12:19:38,959] ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:449)
org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: io.confluent.kafka.serializers.NonRecordContainer cannot be cast to org.apache.avro.generic.IndexedRecord
        at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
        at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:77)
        at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:328)
        at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:191)
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:163)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:473)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: io.confluent.kafka.serializers.NonRecordContainer cannot be cast to org.apache.avro.generic.IndexedRecord
        at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
        at org.apache.avro.generic.GenericData.getField(GenericData.java:595)
        at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
        at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
        ... 15 more


I assume the message should carry a schema ID and that the schema should be registered in the Schema Registry; I just don't know how to do this, since I can't find proper code samples.
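In case it helps pin down what I mean: would producing test messages with kafka-avro-console-producer be the right approach? As I understand it, it registers the given schema in the Schema Registry and writes the schema ID into each message. Something like the following, where the topic name matches my config above and the record name "Event" is just a placeholder I made up:

```shell
bin/kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic json-timestamp \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema='{"type":"record","name":"Event","fields":[{"name":"timestamp","type":"string"}]}'
# then type records on stdin, e.g.:
# {"timestamp":"1497347711"}
```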
Any help is highly appreciated.

Best,
David
