MySQL JDBC Sink


Ehasnul Khan

Feb 12, 2019, 2:06:39 AM
to debezium
Hi everyone,

Although I posted this question as part of a separate thread I started, I thought I would create a new post since this is a new question. Here is my issue:

I am planning to use the same MySQL database to sink all CDC records from the source. After doing some research, it looks as though the Debezium MySQL connector is a connector for the source database only.

Can you please confirm that I cannot use the same Debezium connector as a SINK?

I see some of you gave examples (in various demos) using the Kafka Connect JDBC sink connector. I found this example configuration on the web that uses Confluent's Kafka Connect JDBC connector as a sink:


{
"name": "jdbc-sink",
"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "customers",
    "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value"
}
}

Would a modified configuration similar to above for Debezium MySQL work?

Also, I noticed that the Kafka Connect JDBC connector ships with a PostGres.....jar file. Does that mean this connector only works with a Postgres DB as a sink?

Thanks again for your support!

-Ehsan

Jiri Pechanec

Feb 12, 2019, 2:11:24 AM
to debezium
Hi,

Debezium is really only a set of source connectors. For the sink side you need to use the JDBC sink connector, as you've already found out. The sink works for MySQL too (just do not forget to add the MySQL JDBC driver). The examples we have provided intentionally use a different database so we can demonstrate replication across different vendors. But MySQL -> MySQL works as well.
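For reference, a minimal sketch of the earlier sink config adapted for a MySQL target (the host, database name, and credentials here are placeholders, not values from any particular setup):

```json
{
  "name": "jdbc-sink-mysql",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "customers",
    "connection.url": "jdbc:mysql://mysql-host:3306/inventory_sink?user=mysqluser&password=mysqlpw",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value"
  }
}
```

The only structural change from the Postgres example is the connection.url; the MySQL driver jar must be on the worker's classpath for it to work.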

J.

Ehasnul Khan

Feb 13, 2019, 2:22:10 AM
to debezium
Here is what my setup looks like (I did remove the PostGres..jar as I am using MySQL for the sink).

[Attachment: jdbc setup 1.png]





My Debezium source and MySQL sink configurations are provided below:
Source:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "classicmodels-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "erprep",
    "database.password": "erprep",
    "database.server.id": "223344",
    "database.server.name": "mysql1",
    "database.whitelist": "classicmodels",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.classicmodels"
  }
}'

Sink (I am not sure if I have provided my topic name properly):

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "mysql_target_sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "classicmodels",
    "connection.url": "jdbc:mysql://localhost/target_schema",
    "connection.user": "erprep",
    "connection.password": "erprep",
    "auto.create": "true",
    "name": "mysql_target_sink"
  }
}'
 


The source runs okay, but for the sink I am ending up with the following error message:

HTTP/1.1 500 Internal Server Error

Date: Wed, 13 Feb 2019 06:56:33 GMT

Content-Type: application/json

Content-Length: 2345

Server: Jetty(9.4.12.v20180830)


{"error_code":500,"message":"Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSinkConnector, available connectors are: PluginDesc{klass=class io.debezium.connector.mysql.MySqlConnector, name='io.debezium.connector.mysql.MySqlConnector', version='0.8.3.Final', encodedVersion=0.8.3.Final, type=source, typeName='source', location='file:/Users/ekhan/project/ek/kafka-debezium-mysql-prototype/kafka-connect/connectors/debezium-connector-mysql/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.1.0', encodedVersion=2.1.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.1.0', encodedVersion=2.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.1.0', encodedVersion=2.1.0, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.1.0', encodedVersion=2.1.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='2.1.0', encodedVersion=2.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='2.1.0', encodedVersion=2.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, 
name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='2.1.0', encodedVersion=2.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='2.1.0', encodedVersion=2.1.0, type=source, typeName='source', location='classpath'}"}


Please note that I only have one "connect.distributed.properties" worker file. Do I need another one for the sink? I didn't think so.


Also, is using an Avro schema a must for the sink?


Thanks much, Jiri. Eagerly waiting for a response! :-)


-Ehsan



Jiri Pechanec

Feb 13, 2019, 3:04:51 AM
to debezium
Hi,

what is your setting for `plugin.path` config option?

Avro is not mandatory for using the sink connector.
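(One detail worth noting: the JDBC sink does need a schema with each record, so if you stay on plain JSON instead of Avro, the worker's converters must keep schemas enabled. A sketch of the relevant connect-distributed.properties lines, using the standard Kafka Connect property names:)

```properties
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
```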

J.

Robin Moffatt

Feb 13, 2019, 6:43:37 AM
to debe...@googlegroups.com
Where are you running Kafka Connect from (which distribution), and how are you running it (docker etc?) 

Your sink connector is failing because: 

Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSinkConnector

So you need to either install the JDBC Sink (https://www.confluent.io/connector/kafka-connect-jdbc/), or just run Kafka Connect as shipped with Confluent Platform which includes the JDBC Sink by default. There are also Docker images etc too. 
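For reference, a sketch of the on-disk layout Kafka Connect expects under plugin.path: one subdirectory per connector, with that connector's jars (and any JDBC drivers it needs) inside it. The directory and jar names below are illustrative, not exact artifact names.

```shell
# Each connector gets its own subdirectory under the plugin.path directory.
mkdir -p connectors/debezium-connector-mysql connectors/kafka-connect-jdbc
# Connector jars live inside their connector's subdirectory...
touch connectors/debezium-connector-mysql/debezium-connector-mysql-0.8.3.Final.jar
touch connectors/kafka-connect-jdbc/kafka-connect-jdbc-5.1.0.jar
# ...and the MySQL JDBC driver sits next to the sink connector that uses it.
touch connectors/kafka-connect-jdbc/mysql-connector-java-5.1.47.jar
ls connectors
```

plugin.path in the worker properties should then point at the connectors directory (an absolute path is the safer choice).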


-- 

Robin Moffatt | Developer Advocate | ro...@confluent.io | @rmoff



--
You received this message because you are subscribed to the Google Groups "debezium" group.
To unsubscribe from this group and stop receiving emails from it, send an email to debezium+u...@googlegroups.com.
To post to this group, send email to debe...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/debezium/478ae527-9d20-4e3e-ad54-b3875845bc02%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Ehsanul Khan

Feb 13, 2019, 11:05:07 AM
to debe...@googlegroups.com
My plugin.path is pointing to "connectors", under which the .jar files are located (in a folder per connector), basically the same location where the Debezium MySQL connector lives (which works).

# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=connectors
Thanks!

-Ehsan

Ehsanul khan

Feb 13, 2019, 11:20:11 AM
to debe...@googlegroups.com
Thanks, Robin! 
I will circle back with more info and also look into the installation.

But basically I am running Kafka and Kafka Connect on my Mac in distributed mode.

Will post more detail....

Ehsan

Sent from my iPhone


Robin Moffatt

Feb 13, 2019, 4:39:45 PM
to debe...@googlegroups.com
You have to install the JDBC Sink, either explicitly (e.g. via Confluent Hub), or by using Confluent Platform, which has it already.
The Debezium connector Docker image does not include it, AFAIK.




Gunnar Morling

Feb 13, 2019, 5:11:07 PM
to debezium
> The Debezium connector docker image does not include it AFAIK

Indeed the JDBC sink isn't part of Debezium's Kafka Connect image.

You can easily add it though by deriving your own image from that one as shown in this example:


See the README for more information and the Dockerfile which adds the JDBC sink connector:

    https://github.com/debezium/debezium-examples/tree/master/unwrap-smt/debezium-jdbc-es/Dockerfile

Hth,

--Gunnar

Ehsanul Khan

Feb 14, 2019, 11:04:22 AM
to debe...@googlegroups.com
Thanks Robin and Gunnar for the input.

I will use your suggestions and let you know. I did see Gunnar's setup for MySQL to Postgres. Also, creating a separate configuration for the JDBC sink is noted.

Thanks again!

-Ehsan



Robin Moffatt

Feb 18, 2019, 4:28:15 AM
to debe...@googlegroups.com
Looking at your Connect worker log, the JDBC Sink is failing because: 

 INFO Unable to connect to database on attempt 1/3. Will retry in 10000 ms. (io.confluent.connect.jdbc.util.CachedConnectionProvider:93)
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Caused by: java.net.ConnectException: Connection refused (Connection refused)

i.e. it cannot connect to the mysql database. 

From the log I can see that Debezium is connecting to : 

[2019-02-17 10:56:25,015] INFO Successfully tested connection for jdbc:mysql://localhost:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=convertToNull with user 'erprep' (io.debezium.connector.mysql.MySqlConnector:101)

but you've configured your sink to connect to MySQL on a different port (3306 vs 3036): 

[2019-02-17 10:56:43,072] INFO JdbcSinkConfig values: 
connection.url = jdbc:mysql://localhost:3036/tgt_classicmodels

So unless you have MySQL on a different port, that's why your sink is failing with "java.net.ConnectException: Connection refused".
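A quick way to catch this kind of typo is to pull the port out of the JDBC URL before posting the config (a sketch; the sed pattern assumes a standard jdbc:mysql://host:port/db URL):

```shell
# Extract the port from a JDBC URL so a 3036-vs-3306 typo is easy to spot.
url="jdbc:mysql://localhost:3036/tgt_classicmodels"
port=$(printf '%s' "$url" | sed -E 's#.*://[^:/]+:([0-9]+)/.*#\1#')
echo "$port"
```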





On Sun, 17 Feb 2019 at 19:34, Ehasnul Khan <ehsanul...@gmail.com> wrote:
I was able to get the Kafka JDBC sink connector running but unable to get it to talk to the MySQL database. I tried to explicitly set up the Kafka JDBC connector (as Jiri suggested) and ended up creating a separate folder under the "connectors" directory (plugin.path=connectors).

Here is the excerpt from the log file:

INFO Unable to connect to database on attempt 2/3. Will retry in 10000 ms. (io.confluent.connect.jdbc.util.CachedConnectionProvider:93)
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
...
2019-02-17 10:57:03,280] ERROR WorkerSinkTask{id=mysql_target_sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:585)
org.apache.kafka.connect.errors.ConnectException: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
...
[2019-02-17 10:57:03,281] ERROR WorkerSinkTask{id=mysql_target_sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
...
[2019-02-17 10:57:03,282] ERROR WorkerSinkTask{id=mysql_target_sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2019-02-17 10:57:03,282] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:104)

Here is the curl command to start the Kafka-connect-jdbc for MySQL:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "mysql_target_sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "mysql1.classicmodels.payments",
    "connection.url": "jdbc:mysql://localhost:3036/tgt_classicmodels",
    "connection.user": "erpsink",
    "connection.password": "erpsink",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value",
    "name": "mysql_target_sink"
  }
}'

Setup snapshot from IntelliJ and POM.xml:

[Attachment: setup snapshot.png]




Please note: I am running everything on my local machine (not using Docker or Confluent Platform, as you can see :-) )

Attached is the full ERROR_LOG file.

Question:

1. Does the POM need to have an entry for the Confluent JDBC sink?
2. Does my curl config look okay?

Also, my source and target database schemas are under the same MySQL instance (running on my local machine).

I have assigned full DBA rights to the connection.user.

Please let me know your suggestions on where I should be looking or what I have done wrong here.

THANKS SO MUCH!!!

-Ehsan

Ehsanul khan

Feb 19, 2019, 3:04:01 AM
to debe...@googlegroups.com
Hi Robin,

Thanks for pointing out the port mismatch. Yes, the JDBC sink talks to the DB now. I am very close to sinking data into MySQL: I am getting the table created at the target DB, but I'm having an issue with multiple PK value entries, therefore it's not sinking yet.

I will report after I troubleshoot the issue. 

Thanks again!!

Ehsan

Sent from my iPhone

Ehasnul Khan

Feb 20, 2019, 12:29:26 AM
to debezium
Hello again-

I was able to get most of the data into the target table!!!!!!!!!! Thanks for all the help!!!!!!!

The only issue I am facing right now is that a table with a text column is not being sunk into the target database. I am getting the following error message:

[2019-02-19 21:14:11,720] ERROR WorkerSinkTask{id=mysql_tgt_productlines-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:585)

org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Data truncation: Data too long for column 'textDescription' at row 1

com.mysql.jdbc.MysqlDataTruncation: Data truncation: Data too long for column 'textDescription' at row 1



Here is the table DDL:

CREATE TABLE `productlines` (
  `productLine` varchar(50) NOT NULL,
  `textDescription` varchar(4000) DEFAULT NULL,
  `htmlDescription` mediumtext,
  `image` mediumblob,
  PRIMARY KEY (`productLine`)
)
I am using the following configuration for MySQL Sink.

"name": "mysql_tgt_productlines", "config": 
  { 
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", 
  "connection.url":"jdbc:mysql://localhost:3306/tgt_classicmodels", 
  "connection.user":"erprep", "connection.password":"erprep", 
  "transforms": "unwrap", 
  "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope", 
  "auto.create": "true", 
  "insert.mode":"upsert", 
  "topics":"productlines", 
  "pk.fields":"productLine", 
  "pk.mode":"record_key", 
  "name": "mysql_tgt_productlines" 
  } 
  }'


Thanks again. You guys are amazing!!!!


-Ehsan


Ehasnul Khan

Feb 20, 2019, 1:07:53 AM
to debezium
Also, something interesting I noticed: the schema definitions at the source and target are not the same. For example, here is the source and target schema for the products table:

Source Table:
CREATE TABLE `products` (
  `productCode` varchar(15) NOT NULL,
  `productName` varchar(70) NOT NULL,
  `productLine` varchar(50) NOT NULL,
  `productScale` varchar(10) NOT NULL,
  `productVendor` varchar(50) NOT NULL,
  `productDescription` text NOT NULL,
  `quantityInStock` smallint(6) NOT NULL,
  `buyPrice` decimal(10,2) NOT NULL,
  `MSRP` decimal(10,2) NOT NULL,
  PRIMARY KEY (`productCode`),
  KEY `productLine` (`productLine`),
  CONSTRAINT `products_ibfk_1` FOREIGN KEY (`productLine`) REFERENCES `productlines` (`productLine`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;


Target Schema:
CREATE TABLE `products` (
  `productLine` varchar(256) NOT NULL,
  `quantityInStock` smallint(6) NOT NULL,
  `buyPrice` decimal(65,2) NOT NULL,
  `MSRP` decimal(65,2) NOT NULL,
  `productCode` varchar(256) NOT NULL,
  `productScale` varchar(256) NOT NULL,
  `productName` varchar(256) NOT NULL,
  `productVendor` varchar(256) NOT NULL,
  `productDescription` varchar(256) NOT NULL,
  PRIMARY KEY (`productCode`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;



Here is my Debezium Configuration:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "mysql-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "erprep",
    "database.password": "erprep",
    "database.server.id": "223344",
    "database.server.name": "mysql1",
    "database.whitelist": "classicmodels",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.classicmodels",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}'


Please suggest what I need to do to keep the source and target table structures exactly the same.


Thanks!

-Ehsan

Jiri Pechanec

Feb 20, 2019, 3:03:08 AM
to debezium
Hi,

I am afraid this is a known issue.

You need to create the table manually, not via the sink connector, if you want to have correct metadata.
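For example, one way (a sketch, reusing the DDL already shown earlier in this thread) is to run the source table's CREATE TABLE statement against the target schema before starting the sink, so auto.create never has to guess the column types:

```sql
-- Run against the target schema (tgt_classicmodels) before starting the sink.
-- DDL copied from the source schema posted earlier in this thread.
CREATE TABLE `productlines` (
  `productLine` varchar(50) NOT NULL,
  `textDescription` varchar(4000) DEFAULT NULL,
  `htmlDescription` mediumtext,
  `image` mediumblob,
  PRIMARY KEY (`productLine`)
);
```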

J.



Gunnar Morling

Feb 20, 2019, 3:28:26 AM
to debezium
Note you can use the "column.propagate.source.type" option to have change events contain the column's type and length as schema parameters. You can use that information to create the columns on the sink side with the correct width.
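For instance, a sketch based on this thread's tables (the value is a comma-separated list of regexes matched against fully-qualified databaseName.tableName.columnName names; check the exact semantics in the Debezium MySQL connector docs):

```json
{
  "column.propagate.source.type": "classicmodels.productlines.textDescription,classicmodels.productlines.htmlDescription"
}
```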

--Gunnar

Ehsanul khan

Feb 20, 2019, 10:58:34 AM
to debe...@googlegroups.com
Thanks Jiri!!

Yes- I should be able to do that. 

Thanks!!

Ehsan



Sent from my iPhone

Ehsanul khan

Feb 20, 2019, 11:00:34 AM
to debe...@googlegroups.com
Just seeing your message, Gunnar.

Great- I will try this.

Thanks so much!!

Ehsan

Sent from my iPhone

Ehasnul Khan

Feb 21, 2019, 1:01:01 AM
to debezium
Hi Gunnar,

I have searched the configuration for column.propagate.source.type but could not find an example or info on how to set it up, so I am guessing. Would this be done column by column, or do I just put the type? For example:

{
..
"column.propagate.source.type":"text"
...
}

Thanks!

-Ehsan

Jiri Pechanec

Feb 21, 2019, 2:07:04 AM
to debezium
Hi,

please check `column.propagate.source.type` config option at https://debezium.io/docs/connectors/mysql/

J.