How to define a Kafka Connect schema?


changpe...@gmail.com

Aug 4, 2017, 5:02:32 PM
to Confluent Platform
All the following steps use Apache Kafka with some JARs from the Confluent Platform.
I just want to test Kafka Connect with a MySQL sink using the following command:

bin/connect-standalone.sh config/connect-standalone.properties  config/connect-mysql-sink.properties

My connect-standalone.properties looks like this:

bootstrap.servers=KAFKA:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000


My connect-mysql-sink.properties looks like this:

name=mysql-sink-test
topics=mysql-students
tasks.max=1
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:mysql://KAFKA:3306/test
connection.user=test
connection.password=123456
pk.mode=record_value
pk.fields=id
batch.size=3000
insert.mode=insert
auto.create=true
table.name.format=students
max.retries=10

Note: io.confluent.connect.jdbc.JdbcSinkConnector is the connector I downloaded from the Confluent Platform.

But when I execute the above command, errors occurred:

[2017-08-05 02:52:20,607] ERROR Task mysql-sink-test-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
org.apache.kafka.connect.errors.ConnectException: PK mode for table 'students' is RECORD_VALUE, but record value schema is missing
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:146)
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:58)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:65)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
[2017-08-05 02:52:20,610] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:456)
[2017-08-05 02:52:20,611] ERROR Task mysql-sink-test-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:457)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
[2017-08-05 02:52:20,612] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149)

Actually, my students table structure is:

mysql> describe students;
+-------+--------------+------+-----+---------+-------+
| Field | Type         | Null | Key | Default | Extra |
+-------+--------------+------+-----+---------+-------+
| id    | int(11)      | NO   | PRI | NULL    |       |
| name  | varchar(256) | YES  |     | NULL    |       |
+-------+--------------+------+-----+---------+-------+

Maybe I'm confused about the schema!
How do I configure the input or output schema for Kafka Connect? Or does the converter recognize the object automatically?

Konstantine Karantasis

Aug 15, 2017, 2:57:17 AM
to confluent...@googlegroups.com

In order to use the JDBC sink connector, you'll have to use a converter that is aware of schemas (AvroConverter, or JsonConverter with schemas enabled; above you seem to be using JsonConverter with schemas disabled).
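
For example, a minimal sketch of the worker converter settings (these are the standard Kafka Connect converter properties; the rest of your worker config can stay as-is):

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

Since your sink uses pk.mode=record_value with pk.fields=id, the record value's schema must contain an "id" field.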

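With schemas enabled, the JsonConverter expects each message to carry an envelope with a "schema" object and a "payload" object. A hypothetical message matching your students table (the schema name "students" is just an illustrative choice) could look like:

{
  "schema": {
    "type": "struct",
    "name": "students",
    "optional": false,
    "fields": [
      { "field": "id", "type": "int32", "optional": false },
      { "field": "name", "type": "string", "optional": true }
    ]
  },
  "payload": { "id": 1, "name": "alice" }
}

You can paste such a message (on a single line) into the console producer to test the sink, e.g.:

bin/kafka-console-producer.sh --broker-list KAFKA:9092 --topic mysql-students
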
You may want to try running the quickstart guide first, here:


Konstantine


