Debezium MySQL source: deleting a record yields only a 'null' message, which the JDBC sink connector cannot process


18321...@163.com

Sep 13, 2018, 11:07:15 PM
to debezium
Hi everybody,

   I'm testing Kafka connectors, with the data flow as follows:
  
   > MySQL database --> Debezium connector --> Kafka topic with Avro schema --> JDBC sink connector --> MySQL database.
   
  However, my Debezium source connector uses "transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope".
  With this configuration, Debezium generates only a "null" message on the topic when a record is deleted, and the JDBC sink connector cannot consume it. Does anyone have an idea? Thanks very much.

  The detailed configuration and a sample "null" message follow.

## Sample: deleting a record in the source MySQL table produces a "null" message, which the JDBC sink cannot consume.
....
{
  "country_id": 134,
  "country": "134",
  "new_column4": {
    "string": "134"
  }
}
null {## note: this is the 'null' message produced when a record is deleted from the source MySQL table.}
{
  "country_id": 134,
...


 ## Debezium connector configuration:
 curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://10.7.112.7:8083/connectors -d '{
"name": "mysql-connector-flattened",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "10.7.112.18",
"database.port": "3306",
"database.user": "debezium",
"database.password": "****",
"database.server.id": "42",
"database.server.name": "flattened",
"database.history.kafka.bootstrap.servers": "10.7.112.7:9092",
"database.history.kafka.topic": "dbhistory.flattened" ,
"include.schema.changes": "true" ,
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones":"false"
}
}'


Best Regards,

Jiri Pechanec

Sep 14, 2018, 12:18:32 AM
to debezium
Hi,

 This is exactly the problem: `transforms.unwrap.drop.tombstones=false`. Please look at the transformation documentation. You should keep both `drop.tombstones` and `drop.deletes` set to `true`, as the JDBC sink connector is not able to process tombstones or delete events.

J.
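
As a minimal sketch of the settings discussed in this reply (not a verified configuration), the snippet below builds only the SMT part of the source connector config. It assumes the `drop.tombstones` and `drop.deletes` options of `UnwrapFromEnvelope` as described in the transformation documentation; the helper name `unwrap_settings` is made up for illustration.

```python
import json


def unwrap_settings(drop_tombstones: bool, drop_deletes: bool) -> dict:
    """Build only the SMT part of a Debezium source connector config.

    Values are written as the strings "true"/"false", matching the
    connector configuration posted earlier in this thread.
    """
    return {
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "transforms.unwrap.drop.tombstones": str(drop_tombstones).lower(),
        "transforms.unwrap.drop.deletes": str(drop_deletes).lower(),
    }


# Drop both delete events and tombstones before they reach the topic.
settings = unwrap_settings(drop_tombstones=True, drop_deletes=True)
print(json.dumps(settings, indent=2))
```

With both options set to `true`, neither the delete event nor the tombstone should reach the topic, so the sink never receives a record with a null value. The trade-off is that deletes in the source table are then not propagated at all.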

18321...@163.com

Sep 14, 2018, 3:11:45 AM
to debezium
Hi,

 Thanks for your reply, it's helpful. I reviewed the JDBC sink connector's GitHub information, and it does not support delete 'tombstone' event messages at the moment.
 Thanks very much, Jiri.

Best regards,

On Friday, September 14, 2018 at 12:18:32 PM UTC+8, Jiri Pechanec wrote:

Sanat Talwar

Apr 11, 2019, 3:54:32 AM
to debezium
Hey,
I have done the same for my Debezium setup. Inserts and updates all work fine, but deletes do not.
Any clue on that?

Jiri Pechanec

Apr 11, 2019, 11:37:53 PM
to debezium
What error do you see?

Sanat Talwar

Apr 12, 2019, 12:33:33 AM
to debe...@googlegroups.com

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"}],"optional":false,"name":"db_test_wal17.public.account.Key"},"payload":{"user_id":21}}


This is the Debezium event generated in the topic when a row is deleted; the primary key is user_id.

But the JDBC sink is not updating the sink table.

As per the discussion, I have:



curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "4",
    "topics": "wal2json.public.account",
    "connection.url": "",
    "behavior.on.null.values": "delete",
    "auto.create": "true",
    "auto.evolve": "true",
    "fields.whitelist": "user_id,username,password",
    "delete.enabled": "true",
    "insert.mode": "upsert",
    "pk.fields": "user_id",
    "pk.mode": "record_key"
  }
}'


This is my jdbc sink configuration.



And the error I got is:

Null pointer exception
	at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindNonKeyFields(PreparedStatementBinder.java:135)
	at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:71)
	at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:139)
	at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:72)
	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)


--
You received this message because you are subscribed to the Google Groups "debezium" group.
To unsubscribe from this group and stop receiving emails from it, send an email to debezium+u...@googlegroups.com.
To post to this group, send email to debe...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/debezium/db3be3df-c60e-4583-8ebd-bf769533ec7d%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.


Jiri Pechanec

Apr 12, 2019, 1:09:13 AM
to debezium
What version of JDBC sink do you use?

J.

Sanat Talwar

Apr 12, 2019, 2:31:45 AM
to debe...@googlegroups.com
Hey,

postgresql-9.4-1206-jdbc41.jar


This is the jar I am using for the sink.



Jiri Pechanec

Apr 12, 2019, 2:38:01 AM
to debezium
That's the JDBC driver, but I am asking for the version of the Kafka Connect JDBC plugin.

J.

Sanat Talwar

Apr 12, 2019, 2:55:21 AM
to debe...@googlegroups.com
Hey,

confluent-hub-client-5.0.1.jar





Sanat Talwar

Apr 12, 2019, 2:57:46 AM
to debe...@googlegroups.com
Hey,

kafka-connect-jdbc-5.2.1.jar 

This is the Kafka Connect JDBC jar that I am using.


On Fri, Apr 12, 2019 at 12:08 PM Jiri Pechanec <jiri.p...@gmail.com> wrote:

Sanat Talwar

Apr 12, 2019, 3:40:27 AM
to debe...@googlegroups.com
Anything wrong with the version?

Jiri Pechanec

Apr 12, 2019, 5:10:20 AM
to debezium
Hi,

Deletes are still not supported in the JDBC connector - https://github.com/confluentinc/kafka-connect-jdbc/issues/127


J.

Sanat Talwar

Apr 12, 2019, 5:15:57 AM
to debe...@googlegroups.com
So what alternative should we opt for?


Sanat Talwar

Apr 12, 2019, 5:19:38 AM
to debe...@googlegroups.com
I read in some posts that delete is now supported.
Can you enlighten me? What should I do to make deletes happen?


Gunnar Morling

Apr 12, 2019, 5:54:35 AM
to debezium
AFAIK, the JDBC sink connector still doesn't handle deletes/tombstones. One workaround is to use the flattening SMT and its delete.handling.mode option (https://debezium.io/docs/configuration/event-flattening/#configuration_options). This lets you rewrite deletes as updates of a logical deleted marker column in the sink tables, which you then can use to physically delete marked records with some kind of batch job.
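
The batch job mentioned in this reply could look roughly like the following sketch. It uses an in-memory SQLite database purely as a stand-in for the real sink database, and it assumes that the rewrite mode marks deleted rows through a `__deleted` column holding the string `true`; check the column name and value format against the event-flattening documentation for your Debezium version. The `purge_deleted` function and the sample rows are hypothetical.

```python
import sqlite3


def purge_deleted(conn: sqlite3.Connection, table: str) -> int:
    """Physically delete rows flagged as deleted; return how many were removed."""
    # Table names cannot be bound as SQL parameters, so validate before interpolating.
    if not table.isidentifier():
        raise ValueError(f"unexpected table name: {table!r}")
    cur = conn.execute(f'DELETE FROM "{table}" WHERE "__deleted" = ?', ("true",))
    conn.commit()
    return cur.rowcount


# Stand-in sink table with an assumed "__deleted" marker column.
conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE account (user_id INTEGER PRIMARY KEY, username TEXT, "__deleted" TEXT)'
)
conn.executemany(
    "INSERT INTO account VALUES (?, ?, ?)",
    [(20, "alice", "false"), (21, "bob", "true")],
)

removed = purge_deleted(conn, "account")
remaining = [row[0] for row in conn.execute("SELECT user_id FROM account ORDER BY user_id")]
print(removed, remaining)  # prints: 1 [20]
```

Running such a job on a schedule keeps the sink table free of logically deleted rows, at the cost of a delay between the delete in the source and the physical delete in the sink.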

Sanat Talwar

Apr 12, 2019, 6:06:34 AM
to debe...@googlegroups.com
Hey,
This is my debezium configuration.

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "inventory-connector1",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "4",
    "database.hostname": "",
    "database.port": "5432",
    "database.user": "",
    "database.password": "",
    "database.dbname": "",
    "database.server.name": "",
    "plugin.name": "wal2json",
    "database.whitelist": "",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transformed.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.drop.deletes": "false",
    "database.history.kafka.bootstrap.servers": "localhost: 9092",
    "database.history.kafka.topic": "dbhistory.wal2json"
  }
}'


I am using the event-flattening SMT, and the record I got for the delete event is:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"}],"optional":false,"name":"db_test_wal17.public.account.Key"},"payload":{"user_id":21}}

I got this record in the Kafka topic.

Now what should I do?


Sanat Talwar

Apr 12, 2019, 6:12:02 AM
to debe...@googlegroups.com
And with delete.handling.mode set to rewrite, the _delete column is not getting added.
Can you please point out where I am making a mistake?

Jiri Pechanec

Apr 18, 2019, 2:44:28 AM
to debezium
Hi,

`transformed.unwrap.delete.handling.mode` is the wrong name; it should be `transforms.unwrap.delete.handling.mode`.

The value for `transforms.unwrap.drop.tombstones` should be `true`, and `transforms.unwrap.drop.deletes` should not be present.

J.
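
A misspelled prefix such as `transformed.` is easy to miss: only keys starting with `transforms.unwrap.` are handed to the `unwrap` transformation, so a mistyped option never takes effect. As a rough illustration, the hypothetical helper below (`misprefixed_keys`, not part of Debezium or Kafka Connect) flags keys that mention a declared transform alias but do not start with `transforms.<alias>.`.

```python
def misprefixed_keys(config: dict) -> list:
    """Return keys that look like SMT options but lack the 'transforms.<alias>.' prefix."""
    aliases = [a.strip() for a in config.get("transforms", "").split(",") if a.strip()]
    suspicious = []
    for key in config:
        if key == "transforms":
            continue
        for alias in aliases:
            # Example: "transformed.unwrap.delete.handling.mode" contains ".unwrap."
            # but does not start with "transforms.unwrap.".
            if f".{alias}." in key and not key.startswith(f"transforms.{alias}."):
                suspicious.append(key)
    return suspicious


config = {
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transformed.unwrap.delete.handling.mode": "rewrite",  # typo from the posted config
    "transforms.unwrap.drop.tombstones": "false",
}
print(misprefixed_keys(config))  # prints: ['transformed.unwrap.delete.handling.mode']
```

A check like this could be run on the JSON payload before it is posted to the Kafka Connect REST endpoint.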
