Topic still remained after removing table from `table.include.list` in DBZ 1.9.1

Brandon Hsu

Jun 1, 2022, 11:53:55 AM
to debe...@googlegroups.com
Hello, All,

I recently tested Debezium 1.9.1 for MS SQL Server on Apache Kafka 3.1.0.

I tested removing a previously included table from synchronization by updating `table.include.list` of a registered connector through the Kafka Connect `PUT` method.

I used the following DBZ registration config:

[kafka@kafkatest ~]$ cat sqlserver-dbz-connector.json
{
  "name": "dbz-mssql",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "database.hostname": "mssqldev",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "pwD123",
    "database.dbname": "cdctest",
    "table.include.list": "cdcschema.test1, cdcschema.test2",
    "database.server.name": "ms_sql_cdc",
    "decimal.handling.mode": "precise",
    "time.precision.mode": "connect",
    "include.schema.changes": "false",
    "truncate.handling.mode": "skip",
    "database.history.kafka.bootstrap.servers": "kafkatest:9092",
    "database.history.kafka.topic": "ms_sql_cdc.ddl_history",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "drop"
  }
}

[kafka@kafkatest ~]$ 


I used the following `curl` commands to register the connector and check its managed topics.

[kafka@kafkatest ~]$ curl -v -X POST http://kafkatest:8083/connectors/ \
  -H "Accept:application/json" \
  -H 'Content-Type:application/json' \
  -d @$PWD/sqlserver-dbz-connector.json

[kafka@kafkatest ~]$  

[kafka@kafkatest ~]$ curl -X GET http://kafkatest:8083/connectors/dbz-mssql/topics | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    84  100    84    0     0  21000      0 --:--:-- --:--:-- --:--:-- 21000
{
  "dbz-mssql": {
    "topics": [
      "ms_sql_cdc.cdcschema.test1",
      "ms_sql_cdc.cdcschema.test2"
    ]
  }
}
[kafka@kafkatest ~]$ 


Then I used the following `jq` command and the `PUT` method to update `table.include.list`, but the managed topics did not change accordingly.

[kafka@kafkatest ~]$ jq '.config | ."table.include.list" = "cdcschema.test1"' $PWD/sqlserver-dbz-connector.json | \
  curl -v -X PUT http://kafkatest:8083/connectors/dbz-mssql/config/ \
  -H "Accept:application/json" \
  -H 'Content-Type:application/json' \
  --data-binary @-

[kafka@kafkatest ~]$ 

[kafka@kafkatest ~]$ curl -X GET http://kafkatest:8083/connectors/dbz-mssql/topics | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    84  100    84    0     0  21000      0 --:--:-- --:--:-- --:--:-- 21000
{
  "dbz-mssql": {
    "topics": [
      "ms_sql_cdc.cdcschema.test1",
      "ms_sql_cdc.cdcschema.test2"
    ]
  }
}
[kafka@kafkatest ~]$ 


It seems that the topic `ms_sql_cdc.cdcschema.test2` no longer receives data change events,
but the topic is still listed as managed by the DBZ connector task.

What I expect is that when I remove a table from `table.include.list`, the table is removed completely from the DBZ synchronization list.

Is something wrong, or am I misunderstanding DBZ's behavior?

Best Regards.

Chris Cranford

Jun 2, 2022, 12:51:02 PM
to debe...@googlegroups.com, Brandon Hsu
Hi Brandon -

So this isn't something that's directly controlled by Debezium.  The "/topics" end-point returns the list of topics that have been used since the connector was first created (or since the list was last reset).  So when you remove a table from the "table.include.list", this list won't automatically be adjusted.  In order to adjust it, you will need to reset the topic list by sending a PUT request to the "/connectors/{name}/topics/reset" end-point, which asks Kafka Connect to empty the connector's set of active topics so that it reflects only the topics used from then on.  You can find more information about the end-points in the Kafka Connect documentation [1].

Hope that helps,
CC

[1]: https://kafka.apache.org/documentation/#connect_rest
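
The reset described above could be sketched roughly as follows. This is only an illustration, not something posted in the thread: the base URL and connector name are taken from the original post, while the helper names (`connector_url`, `reset_active_topics`, `list_active_topics`) are hypothetical.

```shell
#!/bin/sh
# Assumption: the Kafka Connect REST API is reachable at this base URL
# (host and port taken from the original post).
CONNECT_URL="${CONNECT_URL:-http://kafkatest:8083}"

# Build the URL of a connector sub-resource, e.g. "topics" or "topics/reset".
connector_url() {
  printf '%s/connectors/%s/%s\n' "$CONNECT_URL" "$1" "$2"
}

# Ask Kafka Connect to empty the connector's set of active topics.
reset_active_topics() {
  curl -s -X PUT "$(connector_url "$1" topics/reset)"
}

# List the topics the connector has used since creation or since the last reset.
list_active_topics() {
  curl -s -X GET "$(connector_url "$1" topics)"
}
```

Usage would be something like `reset_active_topics dbz-mssql && list_active_topics dbz-mssql | jq`. Right after a reset the list may come back empty; it should fill up again only with topics the connector writes to afterwards.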

CCHsu

Jun 13, 2022, 1:26:50 AM
to debezium
Hi, Chris,

Thanks for your reply.

It seems that the "/topics" end-point only displays the topics that have been touched by the task.

What I am actually trying to find is a way to completely clear a specific table's Debezium CDC state and perform the table snapshot again.

Currently I observe that Debezium checks for an existing previous offset for the specified topics,
and it seems that the only way to clear a removed table's configuration from Debezium is to switch to a new connector task name.

I cannot find any way to clear the "offset" for a specific Debezium CDC table.


For a completely new connector task "dbz-mssql-test":
[2022-06-07 15:59:31,599] INFO [dbz-mssql-test|task-0] Context created (io.debezium.pipeline.ChangeEventSourceCoordinator:106)
[2022-06-07 15:59:31,599] INFO [dbz-mssql-test|task-0] No previous offset has been found (io.debezium.connector.sqlserver.SqlServerSnapshotChangeEventSource:69)
[2022-06-07 15:59:31,599] INFO [dbz-mssql-test|task-0] According to the connector configuration both schema and data will be snapshotted (io.debezium.connector.sqlserver.SqlServerSnapshotChangeEventSource:71)
[2022-06-07 15:59:31,599] INFO [dbz-mssql-test|task-0] Snapshot step 1 - Preparing (io.debezium.relational.RelationalSnapshotChangeEventSource:87)
[2022-06-07 15:59:31,645] INFO [dbz-mssql-test|task-0] Snapshot step 2 - Determining captured tables (io.debezium.relational.RelationalSnapshotChangeEventSource:96)
...(skipped)
[2022-06-07 15:59:32,109] INFO [dbz-mssql-test|task-0] Snapshot ended with SnapshotResult [status=COMPLETED, offset=SqlServerOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.sqlserver.Source:STRUCT}, sourceInfo=SourceInfo [serverName=ms_sql_cdc, changeLsn=NULL, commitLsn=00000160:00006260:0001, eventSerialNo=null, snapshot=FALSE, sourceTime=2022-06-07T15:59:32.107Z], snapshotCompleted=true, eventSerialNo=1]] (io.debezium.pipeline.ChangeEventSourceCoordinator:156)



After removing a table from "table.include.list" and re-adding it:
[2022-06-07 16:28:31,154] INFO [dbz-mssql-test|task-0] A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted. (io.debezium.connector.sqlserver.SqlServerSnapshotChangeEventSource:64)
[2022-06-07 16:28:31,154] INFO [dbz-mssql-test|task-0] Snapshot ended with SnapshotResult [status=SKIPPED, offset=SqlServerOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.sqlserver.Source:STRUCT}, sourceInfo=SourceInfo [serverName=ms_sql_cdc, changeLsn=NULL, commitLsn=00000160:00006260:0001, eventSerialNo=null, snapshot=FALSE, sourceTime=null], snapshotCompleted=true, eventSerialNo=0]] (io.debezium.pipeline.ChangeEventSourceCoordinator:156)






Chris Cranford wrote on Friday, June 3, 2022 at 12:51:02 AM [UTC+8]:

Chris Cranford

Jun 13, 2022, 1:29:46 PM
to debe...@googlegroups.com, CCHsu
Hi,

If you want to change the "table.include.list" to remove a table, you can do this by using a PUT request to "/connectors/{name}/config".  When you do this, Kafka Connect will trigger a re-balance, stopping the connector and restarting it with the new configuration. 

Chris
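
One way to confirm that the restarted connector picked up the new list is to read the configuration and status back from Kafka Connect. The sketch below is an assumption-laden illustration rather than part of the reply: the base URL and connector name come from the original post, `jq` is assumed to be installed, and the helper names are hypothetical.

```shell
#!/bin/sh
# Assumption: the Kafka Connect REST API is reachable at this base URL
# (host and port taken from the original post).
CONNECT_URL="${CONNECT_URL:-http://kafkatest:8083}"

# Build the URL of a connector sub-resource such as "config" or "status".
connector_resource_url() {
  printf '%s/connectors/%s/%s\n' "$CONNECT_URL" "$1" "$2"
}

# Print the table.include.list value currently stored for the connector.
show_include_list() {
  curl -s "$(connector_resource_url "$1" config)" | jq -r '."table.include.list"'
}

# Print the state of the connector and of each of its tasks (e.g. RUNNING).
show_states() {
  curl -s "$(connector_resource_url "$1" status)" | jq -r '.connector.state, .tasks[].state'
}
```

For example, `show_include_list dbz-mssql` followed by `show_states dbz-mssql` after the PUT request.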

CCHsu

Jun 16, 2022, 11:07:14 AM
to debezium
Hi, Chris, 

Thanks for your information.  😊

After this adjustment, is all offset information related to the table cleared from the task?

Or is the offset information about tables removed from "table.include.list" kept indefinitely until the task is completely deleted with "curl -X DELETE"?

I would like to know whether I need to perform any kind of housekeeping after I remove a table from this list.

Chris Cranford wrote on Tuesday, June 14, 2022 at 1:29:46 AM [UTC+8]:

Chris Cranford

Jun 21, 2022, 7:43:21 AM
to debe...@googlegroups.com, CCHsu
Hi,

If you are referring to the data returned by the "/topics" end-point, then I really do not know.  It isn't an endpoint that I, or we, discuss or use much, as it's really there for informational purposes and has no direct effect on connector operations.

If you are referring to the offsets stored in the offset topic, then the removal of tables from the include/exclude lists does not impact that data either.  Offsets relate to the source as a whole, not to individual tables, and their contents are managed directly by the connector and do not require any manual adjustments.

Chris
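
To see what is actually stored, the offset topic can be dumped with Kafka's console consumer. The following is a rough sketch under stated assumptions: the broker address comes from the original post, `connect-offsets` is only a placeholder for whatever `offset.storage.topic` is set to on the Connect worker, Kafka's CLI scripts are assumed to be on the PATH, and the helper names are hypothetical.

```shell
#!/bin/sh
# Assumptions: broker address taken from the original post; the topic name is a
# placeholder and must match offset.storage.topic in the Connect worker config.
BOOTSTRAP="${BOOTSTRAP:-kafkatest:9092}"
OFFSET_TOPIC="${OFFSET_TOPIC:-connect-offsets}"

# Print every offset record as "<key><TAB><value>", exiting after 10 idle seconds.
dump_offsets() {
  kafka-console-consumer.sh --bootstrap-server "$BOOTSTRAP" \
    --topic "$OFFSET_TOPIC" --from-beginning \
    --timeout-ms 10000 \
    --property print.key=true
}

# Keep only records whose key starts with ["<connector-name>",
filter_connector_offsets() {
  grep -F "[\"$1\","
}
```

Running `dump_offsets | filter_connector_offsets dbz-mssql` should show keys made of the connector name and a source partition, with no per-table entries, which is consistent with offsets being kept for the source as a whole.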