Oracle source connector for incremental snapshot signals


Vladislav P

Oct 13, 2025, 3:56:01 PM
to debezium
Hello!
I am transferring data from Oracle to PostgreSQL.
Before digging into the problem, I would like to confirm: does an "incremental snapshot" allow a full read of an Oracle table's data after the connector has run with `snapshot.mode: "no_data"`?

After I launch the connector with `snapshot.mode: "no_data"`, new data written to the tables is delivered to the corresponding Kafka topics. But the initial data that was already in the tables is not transferred. With `snapshot.mode: "initial"` everything works fine, but then what are "incremental snapshots" for?

A signal table was created for incremental snapshots:

CREATE TABLE SCHEMA.debezium_signal (
    id VARCHAR(42) PRIMARY KEY,
    type VARCHAR(32) NOT NULL,
    data VARCHAR(2048) NULL
);

Rights granted:
GRANT SELECT, INSERT, UPDATE ON SCHEMA.debezium_signal TO USER;

ALTER TABLE SCHEMA.debezium_signal ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

Source connector config:
{
  "name": "source-connector-incremental-snapshot",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.hostname": "{{databaseHost}}",
    "database.port": "{{databasePort}}",
    "database.user": "{{databaseUser}}",
    "database.password": "{{databasePassword}}",
    "database.dbname": "{{databaseName}}",
    "table.include.list": "SCHEMA\\.(TABLE1|TABLE2)",
    "column.include.list": "....",
    "topic.prefix": "{{topicPrefix}}",
    "database.server.name": "{{topicPrefix}}",
    "schema.history.internal.kafka.topic": "dbz_oracle_wpms_history",
    "schema.history.internal.kafka.bootstrap.servers": "{{kafkaBootstrapServers}}",
    "log.mining.strategy": "redo_log_catalog",

    "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "key.converter.apicurio.registry.url": "{{apicurioRegistryUrl}}",
    "key.converter.apicurio.registry.auto-register": "true",
    "key.converter.apicurio.registry.find-latest": "true",
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.url": "{{apicurioRegistryUrl}}",
    "value.converter.apicurio.registry.auto-register": "true",
    "value.converter.apicurio.registry.find-latest": "true",
    "schema.name.adjustment.mode": "avro",

    "signal.enabled.channels": "source,kafka",
    "signal.kafka.topic": "signals",
    "signal.data.collection": "{{topicPrefix}}.{{SCHEMA}}.DEBEZIUM_SIGNAL",
    "signal.kafka.bootstrap.servers": "{{kafkaBootstrapServers}}",
    "incremental.snapshot.chunk.size": 50000,

    "topic.creation.enable": "true",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1,
    "topic.creation.default.cleanup.policy": "delete",
    "topic.creation.default.compression.type": "lz4",

    "snapshot.mode": "no_data",

    "heartbeat.interval.ms": "10000",
    "heartbeat.action.query": "MERGE INTO {{SCHEMA}}.DEBEZIUM_HEARTBEAT t USING (SELECT 1 id, CURRENT_TIMESTAMP ts FROM dual) s ON (t.id = s.id) WHEN MATCHED THEN UPDATE SET t.ts = s.ts WHEN NOT MATCHED THEN INSERT (id, ts) VALUES (s.id, s.ts)"
  }
}

After sending the following message to the Kafka signal topic "signals":

{ "type": "execute-snapshot", "data": { "data-collections": [ "DATABASE.SCHEMA.TABLE1" ] } }

I see successful messages in the logs:
2025-10-13 17:41:37,361 INFO Oracle|dbserver|streaming Incremental snapshot for table 'DATABASE.SCHEMA.TABLE1' will end at position [21] [io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource]   

2025-10-13 17:42:51,678 INFO || 17 records sent during previous 00:01:21.63, last recorded offset of {server=dbserver} partition is {commit_scn=6241104667608:1:0a001300e2d73501, incremental_snapshot_correlation_id=null, incremental_snapshot_maximum_key=aced...., snapshot_scn=6241104637140, incremental_snapshot_collections=[{"incremental_snapshot_collections_id":" DATABASE.SCHEMA.TABLE1","incremental_snapshot_collections_additional_condition":null,"incremental_snapshot_collections_surrogate_key":"ID"}], incremental_snapshot_primary_key=aced000570, scn=6241104667607} [io.debezium.connector.common.BaseSourceTask]
2025-10-13 17:43:15,255 INFO || WorkerSourceTask{id=source-connector-incremental-snapshot-0} Committing offsets for 11 acknowledged messages [org.apache.kafka.connect.runtime.WorkerSourceTask]

But nothing happens after these messages. "snapshot-window-open" and "snapshot-window-close" entries appear in the DEBEZIUM_SIGNAL table, but no records appear in the corresponding topic "dbserver.SCHEMA.TABLE1". What should I do?

Chris Cranford

Oct 13, 2025, 4:11:53 PM
to debezium
Hi -

An incremental snapshot is simply a way to trigger an on-demand snapshot of a table or tables. 

With an Oracle source, you may be unable to perform an initial snapshot due to the database's undo_retention configuration. When the undo_retention is too small relative to the duration needed for the initial snapshot, the snapshot will typically fail with an ORA-01555. A way to work around this Oracle limitation is to set `snapshot.mode` to `no_data`, and then use incremental snapshots to replicate the historical data in chunks while streaming changes from your transaction logs. This does not rely on flashback queries, and therefore the ORA-01555 error isn't an issue.
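For example, once the connector is streaming, the backfill is triggered by an `execute-snapshot` signal. Over the Kafka signal channel (with the connector's `topic.prefix` as the message key), the message value looks like this, substituting your own fully-qualified table name:

```
{
  "type": "execute-snapshot",
  "data": {
    "data-collections": ["DATABASE.SCHEMA.TABLE1"],
    "type": "INCREMENTAL"
  }
}
```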

Other reasons for incremental snapshots include recently adding a new table to your `table.include.list`, or having manually modified the connector offsets and needing to guarantee that all data in your table-specific topics is consistent.

As for the configuration, a few suggestions, mostly for performance:

    1. Set `log.mining.strategy` either to `online_catalog` or `hybrid`.
    2. Change `table.include.list` to explicitly list each table without regular expressions, e.g. `SCHEMA.TABLE1,SCHEMA.TABLE2`, which helps with suggestion #3.
    3. Set `log.mining.query.filter.mode` to `in`.

I suggest these modifications in case the issue is that the connector is blocked waiting on LogMiner to extract & load the data dictionary. By using `online_catalog` that's unnecessary and the connector will operate faster.
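Against your config, that fragment would look roughly like this (keeping your placeholders):

```
"table.include.list": "SCHEMA.TABLE1,SCHEMA.TABLE2",
"log.mining.strategy": "online_catalog",
"log.mining.query.filter.mode": "in",
```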

Do you notice any changes with those suggestions?

-cc

Vladislav P

Oct 13, 2025, 5:39:36 PM
to debezium
Thanks for the advice.
I edited 'table.include.list' to remove the regular expressions, but left the following regular expressions in 'column.include.list':
"SCHEMA\\.TABLE1\\.(COLUMN1|COLUMN2), SCHEMA\\.TABLE2\\.(COLUMN1|COLUMN2)".

I also changed:
"log.mining.strategy": "hybrid",
"log.mining.query.filter.mode": "in"

But I didn't see any changes in the operation; historical data still isn't included in the topic, only streaming data. After these changes, two entries appeared in the DEBEZIUM_SIGNAL table: "snapshot-window-open" and "snapshot-window-close".

I also noticed that a LOG_MINING_FLUSH table (with a LAST_SCN column) is being created in the schema. Is this normal?

I've attached the log file starting from step 7.

I'm still not sure if the fields are specified correctly:
"topic.prefix": "dbserver",
"database.server.name": "dbserver",
"signal.data.collection": "dbserver.SCHEMA.DEBEZIUM_SIGNAL".
But since the topic is created and entries appear in the signal table, I assume these are correct.

For Debezium Connect, I use the image debezium/connect:3.0.0.Final.
Attachments: kafka-ui.jpg, logs.txt

On Tuesday, October 14, 2025 at 00:11:53 UTC+4, Chris Cranford wrote:

Vladislav P

Oct 14, 2025, 3:13:52 AM
to debezium
Any ideas what this might be related to?

On Tuesday, October 14, 2025 at 01:39:36 UTC+4, Vladislav P wrote:

Vladislav P

Oct 17, 2025, 8:00:27 AM
to debezium
Could someone provide a basic end-to-end example: creating the tables, granting permissions, configuring the Oracle source connector, and executing the commands so that an incremental snapshot reads historical data? Abstract names are fine.

Or tell me how to debug the problem: "for some reason, historical data is not being read via incremental snapshot, but new data is."

On Tuesday, October 14, 2025 at 11:13:52 UTC+4, Vladislav P wrote:

Chris Cranford

Oct 19, 2025, 7:11:42 AM
to debezium
Hi, please attach the complete log from connector startup, up to and including your incremental snapshot. If you can enable DEBUG logging for `io.debezium`, that would be great.
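If you're on the stock Connect distribution, the dynamic logger endpoint can change this at runtime without a restart, e.g. (adjust host/port for your deployment):

```
PUT http://<connect-host>:8083/admin/loggers/io.debezium
Content-Type: application/json

{ "level": "DEBUG" }
```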

Thanks,
-cc

Vladislav P

Oct 29, 2025, 1:01:56 PM
to debezium
Hi. I would like to resume the discussion of incremental snapshots.

I keep running into ORA-01555 in some of our environments.
After I run the configuration with `snapshot.mode: no_data`, new data is sent to the topics, but the old data is not read by the snapshot.
Here is what I did:

Step 1 (create the signal table in Oracle):
CREATE TABLE SCHEMA1.DEBEZIUM_SIGNAL (
   id VARCHAR(42) PRIMARY KEY,
   type VARCHAR(32) NOT NULL,
   data VARCHAR(2048) NULL
);
GRANT SELECT, INSERT, UPDATE ON SCHEMA1.DEBEZIUM_SIGNAL TO WPMS;
ALTER TABLE SCHEMA1.DEBEZIUM_SIGNAL ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

Step 2 (source connector configuration):
```
POST http://{{host}}:{{port}}/connectors/
Content-Type: application/json

{
  "name": "wpms-source-connector-avro-snapshot-incrementa",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.hostname": "{{sourceDatabaseHost}}",
    "database.port": "{{sourceDatabasePort}}",
    "database.user": "{{sourceDatabaseUser}}",
    "database.password": "{{sourceDatabasePassword}}",
    "database.dbname": "{{sourceDatabaseName}}",
    "table.include.list": "SCHEMA2.DIVISION",
    "column.include.list": "SCHEMA2\\.DIVISION\\.(ID_DIV|NAME)",

    "topic.prefix": "{{topicPrefix}}",
    "database.server.name": "{{topicPrefix}}",
    "schema.history.internal.kafka.topic": "dbz_oracle_wpms_history_incremental_snapshot3",
    "schema.history.internal.kafka.bootstrap.servers": "{{kafkaBootstrapServers}}",
    "log.mining.strategy": "hybrid",
    "log.mining.query.filter.mode": "in",

    "message.key.columns": "SCHEMA2.DIVISION:ID_DIV",

    "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "key.converter.apicurio.registry.url": "{{apicurioRegistryUrl}}",
    "key.converter.apicurio.registry.auto-register": "true",
    "key.converter.apicurio.registry.find-latest": "true",
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.url": "{{apicurioRegistryUrl}}",
    "value.converter.apicurio.registry.auto-register": "true",
    "value.converter.apicurio.registry.find-latest": "true",
    "schema.name.adjustment.mode": "avro",

    "snapshot.mode": "no_data",
    "signal.enabled.channels": "source",
    "signal.data.collection": "{{topicPrefix}}.SCHEMA1.DEBEZIUM_SIGNAL",

    "incremental.snapshot.chunk.size": 50000,
    "incremental.snapshot.allow.schema.changes": "true",

    "topic.creation.enable": "true",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1,
    "topic.creation.default.cleanup.policy": "delete"
  }
}
```
Step 3 (enable DEBUG logging):
```
PUT http://{{host}}:{{port}}/admin/loggers/io.debezium.connector.oracle
Content-Type: application/json

{
"level": "DEBUG"
}

PUT http://{{host}}:{{port}}/admin/loggers/io.debezium.pipeline.signal
Content-Type: application/json

{
"level": "DEBUG"
}
```
But I still haven't noticed any changes in logging.

Step 4 (launch the source connector):
There are no errors after the launch; the introspection stage completed successfully.
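For reference, the connector and task state can be confirmed through the Connect REST API:

```
GET http://{{host}}:{{port}}/connectors/wpms-source-connector-avro-snapshot-incrementa/status
```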

Step 5 (insert a signal entry into the DEBEZIUM_SIGNAL table):
```
INSERT INTO SCHEMA1.DEBEZIUM_SIGNAL (id, type, data)
VALUES (
  'ad-hoc-1',
  'execute-snapshot',
  '{"data-collections": ["SCHEMA2.DIVISION"], "type": "INCREMENTAL"}'
);
```
After the insert, a message appears in the topic (dbserver.SCHEMA1.DEBEZIUM_SIGNAL). But it feels like nobody is reading it. The logs just stop there, and nothing happens.
Logs after the INSERT:
```
2025-10-29 14:56:55,173 INFO   ||  1 records sent during previous 00:01:06.132, last recorded offset of {server=dbserver} partition is {commit_scn=6308159361341:1:2a000d00c05d1a01, snapshot_scn=6308159360834, scn=6308159361340}   [io.debezium.connector.common.BaseSourceTask]
2025-10-29 14:56:55,246 INFO   ||  The task will send records to topic 'dbserver.SCHEMA1.DEBEZIUM_SIGNAL' for the first time. Checking whether topic exists   [org.apache.kafka.connect.runtime.AbstractWorkerSourceTask]
2025-10-29 14:56:55,248 INFO   ||  Creating topic 'dbserver.SCHEMA1.DEBEZIUM_SIGNAL'   [org.apache.kafka.connect.runtime.AbstractWorkerSourceTask]
2025-10-29 14:56:55,288 INFO   ||  Created topic (name=dbserver.SCHEMA1.DEBEZIUM_SIGNAL, numPartitions=1, replicationFactor=1, replicasAssignments=null, configs={cleanup.policy=delete}) on brokers at apache-kafka:19092   [org.apache.kafka.connect.util.TopicAdmin]
2025-10-29 14:56:55,293 INFO   ||  Created topic '(name=dbserver.SCHEMA1.DEBEZIUM_SIGNAL, numPartitions=1, replicationFactor=1, replicasAssignments=null, configs={cleanup.policy=delete})' using creation group TopicCreationGroup{name='default', inclusionPattern=.*, exclusionPattern=, numPartitions=1, replicationFactor=1, otherConfigs={cleanup.policy=delete}}   [org.apache.kafka.connect.runtime.AbstractWorkerSourceTask]
2025-10-29 14:57:33,678 INFO   ||  WorkerSourceTask{id=source-connector-avro-snapshot-incremental3-0} Committing offsets for 1 acknowledged messages   [org.apache.kafka.connect.runtime.WorkerSourceTask]
2025-10-29 15:00:33,785 INFO   ||  [AdminClient clientId=connector-adminclient-source-connector-avro-snapshot-incremental3-0] Node -1 disconnected.   [org.apache.kafka.clients.NetworkClient]
2025-10-29 15:00:53,709 INFO   ||  [AdminClient clientId=1-shared-admin] Node 1 disconnected.   [org.apache.kafka.clients.NetworkClient]
2025-10-29 15:04:33,921 INFO   ||  [Producer clientId=connector-producer-source-connector-avro-snapshot-incremental3-0] Node -1 disconnected.   [org.apache.kafka.clients.NetworkClient]
```

On Sunday, October 19, 2025 at 15:11:42 UTC+4, Chris Cranford wrote:

Chris Cranford

Oct 30, 2025, 12:50:00 PM
to debezium
Hi,

It's likely because the `signal.data.collection` should be `<database>.<schema>.<table>`, and based on your configuration that would mean:

    {{sourceDatabaseName}}.SCHEMA1.DEBEZIUM_SIGNAL

Give that a try and let us know if that fixes the issue.
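For example, if the database name were ORCLPDB1 (hypothetical), the property would read:

```
"signal.data.collection": "ORCLPDB1.SCHEMA1.DEBEZIUM_SIGNAL"
```

The same fully-qualified `<database>.<schema>.<table>` format applies to the names in the `data-collections` field of your execute-snapshot signal.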

Thanks,
-cc

Vladislav P

Oct 31, 2025, 9:46:46 AM
to debezium
Hi, I fixed the connector like this, and it worked:

{
  "name": "wpms-source-connector-avro-snapshot-incrementa",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.hostname": "{{sourceDatabaseHost}}",
    "database.port": "{{sourceDatabasePort}}",
    "database.user": "{{sourceDatabaseUser}}",
    "database.password": "{{sourceDatabasePassword}}",
    "database.dbname": "{{sourceDatabaseName}}",
    "table.include.list": "SCHEMA1.DEBEZIUM_SIGNAL,SCHEMA2.DIVISION",
    "column.include.list": "SCHEMA1\\.DEBEZIUM_SIGNAL\\.(ID|TYPE|DATA),SCHEMA2\\.DIVISION\\.(ID_DIV|NAME)",

    "topic.prefix": "{{topicPrefix}}",
    "database.server.name": "{{topicPrefix}}",
    "schema.history.internal.kafka.topic": "dbz_oracle_wpms_history_incremental_snapshot3",
    "schema.history.internal.kafka.bootstrap.servers": "{{kafkaBootstrapServers}}",
    "log.mining.strategy": "hybrid",
    "log.mining.query.filter.mode": "in",

    "message.key.columns": "SCHEMA1.DEBEZIUM_SIGNAL:ID;SCHEMA2.DIVISION:ID_DIV",

    "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "key.converter.apicurio.registry.url": "{{apicurioRegistryUrl}}",
    "key.converter.apicurio.registry.auto-register": "true",
    "key.converter.apicurio.registry.find-latest": "true",
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.url": "{{apicurioRegistryUrl}}",
    "value.converter.apicurio.registry.auto-register": "true",
    "value.converter.apicurio.registry.find-latest": "true",
    "schema.name.adjustment.mode": "avro",

    "snapshot.mode": "no_data",
    "signal.enabled.channels": "source",
    "signal.data.collection": "{{sourceDatabaseName}}.SCHEMA1.DEBEZIUM_SIGNAL",

    "incremental.snapshot.chunk.size": 50000,
    "incremental.snapshot.allow.schema.changes": "true",
    "topic.creation.enable": "true",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1,
    "topic.creation.default.cleanup.policy": "delete"
  }
}



And sent this signal:

INSERT INTO SCHEMA1.DEBEZIUM_SIGNAL (id, type, data)
VALUES (
  'ad-hoc-1',
  'execute-snapshot',
  '{"data-collections": ["{{sourceDatabaseName}}.SCHEMA2.DIVISION"], "type": "INCREMENTAL"}'
);

On Thursday, October 30, 2025 at 20:50:00 UTC+4, Chris Cranford wrote: