Managing Schema Changes in Debezium with Dockerized Debezium Server


Joaquín N. García

Sep 20, 2023, 5:05:01 PM
to debezium
Hello everyone,

Could you please help me with some questions about Debezium Server running on Docker? I already have it streaming from SQL Server to GCP, and I would like to know how to add a new table (is there a way to do it without restarting the container?) and how to handle schema changes in a table, such as adding or deleting a column.

I would greatly appreciate your assistance with these questions.

Chris Cranford

Sep 21, 2023, 7:07:13 AM
to debe...@googlegroups.com
Hi Joaquin,

With regard to the additional table, this requires a configuration change and therefore a restart of Debezium Server. Unfortunately, the configuration cannot be modified on the fly without restarting the connector. Kafka Connect behaves similarly: when a connector's configuration is updated, its task is stopped and re-created with the new configuration.
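Because the table list lives in the configuration, adding a table comes down to editing `debezium.source.table.include.list` and then restarting the container. The Python sketch below covers only the edit step. It assumes a plain `application.properties` file with one `key=value` entry per line, and the helper name `add_to_include_list` is hypothetical. If the value is injected through an environment variable (as `${INCLUDE_LIST}` is in the configuration later in this thread), change the variable instead.

```python
# Hypothetical helper (not part of Debezium): append a table to the include list
# in the text of a Debezium Server application.properties file.
INCLUDE_KEY = "debezium.source.table.include.list"


def add_to_include_list(properties_text: str, table: str) -> str:
    """Return properties_text with `table` added to the include list.

    The include list is a comma-separated list of regular expressions
    matched against table identifiers.
    """
    lines = properties_text.splitlines()
    for i, line in enumerate(lines):
        key, sep, value = line.partition("=")
        if sep and key.strip() == INCLUDE_KEY:
            entries = [e.strip() for e in value.split(",") if e.strip()]
            if table not in entries:  # keep the edit idempotent
                entries.append(table)
            lines[i] = f"{INCLUDE_KEY}={','.join(entries)}"
            break
    else:
        # No include list yet: adding one narrows capture to `table` only.
        lines.append(f"{INCLUDE_KEY}={table}")
    return "\n".join(lines) + "\n"
```

After writing the result back, the container still has to be restarted for the new list to take effect.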

As for adding or removing a column, schema evolution differs slightly per connector. Could you be more specific about which SQL connector you're using?

Thanks,
Chris

Joaquín N. García

Sep 21, 2023, 12:01:48 PM
to debezium
Thank you, Chris. I understand that a configuration change and a restart is the way to add a new table.

Regarding the other point, we are using the SQL Server connector (debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector).

Regards!

Chris Cranford

Sep 21, 2023, 2:08:27 PM
to debe...@googlegroups.com
Hi Joaquin,

For SQL Server, there are two approaches to schema evolution: online and offline. The procedure for each is described in the documentation [1].
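As a rough illustration of the online procedure, there are three steps. First apply the DDL to the source table. Then create a second capture instance for that table. Only after Debezium has started streaming from the new capture instance, drop the old one. The Python sketch below just assembles the T-SQL statements in that order. `sys.sp_cdc_enable_table` and `sys.sp_cdc_disable_table` are SQL Server's CDC procedures. The function name, the `_v2` capture-instance naming and the example column are assumptions for illustration. Identifiers are interpolated directly, so this is only suitable for trusted input.

```python
# Hypothetical helper: build the T-SQL for an online CDC schema update.
def online_schema_update(schema: str, table: str, ddl: str,
                         old_instance: str, new_instance: str) -> list[str]:
    """Return the T-SQL steps of an online schema update, in execution order."""
    return [
        # 1. Apply the schema change to the source table.
        ddl,
        # 2. Create a new capture instance that reflects the new structure.
        f"EXEC sys.sp_cdc_enable_table @source_schema = N'{schema}', "
        f"@source_name = N'{table}', @role_name = NULL, "
        f"@capture_instance = N'{new_instance}';",
        # 3. Run only after the connector streams from the new capture
        #    instance; this drops the old change table.
        f"EXEC sys.sp_cdc_disable_table @source_schema = N'{schema}', "
        f"@source_name = N'{table}', @capture_instance = N'{old_instance}';",
    ]


steps = online_schema_update(
    "dbo", "TABLE1",
    "ALTER TABLE dbo.TABLE1 ADD note NVARCHAR(100) NULL;",  # example column
    old_instance="dbo_TABLE1", new_instance="dbo_TABLE1_v2",
)
```

Keeping the old capture instance until the switch-over is what makes the procedure "online": changes keep flowing through one of the two instances while the DDL is applied.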

Thanks,
Chris

[1]: https://debezium.io/documentation/reference/stable/connectors/sqlserver.html#sqlserver-schema-evolution

Joaquín N. García

Sep 22, 2023, 6:16:35 PM
to debezium
Hi Chris,

Sorry to bother you again. I am trying to add a new table (table2), but the connector skips its initial snapshot. I suppose this is because an initial snapshot was already taken for the first table (table1) and an offset was stored.

Is there a way to take a snapshot of only each newly added table?

Thank you and regards

jiri.p...@gmail.com

Sep 25, 2023, 12:39:09 AM
to debezium

Joaquín N. García

Sep 27, 2023, 2:46:26 PM
to debezium
Thank you, Jiri.

I'm trying, but I'm getting an error. Could you please help? I'm attaching the log, and this is the statement we're running in SQL:

insert into [Debezium].[dbo].[debezium_signal]
select 'test', 'execute-snapshot', '{"data-collections": ["dbo.TABLE"]}'
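For reference, the signal row has three columns: `id`, `type` and `data`. An `execute-snapshot` signal requests an incremental snapshot of the listed data collections (the later log lines in this thread show "Requested 'INCREMENTAL' snapshot"). The Python sketch below builds such a row with a unique id and a parameterized statement. It assumes a DB-API driver such as pyodbc that uses `?` placeholders, and the helper name is hypothetical. Writing the collection in database-qualified form (`Debezium.dbo.TABLE`) is an assumption based on how the connector's own log messages later in this thread name known tables.

```python
import json
import uuid


# Hypothetical helper: build an execute-snapshot signal insert.
def execute_snapshot_signal(signal_table: str, collections: list[str]):
    """Return (sql, params) for inserting an execute-snapshot signal row."""
    data = json.dumps({"data-collections": collections, "type": "incremental"})
    sql = f"INSERT INTO {signal_table} (id, type, data) VALUES (?, ?, ?)"
    # A fresh UUID avoids reusing a fixed id such as 'test' for every signal.
    return sql, (str(uuid.uuid4()), "execute-snapshot", data)


sql, params = execute_snapshot_signal(
    "[Debezium].[dbo].[debezium_signal]", ["Debezium.dbo.TABLE"]
)
# With a pyodbc connection: cursor.execute(sql, params); connection.commit()
```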

We also tried this configuration: 

debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=${TF_VAR_GOOGLE_PROJECT_ID}
debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
debezium.source.database.hostname=${SERVER_IP}
debezium.source.database.port=${SERVER_PORT}
debezium.source.database.user=${SERVER_USER}
debezium.source.database.password=${SERVER_PASSWORD}
debezium.source.database.names=${DATABASE_NAME}
debezium.source.database.encrypt=false
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=/tmp/database_history.dat
debezium.source.decimal.handling.mode=double
debezium.source.signal.data.collection=Debezium.dbo.debezium_signal
debezium.source.table.include.list=${INCLUDE_LIST}
debezium.source.topic.prefix=${TOPIC_PREFIX}
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.source.schema.history.internal.file.filename=/tmp/schemahistory.dat
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.storage.partitions=5
debezium.source.tombstones.on.delete=false
debezium.source.transforms=unwrap
debezium.source.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.source.transforms.unwrap.add.fields=op,table,source.ts_ms
debezium.source.transforms.unwrap.delete.handling.mode=rewrite
debezium.source.key.converter.schemas.enable=false
debezium.source.value.converter.schemas.enable=false
debezium.source.snapshot.isolation.mode=snapshot

{"timestamp":"2023-09-27T12:25:13.818-06:00","sequence":283,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.signal.SignalProcessor","level":"WARN","message":"Action execute-snapshot failed. The signal SignalRecord{id='test', type='execute-snapshot', data='{\"data-collections\": [\"dbo.TABLE\"]}', additionalData={}} may not have been processed.","threadName":"debezium-sqlserverconnector-SERVER-change-event-source-coordinator","threadId":32,"mdc":{"dbz.taskId":"0","dbz.databaseName":"Debezium","dbz.connectorName":"CONNECTOR_NAME","dbz.connectorType":"SQL_Server","dbz.connectorContext":"streaming"},"ndc":"","hostName":"82a98012a86c","processName":"io.debezium.server.Main","processId":2580,"exception":{"refId":1,"exceptionType":"io.debezium.DebeziumException","message":"The task must be configured to use exactly one partition, 3 found","frames":[{"class":"io.debezium.pipeline.spi.Offsets","method":"getTheOnlyPartition","line":63},{"class":"io.debezium.pipeline.signal.SignalProcessor","method":"processSignal","line":183},{"class":"java.util.stream.ForEachOps$ForEachOp$OfRef","method":"accept","line":183},{"class":"java.util.AbstractList$RandomAccessSpliterator","method":"forEachRemaining","line":720},{"class":"java.util.stream.ReferencePipeline$Head","method":"forEach","line":658},{"class":"java.util.stream.ReferencePipeline$7$1","method":"accept","line":274},{"class":"java.util.stream.ReferencePipeline$3$1","method":"accept","line":195},{"class":"java.util.stream.ReferencePipeline$2$1","method":"accept","line":177},{"class":"java.util.stream.ReferencePipeline$2$1","method":"accept","line":177},{"class":"java.util.ArrayList$ArrayListSpliterator","method":"forEachRemaining","line":1655},{"class":"java.util.stream.AbstractPipeline","method":"copyInto","line":484},{"class":"java.util.stream.AbstractPipeline","method":"wrapAndCopyInto","line":474},{"class":"java.util.stream.ForEachOps$ForEachOp","method":"evaluateSequential","line":150},{"class":"java.util.stream.ForEachOps$ForEachOp$OfRef","method":"evaluateSequential","line":173},{"class":"java.util.stream.AbstractPipeline","method":"evaluate","line":234},{"class":"java.util.stream.ReferencePipeline","method":"forEach","line":497},{"class":"io.debezium.pipeline.signal.SignalProcessor","method":"lambda$processSourceSignal$4","line":147},{"class":"io.debezium.pipeline.signal.SignalProcessor","method":"executeWithSemaphore","line":157},{"class":"io.debezium.pipeline.signal.SignalProcessor","method":"processSourceSignal","line":140},{"class":"io.debezium.pipeline.EventDispatcher$2","method":"changeRecord","line":312},{"class":"io.debezium.relational.RelationalChangeRecordEmitter","method":"emitCreateRecord","line":79},{"class":"io.debezium.relational.RelationalChangeRecordEmitter","method":"emitChangeRecords","line":47},{"class":"io.debezium.pipeline.EventDispatcher","method":"dispatchDataChangeEvent","line":296},{"class":"io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource","method":"lambda$executeIteration$1","line":318},{"class":"io.debezium.jdbc.JdbcConnection","method":"prepareQuery","line":591},{"class":"io.debezium.connector.sqlserver.SqlServerConnection","method":"getChangesForTables","line":351},{"class":"io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource","method":"executeIteration","line":219},{"class":"io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource","method":"executeIteration","line":60},{"class":"io.debezium.connector.sqlserver.SqlServerChangeEventSourceCoordinator","method":"executeChangeEventSources","line":121},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"lambda$start$0","line":118},{"class":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":515},{"class":"java.util.concurrent.FutureTask","method":"run","line":264},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},{"class":"java.lang.Thread","method":"run","line":829}]}}
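The frames show that the exception is raised from `Offsets.getTheOnlyPartition` while `SignalProcessor` handles the signal. So in the release used here, signal processing apparently required the task to cover exactly one partition. Later in this thread, keeping a single database resolved it. The small, hypothetical Python pre-flight check below fails early when `debezium.source.database.names` lists more than one database. It assumes the property value has already been resolved from any environment variable.

```python
# Hypothetical pre-flight check (not part of Debezium).
DB_NAMES_KEY = "debezium.source.database.names"


def single_database(properties_text: str) -> str:
    """Return the only database in database.names; raise if there are several."""
    for line in properties_text.splitlines():
        key, sep, value = line.partition("=")
        if sep and key.strip() == DB_NAMES_KEY:
            names = [n.strip() for n in value.split(",") if n.strip()]
            if len(names) != 1:
                raise ValueError(f"signaling expects one database, got {names}")
            return names[0]
    raise KeyError(DB_NAMES_KEY)
```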


Thank you and regards

Joaquín N. García

Sep 29, 2023, 3:40:36 PM
to debezium
Hi everyone!

I have identified the issue. We had two different databases, and after keeping only one of them, I was able to send the signal. However, I now have a new problem: the connector reports that it cannot find the schema for the new table. Could you please help us with this error?

Note: the new table is in debezium.source.table.include.list.

Thank you and regards

Here is the log:

{"timestamp":"2023-09-29T13:22:23.617-06:00","sequence":152,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource","level":"INFO","message":"Skipping change ChangeTableResultSet{changeTable=Capture instance \"dbo_debezium_signal\" [sourceTableId=DATABASE.dbo.debezium_signal, changeTableId=DATABASE.cdc.dbo_debezium_signal_CT, startLsn=00000149:00015af2:0040, changeTableObjectId=1890105774, stopLsn=NULL], resultSet=SQLServerResultSet:9, completed=false, currentChangePosition=00000149:00016acf:0003(00000149:00016acf:0002)} as its order in the transaction 1 is smaller than or equal to the last recorded operation 00000149:00016acf:0003(00000149:00016acf:0002)[1]","threadName":"debezium-sqlserverconnector-SQLSERVER-change-event-source-coordinator","threadId":32,"mdc":{"dbz.taskId":"0","dbz.databaseName":"DATABASE","dbz.connectorName":"SQLSERVER","dbz.connectorType":"SQL_Server","dbz.connectorContext":"streaming"},"ndc":"","hostName":"82a98012a86c","processName":"io.debezium.server.Main","processId":2911}
{"timestamp":"2023-09-29T13:26:06.555-06:00","sequence":153,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.signal.actions.snapshotting.ExecuteSnapshot","level":"INFO","message":"Requested 'INCREMENTAL' snapshot of data collections '[dbo.NEWTABLE]' with additional condition 'No condition passed' and surrogate key 'PK of table will be used'","threadName":"debezium-sqlserverconnector-SQLSERVER-change-event-source-coordinator","threadId":32,"mdc":{"dbz.taskId":"0","dbz.databaseName":"DATABASE","dbz.connectorName":"SQLSERVER","dbz.connectorType":"SQL_Server","dbz.connectorContext":"streaming"},"ndc":"","hostName":"82a98012a86c","processName":"io.debezium.server.Main","processId":2911}
{"timestamp":"2023-09-29T13:26:06.567-06:00","sequence":154,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource","level":"WARN","message":"Schema not found for table 'dbo.NEWTABLE', known tables [DATABASE.dbo.TABLE1, DATABASE.dbo.debezium_signal, DATABASE.dbo.TABLE2, DATABASE.dbo.NEWTABLEs]","threadName":"debezium-sqlserverconnector-SQLSERVER-change-event-source-coordinator","threadId":32,"mdc":{"dbz.taskId":"0","dbz.databaseName":"DATABASE","dbz.connectorName":"SQLSERVER","dbz.connectorType":"SQL_Server","dbz.connectorContext":"streaming"},"ndc":"","hostName":"82a98012a86c","processName":"io.debezium.server.Main","processId":2911}
{"timestamp":"2023-09-29T13:26:06.892-06:00","sequence":155,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.common.BaseSourceTask","level":"INFO","message":"1 records sent during previous 00:03:43.762, last recorded offset of {server=SQLSERVER, database=DATABASE} partition is {transaction_id=null, event_serial_no=1, commit_lsn=00000149:00016b6c:0003, change_lsn=00000149:00016b6c:0002}","threadName":"pool-7-thread-1","threadId":23,"mdc":{},"ndc":"","hostName":"82a98012a86c","processName":"io.debezium.server.Main","processId":2911}
{"timestamp":"2023-09-29T13:26:11.545-06:00","sequence":156,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource","level":"INFO","message":"Skipping read chunk because snapshot is not running","threadName":"debezium-sqlserverconnector-SQLSERVER-change-event-source-coordinator","threadId":32,"mdc":{"dbz.taskId":"0","dbz.databaseName":"DATABASE","dbz.connectorName":"SQLSERVER","dbz.connectorType":"SQL_Server","dbz.connectorContext":"streaming"},"ndc":"","hostName":"82a98012a86c","processName":"io.debezium.server.Main","processId":2911}
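The warning lists the known tables with a database prefix (`DATABASE.dbo.TABLE1`), while the requested collection was `dbo.NEWTABLE`. The hypothetical Python pre-flight check below reports each requested collection that has no exact match, together with database-qualified candidates that end with the requested name. It assumes the known-table list has been copied from a log line like the one above, and it compares names case-sensitively, which is a simplification.

```python
# Hypothetical helper: compare requested data collections with known tables.
def check_collections(requested: list[str], known: list[str]) -> dict[str, list[str]]:
    """Map each unmatched requested collection to qualified candidates."""
    known_set = set(known)
    problems = {}
    for name in requested:
        if name in known_set:
            continue
        # Candidates are known identifiers equal to `name` plus a prefix.
        problems[name] = [k for k in known if k.endswith("." + name)]
    return problems


known = ["DATABASE.dbo.TABLE1", "DATABASE.dbo.debezium_signal",
         "DATABASE.dbo.TABLE2", "DATABASE.dbo.NEWTABLEs"]
```

An empty candidate list (as for `dbo.NEWTABLE` against the names in this log) points at a spelling mismatch rather than a missing database prefix.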

Joaquín N. García

Oct 3, 2023, 6:24:03 PM
to debezium

Hi everyone,

I'm still having the same issue with Debezium. Is there another way to add new tables to Debezium? We ask because we are testing by adding tables incrementally.

Regards!

jiri.p...@gmail.com

Oct 4, 2023, 2:39:41 AM
to debezium
Hi,

From the log, you are trying to snapshot `dbo.NEWTABLE`, but the known table is `dbo.NEWTABLEs`. Maybe it is a typo?

J.

Joaquín N. García

Oct 4, 2023, 10:31:01 AM
to debezium
Hi Jiri,

Sorry, that was a typo. It's just dbo.NEWTABLE.

Regards!

jiri.p...@gmail.com

Oct 5, 2023, 5:45:42 AM
to debezium
A typo in the configuration or in the message?

J.

Joaquín N. García

Oct 5, 2023, 12:49:38 PM
to debezium
Hello Jiri,

The typo was introduced when I replaced the real names in the log. I have now updated the Docker image tag to 2.4.0.Final (I was previously using 2.3.4.Final), and the errors no longer appear in the log. However, I am unable to load the initial snapshot (the table has 5,983,410 records). Here is the log I am currently seeing:

{"timestamp":"2023-10-05T10:29:43.393-06:00","sequence":149,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource","level":"INFO","message":"CDC is enabled for table Capture instance \"dbo_TABLE1\" [sourceTableId=DATABASE.dbo.TABLE1, changeTableId=DATABASE.cdc.dbo_TABLE1_CT, startLsn=0000014a:00001b40:0003, changeTableObjectId=466100701, stopLsn=NULL] but the table is not on connector's table include list","threadName":"debezium-sqlserverconnector-SQLSERVER-change-event-source-coordinator","threadId":31,"mdc":{"dbz.taskId":"0","dbz.databaseName":"DATABASE","dbz.connectorName":"SQLSERVER","dbz.connectorType":"SQL_Server","dbz.connectorContext":"streaming"},"ndc":"","hostName":"f0aa63b53ecf","processName":"io.debezium.server.Main","processId":1877}
{"timestamp":"2023-10-05T10:29:43.396-06:00","sequence":156,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource","level":"INFO","message":"Table DATABASE.dbo.NEWTABLE is new to be monitored by capture instance dbo_NEWTABLE","threadName":"debezium-sqlserverconnector-SQLSERVER-change-event-source-coordinator","threadId":31,"mdc":{"dbz.taskId":"0","dbz.databaseName":"DATABASE","dbz.connectorName":"SQLSERVER","dbz.connectorType":"SQL_Server","dbz.connectorContext":"streaming"},"ndc":"","hostName":"f0aa63b53ecf","processName":"io.debezium.server.Main","processId":1877}
{"timestamp":"2023-10-05T10:29:43.467-06:00","sequence":157,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.relational.history.SchemaHistoryMetrics","level":"INFO","message":"Already applied 3 database changes","threadName":"debezium-sqlserverconnector-SQLSERVER-change-event-source-coordinator","threadId":31,"mdc":{"dbz.taskId":"0","dbz.databaseName":"DATABASE","dbz.connectorName":"SQLSERVER","dbz.connectorType":"SQL_Server","dbz.connectorContext":"streaming"},"ndc":"","hostName":"f0aa63b53ecf","processName":"io.debezium.server.Main","processId":1877}
{"timestamp":"2023-10-05T10:29:43.51-06:00","sequence":158,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.signal.actions.snapshotting.ExecuteSnapshot","level":"INFO","message":"Requested 'INCREMENTAL' snapshot of data collections '[DATABASE.dbo.NEWTABLE]' with additional conditions '[]' and surrogate key 'PK of table will be used'","threadName":"debezium-sqlserverconnector-SQLSERVER-change-event-source-coordinator","threadId":31,"mdc":{"dbz.taskId":"0","dbz.databaseName":"DATABASE","dbz.connectorName":"SQLSERVER","dbz.connectorType":"SQL_Server","dbz.connectorContext":"streaming"},"ndc":"","hostName":"f0aa63b53ecf","processName":"io.debezium.server.Main","processId":1877}
{"timestamp":"2023-10-05T10:29:43.541-06:00","sequence":159,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource","level":"INFO","message":"Incremental snapshot for table 'DATABASE.dbo.NEWTABLE' will end at position [1583398, 20, 1, 504377]","threadName":"debezium-sqlserverconnector-SQLSERVER-change-event-source-coordinator","threadId":31,"mdc":{"dbz.taskId":"0","dbz.databaseName":"DATABASE","dbz.connectorName":"SQLSERVER","dbz.connectorType":"SQL_Server","dbz.connectorContext":"streaming"},"ndc":"","hostName":"f0aa63b53ecf","processName":"io.debezium.server.Main","processId":1877}
{"timestamp":"2023-10-05T10:29:43.578-06:00","sequence":160,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.common.BaseSourceTask","level":"INFO","message":"1 records sent during previous 00:02:12.705, last recorded offset of {server=SQLSERVER, database=DATABASE} partition is {transaction_id=null, event_serial_no=0, commit_lsn=0000014a:000110c5:0003, change_lsn=NULL}","threadName":"pool-7-thread-1","threadId":22,"mdc":{"dbz.connectorName":"SQLSERVER","dbz.connectorType":"SQL_Server","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"f0aa63b53ecf","processName":"io.debezium.server.Main","processId":1877}
{"timestamp":"2023-10-05T10:30:08.821-06:00","sequence":161,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.common.BaseSourceTask","level":"INFO","message":"5131 records sent during previous 00:00:25.243, last recorded offset of {server=SQLSERVER, database=DATABASE} partition is {transaction_id=null, incremental_snapshot_correlation_id=2023-10-05 10:29:40.323, event_serial_no=1, incremental_snapshot_maximum_key=aced0005757200135b4c6a6176612e6c616e672e4f626a6563743b90ce589f1073296c0200007870000000047372000e6a6176612e6c616e672e4c6f6e673b8be490cc8f23df0200014a000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b02000078700000000000182926737200116a6176612e6c616e672e496e746567657212e2a0a4f781873802000149000576616c75657871007e0003000000147371007e0005000000017371007e00050007b239, commit_lsn=0000014a:00011156:0003, change_lsn=0000014a:00011156:0002, incremental_snapshot_collections=[{\"incremental_snapshot_collections_id\":\"DATABASE.dbo.NEWTABLE\",\"incremental_snapshot_collections_additional_condition\":null,\"incremental_snapshot_collections_surrogate_key\":null}], incremental_snapshot_primary_key=aced0005757200135b4c6a6176612e6c616e672e4f626a6563743b90ce589f1073296c0200007870000000047372000e6a6176612e6c616e672e4c6f6e673b8be490cc8f23df0200014a000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b020000787000000000000000ab737200116a6176612e6c616e672e496e746567657212e2a0a4f781873802000149000576616c75657871007e0003000000147371007e0005000000027371007e00050004a304}","threadName":"pool-7-thread-1","threadId":22,"mdc":{"dbz.connectorName":"SQLSERVER","dbz.connectorType":"SQL_Server","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"f0aa63b53ecf","processName":"io.debezium.server.Main","processId":1877}
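The `incremental_snapshot_maximum_key` and `incremental_snapshot_primary_key` values in the last offset are hex strings. They start with `aced0005`, the magic number and version of Java object serialization, so they appear to be serialized `Object[]` key tuples. The Python sketch below is a minimal decoder written only for this shape: an object array whose elements are boxed primitives such as `java.lang.Long` and `java.lang.Integer`. It is not a general Java deserializer, and the names `JavaStream` and `decode_key` are hypothetical.

```python
import struct

# Tags from the Java Object Serialization Stream Protocol.
TC_NULL, TC_REFERENCE, TC_CLASSDESC, TC_OBJECT = 0x70, 0x71, 0x72, 0x73
TC_ARRAY, TC_ENDBLOCKDATA, BASE_HANDLE, SC_WRITE_METHOD = 0x75, 0x78, 0x7E0000, 0x01
# Primitive field typecodes mapped to big-endian struct formats.
PRIMITIVES = {"B": "b", "C": "H", "D": "d", "F": "f",
              "I": "i", "J": "q", "S": "h", "Z": "?"}


class JavaStream:
    """Minimal reader for Object[] arrays of boxed primitives (hypothetical)."""

    def __init__(self, data: bytes):
        self.data, self.pos, self.handles = data, 0, []

    def read(self, fmt: str):
        (value,) = struct.unpack_from(">" + fmt, self.data, self.pos)
        self.pos += struct.calcsize(">" + fmt)
        return value

    def utf(self) -> str:
        length = self.read("H")
        text = self.data[self.pos:self.pos + length].decode("utf-8")
        self.pos += length
        return text

    def class_desc(self):
        tag = self.read("B")
        if tag == TC_NULL:
            return None
        if tag == TC_REFERENCE:  # back-reference to an earlier descriptor
            return self.handles[self.read("i") - BASE_HANDLE]
        if tag != TC_CLASSDESC:
            raise ValueError(f"unsupported class descriptor tag {tag:#x}")
        desc = {"name": self.utf(), "suid": self.read("q")}
        self.handles.append(desc)  # handle is assigned before classDescInfo
        if self.read("B") & SC_WRITE_METHOD:
            raise ValueError(f"{desc['name']} uses writeObject; unsupported")
        desc["fields"] = []
        for _ in range(self.read("H")):
            code = chr(self.read("B"))
            if code not in PRIMITIVES:
                raise ValueError(f"object field in {desc['name']}; unsupported")
            desc["fields"].append((code, self.utf()))
        if self.read("B") != TC_ENDBLOCKDATA:
            raise ValueError("class annotations are not supported")
        desc["super"] = self.class_desc()
        return desc

    def content(self):
        tag = self.read("B")
        if tag == TC_NULL:
            return None
        if tag == TC_REFERENCE:
            return self.handles[self.read("i") - BASE_HANDLE]
        if tag not in (TC_ARRAY, TC_OBJECT):
            raise ValueError(f"unsupported content tag {tag:#x}")
        desc = self.class_desc()
        index = len(self.handles)
        self.handles.append(None)  # reserve this object's handle
        if tag == TC_ARRAY:
            if not desc["name"].startswith("[L"):
                raise ValueError("only object arrays are supported")
            result, size = [], self.read("i")
            self.handles[index] = result
            for _ in range(size):
                result.append(self.content())
            return result
        chain = []
        while desc is not None:
            chain.append(desc)
            desc = desc["super"]
        values = {}
        for cls in reversed(chain):  # superclass field data comes first
            for code, name in cls["fields"]:
                values[name] = self.read(PRIMITIVES[code])
        # Unwrap boxed primitives such as java.lang.Long to their value.
        result = values["value"] if set(values) == {"value"} else values
        self.handles[index] = result
        return result


def decode_key(hex_key: str):
    stream = JavaStream(bytes.fromhex(hex_key))
    if stream.read("H") != 0xACED or stream.read("H") != 5:
        raise ValueError("not a Java serialization stream")
    return stream.content()
```

Applied to the two keys in the last log line, this decodes `incremental_snapshot_maximum_key` to `[1583398, 20, 1, 504377]`, which matches the logged end position, and `incremental_snapshot_primary_key` to `[171, 20, 2, 303876]`. Read together with "5131 records sent", this suggests the incremental snapshot had started and was still early in the key range when the log was captured.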

Regards!

jiri.p...@gmail.com

Oct 6, 2023, 12:59:08 AM
to debezium
Hi,

If you already performed the initial snapshot, it will not be taken again. You need to either create the connector under a new name or remove the existing offsets (https://debezium.io/documentation/faq/#how_to_remove_committed_offsets_for_a_connector).
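The configuration earlier in this thread uses a file-based offset store (`debezium.source.offset.storage.file.filename=data/offsets.dat`). With that store, removing offsets means moving that file away while Debezium Server is stopped. This resets the connector for every captured table, not only the new one. The Python sketch below renames the file rather than deleting it, so it can be restored. It assumes relative paths are resolved against Debezium Server's working directory, passed in as `base_dir`, and both helper names are hypothetical.

```python
import time
from pathlib import Path

OFFSET_KEY = "debezium.source.offset.storage.file.filename"


def offset_file(properties_text: str, base_dir: str) -> Path:
    """Locate the offset file named in the Debezium Server properties."""
    for line in properties_text.splitlines():
        key, sep, value = line.partition("=")
        if sep and key.strip() == OFFSET_KEY:
            # Relative paths resolve against base_dir (assumed working
            # directory); an absolute path replaces base_dir entirely.
            return Path(base_dir) / value.strip()
    raise KeyError(OFFSET_KEY)


def archive_offsets(properties_text: str, base_dir: str):
    """Rename the offset file aside; run only while Debezium Server is stopped.

    Returns the backup path, or None if no offset file exists yet.
    """
    offsets = offset_file(properties_text, base_dir)
    if not offsets.exists():
        return None
    stamp = time.strftime("%Y%m%d%H%M%S")
    backup = offsets.with_name(f"{offsets.name}.{stamp}.bak")
    offsets.rename(backup)
    return backup
```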

J.