Re: [debezium] Unknown schema issue when updating the table list for mysql debezium connector


Chris Cranford

Aug 28, 2025, 12:49:07 PM
to debe...@googlegroups.com
Hi -

I believe you may have wanted to use `snapshot.mode` set to `recovery` (if you're on Debezium 2.7 or later) or `schema_only_recovery` (if you're on Debezium 2.6 or older). 
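
For example, when updating the connector over the Kafka Connect REST API, the PUT body would carry the new table plus the snapshot mode, along the lines of this sketch (assuming Debezium 2.7+; the table list is abbreviated here, and every other key stays exactly as in the config quoted below):

    {
      ...
      "snapshot.mode": "recovery",
      "table.include.list": "rscdc1_mxradon.Account_Base,...,rscdc1_mxradon.User_Base"
    }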

-cc

On 8/28/25 3:15 AM, Rohit Panwar wrote:

I have created a Debezium connector with the following configuration:

{ "name": "mysql-debezium-connector-20", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "snapshot.locking.mode": "none", "include.schema.changes": "false", "schema.history.internal.store.only.captured.tables.ddl": "true", "skip.messages.with.unknown.ddl": "true", "tombstones.on.delete": "true", "decimal.handling.mode": "string", "binary.handling.mode": "base64", "database.allowPublicKeyRetrieval": "true", "time.precision.mode": "adaptive_time_microseconds", "event.processing.failure.handling.mode": "warn", "heartbeat.interval.ms": "10000", "heartbeat.action.query": "SELECT 1", "database.hostname": "*******", "database.port": "*****", "database.user": "*******", "database.password": "*******", "database.server.id": "210009", "database.server.name": "aurora_cluster_210009", "database.include.list": "rscdc1_mxradon", "database.ssl.mode": "required", "table.include.list": "rscdc1_mxradon.Account_Base,rscdc1_mxradon.Prospect_Base,rscdc1_mxradon.Prospect_ExtensionBase,rscdc1_mxradon.ProspectSystem_ExtensionBase,rscdc1_mxradon.ProspectConversionPropensity_Base,rscdc1_mxradon.ProspectActivity_Base,rscdc1_mxradon.ProspectActivity_ExtensionBase,rscdc1_mxradon.ProspectActivityProductMap_Base,rscdc1_mxradon.Opportunity_CalculatedFields,rscdc1_mxradon.ActivityNote_Base,rscdc1_mxradon.ProspectNote_Base,rscdc1_mxradon.CampaignActivity_Base,rscdc1_mxradon.AutoresponderAction_Base,rscdc1_mxradon.Attachment_Base,rscdc1_mxradon.WebContent_Base,rscdc1_mxradon.LeadScore_Base,rscdc1_mxradon.UserTask_Base,rscdc1_mxradon.UserTask_ExtensionBase,rscdc1_mxradon.TaskType_Base,rscdc1_mxradon.UserRecurringTaskMapping_Base,rscdc1_mxradon.Company_Base,rscdc1_mxradon.Entity_Base,rscdc1_mxradon.ProspectMailingPreference_Base,rscdc1_mxradon.ProspectQualityScore_ExtensionBase,rscdc1_mxradon.ProspectSocialIdentifier_ExtensionBase,rscdc1_mxradon.EntityMap_Base,rscdc1_mxradon.EntityConversionPropensity_Base,rscdc1_mxradon.Setting_Base,rscdc1_mxradon.CustomObjectProspect_Base,rscdc1_mxradon.CustomObjectProspectActivity_Base,rscdc1_mxradon.EntityAttribute_Base,rscdc1_mxradon.CampaignActivityRecord_Base,rscdc1_mxradon.ProspectAssignHistory_Base,rscdc1_mxradon.ProspectAudit_Base,rscdc1_mxradon.debezium_signals", "snapshot.mode": "when_needed", "snapshot.new.tables": "parallel", "topic.prefix": "aurora_cluster_210009", "transforms": "routeProspects,routeProspectActivities,routeOthers", "transforms.routeProspects.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeProspects.regex": "aurora_cluster_210009\\.rscdc1_mxradon\\.(Prospect_Base|Prospect_ExtensionBase|ProspectSystem_ExtensionBase|ProspectMailingPreference_Base|ProspectQualityScore_ExtensionBase|ProspectSocialIdentifier_ExtensionBase|ProspectConversionPropensity_Base|CustomObjectProspect_Base)", "transforms.routeProspects.replacement": "aurora_cluster_210009.prospects", "transforms.routeProspectActivities.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeProspectActivities.regex": "aurora_cluster_210009\\.rscdc1_mxradon\\.(ProspectActivity_Base|ProspectActivity_ExtensionBase|ProspectActivityProductMap_Base|EntityConversionPropensity_Base|CustomObjectProspectActivity_Base)", "transforms.routeProspectActivities.replacement": "aurora_cluster_210009.prospect_activities", "transforms.routeOthers.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeOthers.regex": "aurora_cluster_210009\\.rscdc1_mxradon\\..*", "transforms.routeOthers.replacement": 
"aurora_cluster_210009.other_entities", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "dbhistory.aurora.210009.v1", "schema.history.internal.skip.unparseable.ddl": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", "errors.tolerance": "all", "errors.log.enable": "true", "errors.log.include.messages": "true", "signal.data.collection": "rscdc1_mxradon.debezium_signals" } }

I now need to update the table list to include an additional table called User_Base.
So I issue a PUT request with the additional table included in my table list, and I set "snapshot.mode": "schema_only".
I also insert an entry into the signals table:

INSERT INTO rscdc1_mxradon.debezium_signals (id, type, data)
VALUES (CONCAT('user-base-snapshot-', UNIX_TIMESTAMP()), 'execute-snapshot', '{ "data-collections": ["rscdc1_mxradon.User_Base"] }');

However, the moment a write happens to my user table, the connector throws an error:

Error processing binlog event
    ... 7 more
Caused by: io.debezium.DebeziumException: Encountered change event for table rscdc1_mxradon.User_Base whose schema isn't known to this connector
    at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.informAboutUnknownTableIfRequired


How do I handle this scenario? I have found a workaround with the steps listed below:
  • Create connector
  • Ensure it is running
  • Stop connector
  • Delete the history topic
  • Update Config to include new table and set snapshot to schema_only_recovery
  • Resume connector
  • Send Signal to Signal table for incremental snapshot
  • Restore the snapshot setting to when_needed
This, however, involves stopping the connector and clearing the history topic.
Is there a cleaner or more dynamic way to handle this scenario?




Chris Cranford

Aug 29, 2025, 7:31:58 AM
to debe...@googlegroups.com
Hi, allow Debezium to capture schemas for all tables rather than limiting it using `schema.history.internal.store.only.captured.tables.ddl`. You can safely add the table to the include list and restart the connector. 
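
In config terms, that means something like the following sketch (since `false` is the default for this property, simply removing it has the same effect; the table list is abbreviated here):

    {
      "schema.history.internal.store.only.captured.tables.ddl": "false",
      "table.include.list": "rscdc1_mxradon.Account_Base,...,rscdc1_mxradon.User_Base"
    }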

On Fri, Aug 29, 2025 at 1:24 AM Rohit Panwar <dailylea...@gmail.com> wrote:
Hi Chris,

As I mentioned, I used the schema recovery option; I described it in the section listing the steps that worked for me. Reiterating them:

    • Create connector
    • Ensure it is running
    • Stop connector
    • Delete the history topic
    • Update Config to include new table and set snapshot to schema_only_recovery
    • Resume connector
    • Send Signal to Signal table for incremental snapshot
    • Restore the snapshot setting to when_needed

    The above approach worked, but I am looking for a cleaner solution without having to stop the connector or delete the history topic.

Rohit Panwar

Aug 30, 2025, 1:43:06 AM
to debezium
Hi Chris,

We have a lot of tables, close to 300, but we are only interested in change events from around 50 of them, so storing the schemas for all of them would only bloat the topic. New tables also keep getting added to the database, and some of them may later have to be added to the table list. Even if I set `schema.history.internal.store.only.captured.tables.ddl` to false, it still would not account for tables that get added to the database after the connector is initialized. So ideally I would like to capture only the tables in my table list, while also avoiding a connector stop/start to add a new one.

Chris Cranford

Sep 2, 2025, 3:16:12 PM
to debe...@googlegroups.com
Hi -

First, storing schema history for all of your tables when you only capture about 16% of them can undoubtedly lead to schema history topic bloat. On the other hand, it can simplify iterative changes to the include list in the future.

When `schema.history.internal.store.only.captured.tables.ddl` is set to `false` (the default) and a user creates a new table, that table is added to the history topic even if the connector is not capturing changes for it. Whenever you later decide to capture changes for that table, it's as simple as modifying the `table.include.list` and using an incremental or blocking snapshot to send historical data, if any exists, as in the sketch below. There's no more complex workflow to undergo, since the connector has managed the lifecycle of the table's structure since its creation.
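
Using the signal table from the quoted config, that later step would look something like this sketch (`rscdc1_mxradon.NewTable` is a hypothetical newly added table; `incremental` is the default type for `execute-snapshot`, shown explicitly here):

    -- NewTable is a hypothetical example; substitute the table you just added
    INSERT INTO rscdc1_mxradon.debezium_signals (id, type, data)
    VALUES (
      CONCAT('new-table-snapshot-', UNIX_TIMESTAMP()),
      'execute-snapshot',
      '{"data-collections": ["rscdc1_mxradon.NewTable"], "type": "incremental"}'
    );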

It's when you begin to restrict what is populated in the schema history topic, by storing only the captured tables' schema changes, that adding new tables becomes a complex workflow: you have to deal with schema representations of tables at varying points in time, rather than the schema history topic always being consistent because a single connector has tracked every schema change.

-cc
