Debezium Kafka Connector for MySQL-to-Redshift?


amit...@venmo.com

Jun 14, 2018, 3:21:40 PM
to debezium
How will that work considering that MySQL and Redshift (PostgreSQL) are different databases?
Is that a good idea, considering that Redshift recommends loading data from S3 files using the COPY command?

Thanks,
Amit

Jiri Pechanec

Jun 15, 2018, 12:13:58 AM
to debezium
Hi,

You have to verify that the JDBC sink connector works with Redshift. If it does, then this solution works and has already been tested. Mind that there are a couple of limitations - the biggest one probably being that DELETE statements are not supported by the sink.

Have a nice day

J.

Gunnar Morling

Jun 15, 2018, 2:24:17 AM
to debe...@googlegroups.com
Amit,

We have an example for streaming changes from MySQL into Postgres here:


The key is to use the Unwrapping SMT as shown in the example.

As Jiri says, the question is whether the JDBC sink connector runs against Redshift, but I'd assume it does. As far as DELETEs are concerned, you might work with logical deletes (i.e. a deletion flag in the tables).

If you gain any specific experience using Redshift, please let us know, in case it makes sense to add something to the docs, do a blog post, etc.
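For reference, a JDBC sink configuration applying the unwrapping SMT might look roughly like this (the topic name, credentials, and connection URL are placeholders for illustration, not something tested against Redshift):

```json
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "tasks.max": "1",
  "topics": "customers",
  "connection.url": "jdbc:postgresql://localhost:5432/inventory?user=postgres&password=postgres",
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
  "insert.mode": "upsert",
  "pk.mode": "record_value",
  "pk.fields": "id"
}
```

The unwrap transform flattens Debezium's change-event envelope into a plain row so the sink connector can write it.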

Cheers,

--Gunnar



--
You received this message because you are subscribed to the Google Groups "debezium" group.
To unsubscribe from this group and stop receiving emails from it, send an email to debezium+unsubscribe@googlegroups.com.
To post to this group, send email to debe...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/debezium/73a36029-9f92-4a66-a98e-febaa1b964e5%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

ibnu bay

Sep 16, 2018, 3:00:46 PM
to debezium
I get an error when connecting to Redshift, following this example:
https://blog.insightdatascience.com/from-postgresql-to-redshift-with-kafka-connect-111c44954a6a

The configuration (note: the connection URL needs two slashes after "jdbc:redshift:"):
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "tasks.max": "1",
  "topics": "customers",
  "auto.create": "true",
  "connection.url": "jdbc:redshift://url:5439/db?user=username&password=password",
  "insert.mode": "upsert"
}


Error:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException
    at io.confluent.connect.jdbc.sink.dialect.GenericDialect.getCreateQuery(GenericDialect.java:33)
    at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:87)
    at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:62)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:66)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
    ... 10 more

Any idea? please... thanks...



ibnu bay

Sep 16, 2018, 3:22:45 PM
to debezium
Finally got my Redshift sink working.
Here is my configuration:
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "customers",
"connection.url": "jdbc:redshift://url:5439/db?user=user&password=pass",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"auto.create": "false",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value"

I think the problem was that auto-create table must be disabled :)
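In case it helps others: a connector config like this gets registered with Kafka Connect's REST API by wrapping it in a payload that also carries a connector name. A small sketch (the name "redshift-sink" and the localhost URL are my assumptions; adjust for your setup):

```python
import json

# Registration payload for the Kafka Connect REST API: the connector
# name plus the config keys from the message above. "redshift-sink"
# and the connection details are placeholders.
payload = {
    "name": "redshift-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "customers",
        "connection.url": "jdbc:redshift://url:5439/db?user=user&password=pass",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "auto.create": "false",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_value",
    },
}

# Serialize and print; save this to a file and POST it, e.g.:
#   curl -X POST -H "Content-Type: application/json" \
#        --data @connector.json http://localhost:8083/connectors
body = json.dumps(payload, indent=2)
print(body)
```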