Can Debezium PostgreSQL Connector be used with JDBC Sink Connector?


Jim Glennon

unread,
Sep 7, 2017, 12:23:16 PM9/7/17
to debezium, Zahid Iqbal
I ran a test using the Debezium PostgreSQL Connector (with the Confluent Avro Converter) to publish database changes, and then tried using the Confluent JDBC Sink Connector to insert the changes into another database. It seems that the two connectors cannot be used together in this manner. The following issues were observed:

- The topic name that Debezium publishes to is of the form serverName.schemaName.tableName. However, the JDBC Sink Connector is treating the serverName as part of the table name.


[2017-09-07 15:58:39,604] INFO Checking table:EDB.public.pgbench_accounts exists for product:PostgreSQL schema:public catalog: (io.confluent.connect.jdbc.sink.DbMetadataQueries:47)
[2017-09-07 15:58:39,666] INFO product:PostgreSQL schema:public catalog:postgres -- table:EDB.public.pgbench_accounts is absent (io.confluent.connect.jdbc.sink.DbMetadataQueries:51)



- It appears that the Kafka Connect JSON produced by the Debezium Connector is not compatible with what the JDBC Connector is expecting. The following error was encountered:

[2017-09-07 15:58:39,668] ERROR Task test-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:449)
org.apache.kafka.connect.errors.ConnectException: EDB.public.pgbench_accounts.Value (STRUCT) type doesn't have a mapping to the SQL database column type
    at io.confluent.connect.jdbc.sink.dialect.DbDialect.getSqlType(DbDialect.java:179)
    at io.confluent.connect.jdbc.sink.dialect.PostgreSqlDialect.getSqlType(PostgreSqlDialect.java:71)
    at io.confluent.connect.jdbc.sink.dialect.DbDialect.writeColumnSpec(DbDialect.java:117)
    at io.confluent.connect.jdbc.sink.dialect.DbDialect$2.apply(DbDialect.java:109)
    at io.confluent.connect.jdbc.sink.dialect.DbDialect$2.apply(DbDialect.java:105)
    at io.confluent.connect.jdbc.sink.dialect.StringBuilderUtil.joinToBuilder(StringBuilderUtil.java:62)
    at io.confluent.connect.jdbc.sink.dialect.StringBuilderUtil.joinToBuilder(StringBuilderUtil.java:37)
    at io.confluent.connect.jdbc.sink.dialect.DbDialect.writeColumnsSpec(DbDialect.java:105)
    at io.confluent.connect.jdbc.sink.dialect.DbDialect.getCreateQuery(DbDialect.java:73)
    at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:87)
    at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:62)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:65)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)




Randall Hauch

unread,
Sep 7, 2017, 12:41:23 PM9/7/17
to debezium, Zahid Iqbal
The two connectors were not designed to work together, but you can use SMTs to make any changes you might need: either to the Debezium events before they're written to Kafka, or on the sink side after they're read from Kafka but before the JDBC sink connector gets them. Debezium's almost-completed SMT for extracting the "after" state of the row (https://issues.jboss.org/browse/DBZ-226) will be very useful, but you may also need to adjust the topic names.
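[Editor's note: a hedged sketch of what enabling that SMT might look like; the class name below, io.debezium.transforms.UnwrapFromEnvelope, is the one the DBZ-226 SMT shipped under in Debezium 0.6, and it can be configured on either the source or the sink side.]

```properties
# Replace each Debezium change-event envelope with just its "after" row state,
# producing the flat records the JDBC sink expects.
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
```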

Another potential concern is the data type representations.

--
You received this message because you are subscribed to the Google Groups "debezium" group.
To unsubscribe from this group and stop receiving emails from it, send an email to debezium+unsubscribe@googlegroups.com.
To post to this group, send email to debe...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/debezium/12ee7334-ac65-4992-b81d-06a629f35c6d%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Jiri Pechanec

unread,
Sep 8, 2017, 4:17:58 AM9/8/17
to debezium
Please look at https://github.com/jpechane/debezium-utils/tree/master/sink-demo to see how the future SMT is intended to be used. Also notice the RegexRouter transform, which handles the differences between the topic naming of the source and sink connectors.
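[Editor's note: for the topic-naming difference specifically, a RegexRouter configuration on the sink connector might look roughly like the fragment below; the transform class and its regex/replacement options are the real Kafka Connect ones, but the pattern itself is illustrative — it strips the serverName.schemaName. prefix so only the table name remains.]

```properties
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=([^.]+)\\.([^.]+)\\.([^.]+)
transforms.route.replacement=$3
```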

J.

Jim Glennon

unread,
Sep 8, 2017, 8:01:05 AM9/8/17
to debezium
Thanks. I will take a look. We are already using the RegexRouter with the Debezium PostgreSQL Connector to route messages to a single topic. However, this has introduced other complications (as discussed here: https://groups.google.com/forum/#!topic/confluent-platform/bdJHeGX3cOI), since we are also using Avro.

~jim

Jim Glennon

unread,
Sep 8, 2017, 11:43:01 AM9/8/17
to debezium
One question. I know that when using a transform on a source connector (e.g. Debezium PostgreSQL Connector) it is applied *before* the converter (e.g. Avro converter). Is this also true for a sink connector (e.g. JDBC Sink Connector), or is the transform applied *after* the converter?

Thanks.

~jim


Randall Hauch

unread,
Sep 8, 2017, 12:15:43 PM9/8/17
to debezium
For a sink connector, a message is read from Kafka, converted into a SinkRecord, transformed using the entire chain, and then passed to the sink connector. This is done in batches.
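[Editor's note: a toy sketch of that ordering — these are NOT the real Kafka Connect classes, just stand-ins demonstrating that on the sink side the converter runs first and the SMT chain runs afterwards, on the already-converted record.]

```java
import java.util.List;
import java.util.function.Function;

public class SinkFlow {

    // Stand-in for the converter: raw Kafka bytes -> deserialized value.
    static String convert(byte[] rawMessage) {
        return new String(rawMessage);
    }

    // Stand-in for the SMT chain: applied AFTER conversion, in order.
    static String applyTransforms(String record, List<Function<String, String>> smts) {
        for (Function<String, String> smt : smts) {
            record = smt.apply(record);
        }
        return record;
    }

    public static void main(String[] args) {
        byte[] kafkaMessage = "server.public.cities".getBytes();

        String converted = convert(kafkaMessage);           // 1. converter runs first
        Function<String, String> dropPrefix =               // 2. then the transform chain,
                s -> s.replaceAll("^[^.]+\\.[^.]+\\.", ""); //    e.g. a RegexRouter-like rename
        String delivered = applyTransforms(converted, List.of(dropPrefix));

        System.out.println(delivered); // prints "cities" -- what the sink's put() would see
    }
}
```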


Jim Glennon

unread,
Sep 14, 2017, 8:48:10 AM9/14/17
to debezium
I gave this a try and had partial success. With the JDBC Sink configured with insert.mode=insert, it worked. However, with insert.mode=upsert, I encountered the following:

Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "cities" ("name") VALUES ('Riverside') ON CONFLICT ("name") DO UPDATE SET  was aborted: ERROR: syntax error at end of input
  Position: 78  Call getNextException to see other errors in the batch.
    at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2126)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:469)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:791)
    at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1547)
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:99)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
    ... 11 more
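[Editor's note: the truncated statement suggests the sink generated an empty SET clause. That would happen if every column in the record is also part of the key (here "name" is apparently the only column), leaving no non-key columns to update on conflict. A hedged illustration, where "population" is a hypothetical non-key column that would make the upsert well-formed:]

```sql
-- What the sink appears to have generated (invalid: nothing follows SET):
INSERT INTO "cities" ("name") VALUES ('Riverside')
ON CONFLICT ("name") DO UPDATE SET ;

-- With at least one non-key column, the generated upsert parses fine:
INSERT INTO "cities" ("name", "population") VALUES ('Riverside', 314998)
ON CONFLICT ("name") DO UPDATE SET "population" = EXCLUDED."population";
```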


~jim


Gunnar Morling

unread,
Sep 22, 2017, 10:57:26 AM9/22/17
to debezium
Hi Jim,

Does this issue still exist with Debezium 0.6 (released yesterday)?

If you still see that issue with upsert mode, could you try and extract some more details from the logs?

Jiri has been working on a demo which uses the new SMT and the Postgres sink connector:


This will be part of the official examples repo and docs, too (for now, just ignore the "Prerequisites" part in the readme.md, the stock 0.6 bits contain everything).

Thanks,

--Gunnar