Need help with configs to set up the S3 connector to pump JSON traffic from a broker to an S3 bucket.


dhawan.g...@datavisor.com

Jul 7, 2017, 8:59:35 PM
to Confluent Platform
Hi All,

I am trying to get the S3 connector to consume messages from my Kafka topic in standalone mode without using Schema Registry. I am running the Kafka broker, ZooKeeper, and the connector on localhost.
The data is plain JSON and has *NOT* been serialized with Avro, hence the need to disable any Schema Registry dependency.
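For example, a record on the topic looks something like this (illustrative sample only, not my real payload):

{"user_id": 12345, "event": "login", "ts": "2017-07-07T17:30:00Z"}

i.e. plain JSON, not the {"schema": ..., "payload": ...} envelope that JsonConverter expects when schemas.enable=true.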

Here are my worker configs:
bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets
schemas.enable=false

S3 configs: 
name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=test_topic

s3.region=us-west-2
s3.bucket.name=testing_bucket
s3.part.size=5242880
flush.size=3

storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat

schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
key.converter.schemas.enable=false
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE


I start the connector in standalone mode using the following command:
./bin/connect-standalone etc/kafka-connect-s3/connector1.properties  etc/kafka-connect-s3/quickstart-s3.properties
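The topic itself just contains plain JSON records; for anyone reproducing this, something like the console producer works (typing one JSON record per line on stdin):

./bin/kafka-console-producer --broker-list localhost:9092 --topic test_topic
{"user_id": 12345, "event": "login"}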

My Worker dies after startup with the following exception: 

[2017-07-07 17:36:41,457] ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Please tell me what I am missing in my configs. I have spent some time reading the docs and got this far.


Arun Mehta

Jul 7, 2017, 11:51:52 PM
to Confluent Platform
I think including the namespace prefixes key.converter and value.converter might be the answer here.
For example: key.converter.schemas.enable=false, and similarly for value.converter.
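That is, the worker properties should contain both of:

key.converter.schemas.enable=false
value.converter.schemas.enable=false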

dhawan.g...@datavisor.com

Jul 8, 2017, 1:08:26 PM
to Confluent Platform
Thank you so much Arun, that did the trick.

I am posting my worker configs and S3 configs here for posterity's sake.

Worker configs:

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
schemas.enable=false

S3 configs:

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=test_topic

s3.region=us-west-2
s3.bucket.name=test_bucket
s3.part.size=5242880
flush.size=3

storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat

schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator

key.converter.schemas.enable=false
value.converter.schemas.enable=false

partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

schema.compatibility=NONE

These are the configs that worked for me. Thank you, Arun. I hope the community can sanitize these configs a bit further; I am sure they still have some redundancies.
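As a sanity check (assuming the default topics.dir and the DefaultPartitioner), the connector should write one S3 object for roughly every flush.size=3 records per partition, which you can list with something like:

aws s3 ls s3://test_bucket/topics/test_topic/ --recursive

and the keys should look roughly like topics/test_topic/partition=0/test_topic+0+0000000000.json.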