bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets
schemas.enable=false
name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=test_topic
s3.region=us-west-2
s3.bucket.name=testing_bucket
s3.part.size=5242880
flush.size=3
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
key.converter.schemas.enable=false
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE
./bin/connect-standalone etc/kafka-connect-s3/connector1.properties etc/kafka-connect-s3/quickstart-s3.properties
[2017-07-07 17:36:41,457] ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
schemas.enable=false
#
# Copyright 2017 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=test_topic
s3.region=us-west-2
s3.bucket.name=test_bucket
s3.part.size=5242880
flush.size=3
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
key.converter.schemas.enable=false
value.converter.schemas.enable=false
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE