We are trying to run the connector for Oracle 19c using snapshots.
Tried running it without snapshots at all in "snapshot.mode": "schema_only". The connector starts without errors, listens to logs, sends creation records correctly.
When running in "snapshot.mode": "initial" mode, it does not take into account the parameter with the snapshot limitation:
"snapshot.select.statement.overrides.mdm6.client": "SELECT * FROM schema.table WHERE persistdate > TO_TIMESTAMP('07.03.2023','DD.MM.YYYYY') AND VERSION = 0".and starts copying the whole table with only one constraint to the last scn.
INFO For table 'DB.SCHEMA.TABLE' using select statement: 'SELECT "DTYPE", "LOCALSYSTEMID", "VERSION", "APPLICATIONNAME", "PERSISTDATE" [...] FROM "SCHEMA". "TABLE" AS OF SCN 310072308644114'.When running the connector with the signal table in the database and in schema_only mode, it crashes with a NullPointer error, and I can't figure out from the previous log entries why it don't run with these settings.
Last messages from logs:
...
Sep 18, 2023 @ 16:00:10.164,"[2023-09-18 16:00:10,164] INFO Snapshot step 5 - Reading structure of captured tables (io.debezium.relational.RelationalSnapshotChangeEventSource:135)"
Sep 18, 2023 @ 16:00:10.164,"[2023-09-18 16:00:10,164] INFO Only captured tables schema should be captured, capturing: [DB.SCHEMA.HISTORY] (io.debezium.connector.oracle.OracleSnapshotChangeEventSource:160)"
Sep 18, 2023 @ 16:00:10.497,"[2023-09-18 16:00:10,497] INFO Capturing structure of table DB.SCHEMA.HISTORY (io.debezium.relational.RelationalSnapshotChangeEventSource:344)"
Sep 18, 2023 @ 16:00:10.497,"[2023-09-18 16:00:10,497] INFO Snapshot step 6 - Persisting schema history (io.debezium.relational.RelationalSnapshotChangeEventSource:144)"
Sep 18, 2023 @ 16:00:11.685,"[2023-09-18 16:00:11,685] INFO Already applied 1 database changes (io.debezium.relational.history.SchemaHistoryMetrics:140)"
Sep 18, 2023 @ 16:00:11.685,"[2023-09-18 16:00:11,685] INFO Capturing structure of table DB.USER.SIGNAL_SNAPSHOT (io.debezium.relational.RelationalSnapshotChangeEventSource:344)"
Sep 18, 2023 @ 16:00:11.686,"[2023-09-18 16:00:11,686] WARN Snapshot was not completed successfully, it will be re-executed upon connector restart (io.debezium.pipeline.source.AbstractSnapshotChangeEventSource:97)"
Sep 18, 2023 @ 16:00:11.686,"[2023-09-18 16:00:11,686] INFO Snapshot - Final stage (io.debezium.pipeline.source.AbstractSnapshotChangeEventSource:88)"
Sep 18, 2023 @ 16:00:11.686,"[2023-09-18 16:00:11,686] INFO Connected metrics set to 'false' (io.debezium.pipeline.ChangeEventSourceCoordinator:240)"
Sep 18, 2023 @ 16:00:11.823,"[2023-09-18 16:00:11,823] INFO WorkerSourceTask{id=reindexer_client_1809_3-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)"
Sep 18, 2023 @ 16:00:11.823,"[2023-09-18 16:00:11,823] INFO WorkerSourceTask{id=reindexer_client_1809_3-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)"
Sep 18, 2023 @ 16:00:11.824,"[2023-09-18 16:00:11,824] INFO Stopping down connector (io.debezium.connector.common.BaseSourceTask:278)"
Sep 18, 2023 @ 16:00:11.835,"[2023-09-18 16:00:11,835] INFO Connection gracefully closed (io.debezium.jdbc.JdbcConnection:947)"
Sep 18, 2023 @ 16:00:11.835,"[2023-09-18 16:00:11,835] INFO [Producer clientId=DB-schemahistory] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1183)"
Sep 18, 2023 @ 16:00:11.836,"[2023-09-18 16:00:11,836] INFO [Producer clientId=connector-producer-reindexer_client_1809_3-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1183)"
In status of connector:
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:72)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:116)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.debezium.DebeziumException: java.lang.NullPointerException
at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:85)
at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:155)
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:137)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
5 more
Caused by: java.lang.NullPointerException
at io.debezium.connector.oracle.OracleSnapshotChangeEventSource.getCreateTableEvent(OracleSnapshotChangeEventSource.java:210)
at io.debezium.relational.RelationalSnapshotChangeEventSource.createSchemaChangeEventsForTables(RelationalSnapshotChangeEventSource.java:353)
at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:146)
at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)
8 moreMaybe it can help for understanding - connector with signal table:
{
"name":"reindexer_client_1409_1",
"config":{
"connector.class":"io.debezium.connector.oracle.OracleConnector",
"tasks.max":"1",
"database.server.name":"DB",
"database.url":"jdbc:oracle:thin:@(DESCRIPTION = (ADDRESS_LIST = (ADDRESS = (PROTOCOL = TCP)(HOST = 10.10.10.10)(PORT = 1521)))(CONNECT_DATA = (SERVER = DEDICATED)(SERVICE_NAME = DB)))",
"database.port":"1521",
"database.user":"user",
"database.password":"password",
"database.dbname": "DB",
"database.connection.adapter": "logminer",
"log.mining.strategy": "online_catalog",
"schema.history.internal.kafka.bootstrap.servers":"10.10.10.31:9092,10.10.10.32:9092,10.10.10.33:9092",
"schema.history.internal.kafka.topic":"DB.SCHEMA.HISTORY",
"schema.history.internal.store.only.captured.tables.ddl": "true",
"schema.history.internal.store.only.captured.databases.ddl": "true",
"schema.include.list":"SCHEMA",
"table.include.list":"SCHEMA.TABLE",
"topic.prefix": "DB",
"topic.creation.default.replication.factor":"3",
"topic.creation.default.partitions":"3",
"snapshot.mode":"initial",
"snapshot.locking.mode":"none",
"snapshot.select.statement.overrides":"SCHEMA.TABLE",
"snapshot.select.statement.overrides.schema.table":"SELECT * FROM SCHEMA.TABLE WHERE persistdate > TO_TIMESTAMP('07.03.2023','DD.MM.YYYY') AND VERSION = 0",
"signal.data.collection":"DB.USER.SIGNAL_SNAPSHOT",
"signal.enabled.channels":"source",
"skipped.operations":"t",
"unavailable.value.placeholder":"__debezium_unavailable_value",
"publication.autocreate.mode":"filtered",
"decimal.handling.mode": "string",
"include.schema.changes": "false",
"transforms":"unwrap",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones":"false",
"transforms.unwrap.delete.handling.mode":"rewrite",
"transforms.unwrap.add.fields":"op,table",
"internal.log.mining.read.only":"true"
}
}