Hello,
Thank you for the response! I’m currently using Debezium version 3.0, and here is a summary of my connector configuration:
Database: Oracle 19c
Connector Properties:
snapshot.mode: no_data
schema.history.internal.store.only.captured.tables.ddl: "true"
log.mining.strategy: hybrid
log.mining.query.filter.mode: in
log.mining.batch.size.min: "1000"
log.mining.batch.size.max: "500000"
log.mining.batch.size.default: "1000"
connector.class: io.debezium.connector.oracle.OracleConnector
database.dbname: ....
database.pdb.name: ....
heartbeat.action.query: "INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat')"
database.password: ....
database.url: ....
database.user: ....
decimal.handling.mode: precise
field.name.adjustment.mode: avro
heartbeat.interval.ms: "5000"
kafka.consumer.offset.commit.enabled: "true"
key.converter: io.confluent.connect.avro.AvroConverter
key.converter.basic.auth.credentials.source: USER_INFO
key.converter.schema.registry.basic.auth.user.info: ${file:/mnt/secrets/connect-mds-client/bearer.txt:username}:${file:/mnt/secrets/connect-mds-client/bearer.txt:password}
key.converter.schema.registry.ssl.truststore.location: /mnt/sslcerts/truststore.p12
key.converter.schema.registry.ssl.truststore.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
key.converter.schema.registry.url:
https://schemaregistry.{{ $dot.Values.namespace }}.svc.cluster.local:8081
key.converter.schemas.enable: "true"
lob.enabled: "false"
notification.enabled.channel: sink
notification.sink.topic.name: {{ $value.topicprefix }}.notificaton
schema.history.internal.consumer.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule
required username="${file:/mnt/secrets/credential/plain.txt:username}" password="${file:/mnt/secrets/credential/plain.txt:password}";
schema.history.internal.consumer.sasl.mechanism: PLAIN
schema.history.internal.consumer.security.protocol: SASL_SSL
schema.history.internal.consumer.ssl.endpoint.identification.algorithm: https
schema.history.internal.consumer.ssl.key.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
schema.history.internal.consumer.ssl.keystore.location: /mnt/sslcerts/keystore.p12
schema.history.internal.consumer.ssl.keystore.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
schema.history.internal.consumer.ssl.truststore.location: /mnt/sslcerts/truststore.p12
schema.history.internal.consumer.ssl.truststore.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
schema.history.internal.kafka.bootstrap.servers: kafka.{{ $dot.Values.namespace }}.svc.cluster.local:9071
schema.history.internal.kafka.topic: {{ $value.topicprefix }}.schemahistory
schema.history.internal.producer.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule
required username="${file:/mnt/secrets/credential/plain.txt:username}" password="${file:/mnt/secrets/credential/plain.txt:password}";
schema.history.internal.producer.sasl.mechanism: PLAIN
schema.history.internal.producer.security.protocol: SASL_SSL
schema.history.internal.producer.ssl.endpoint.identification.algorithm: https
schema.history.internal.producer.ssl.key.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
schema.history.internal.producer.ssl.keystore.location: /mnt/sslcerts/keystore.p12
schema.history.internal.producer.ssl.keystore.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
schema.history.internal.producer.ssl.truststore.location: /mnt/sslcerts/truststore.p12
schema.history.internal.producer.ssl.truststore.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
schema.name.adjustment.mode: avro
signal.consumer.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule
required username="${file:/mnt/secrets/credential/plain.txt:username}" password="${file:/mnt/secrets/credential/plain.txt:password}";
signal.consumer.sasl.mechanism: PLAIN
signal.consumer.security.protocol: SASL_SSL
signal.consumer.ssl.endpoint.identification.algorithm: https
signal.consumer.ssl.key.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
signal.consumer.ssl.keystore.location: /mnt/sslcerts/keystore.p12
signal.consumer.ssl.keystore.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
signal.consumer.ssl.truststore.location: /mnt/sslcerts/truststore.p12
signal.consumer.ssl.truststore.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
signal.enabled.channels: kafka
signal.kafka.bootstrap.servers: kafka.{{ $dot.Values.namespace }}.svc.cluster.local:9071
signal.kafka.groupId: kafka-signal
signal.kafka.poll.timeout.ms: "100"
signal.kafka.topic: {{ $value.topicprefix }}.signal
table.include.list: ${file:/mnt/secrets/{{ $value.topicprefix | trim | kebabcase }}-cred/db-credentials.txt:table.include.list}
tasks.max: "1"
topic.creation.default.cleanup.policy: delete
topic.creation.default.compression.type: lz4
topic.creation.default.partitions: "3"
topic.creation.default.replication.factor: "1"
topic.prefix: {{ $value.topicprefix }}
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.basic.auth.credentials.source: USER_INFO
value.converter.schema.registry.basic.auth.user.info: ${file:/mnt/secrets/connect-mds-client/bearer.txt:username}:${file:/mnt/secrets/connect-mds-client/bearer.txt:password}
value.converter.schema.registry.ssl.truststore.location: /mnt/sslcerts/truststore.p12
value.converter.schema.registry.ssl.truststore.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
value.converter.schema.registry.url:
https://schemaregistry.{{ $dot.Values.namespace }}.svc.cluster.local:8081
value.converter.schemas.enable: "true"
The above configurations may vary depending on whether the Oracle database is a CDB (Container Database) or not.
In the last tests we conducted, we identified that the database made a data change by inserting a row into a specific table. The database transaction was committed, but Debezium only sent it to Kafka after 30 minutes. While monitoring, we did not observe that the information was sent to the archive log; it appears to have remained only in the redo log.
If there’s anything specific in the configuration you’d like me to include, please let me know. Looking forward to your insights!
Thanks,