End-to-End Flow in Debezium Oracle with Kafka Connector and Performance Analysis

Norberto da Silva Prado

Nov 6, 2024, 7:42:54 AM
to debezium

I am currently using Debezium with the Oracle connector for Kafka and would like to understand the complete end-to-end flow, from executing a DML operation (update, insert, delete) on a given table to consuming the resulting topic in Kafka. I’m experiencing performance issues and need to perform an in-depth analysis of the entire process.

Could anyone provide a diagram illustrating this flow, along with documentation that explains the role of each component in a simplified way? Additionally, if there are any more detailed resources specifically related to performance optimization in this flow, that would be extremely helpful.

Thank you in advance!

Chris Cranford

Nov 6, 2024, 9:22:58 AM
to debe...@googlegroups.com
Hi -

Before we get to that detail, it would be helpful to understand what version of Debezium you are using & the connector configuration.

Thanks,
-cc

Norberto da Silva Prado

Nov 6, 2024, 4:23:52 PM
to debezium
Hello,

Thank you for the response! I’m currently using Debezium version 3.0, and here is a summary of my connector configuration:

Database: Oracle 19c
Connector Properties: 
    snapshot.mode: no_data
    schema.history.internal.store.only.captured.tables.ddl: "true"
    log.mining.strategy: hybrid
    log.mining.query.filter.mode: in
    log.mining.batch.size.min: "1000"
    log.mining.batch.size.max: "500000"
    log.mining.batch.size.default: "1000"
    connector.class: io.debezium.connector.oracle.OracleConnector
    database.dbname: ....
    database.pdb.name: ....
    heartbeat.action.query: "INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat')"
    database.password: ....
    database.url: ....
    database.user: ....
    decimal.handling.mode: precise
    field.name.adjustment.mode: avro
    heartbeat.interval.ms: "5000"
    kafka.consumer.offset.commit.enabled: "true"
    key.converter: io.confluent.connect.avro.AvroConverter
    key.converter.basic.auth.credentials.source: USER_INFO
    key.converter.schema.registry.basic.auth.user.info: ${file:/mnt/secrets/connect-mds-client/bearer.txt:username}:${file:/mnt/secrets/connect-mds-client/bearer.txt:password}
    key.converter.schema.registry.ssl.truststore.location: /mnt/sslcerts/truststore.p12
    key.converter.schema.registry.ssl.truststore.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
    key.converter.schema.registry.url: https://schemaregistry.{{ $dot.Values.namespace }}.svc.cluster.local:8081
    key.converter.schemas.enable: "true"
    lob.enabled: "false"
    notification.enabled.channel: sink
    notification.sink.topic.name: {{ $value.topicprefix }}.notificaton
    schema.history.internal.consumer.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule
      required username="${file:/mnt/secrets/credential/plain.txt:username}" password="${file:/mnt/secrets/credential/plain.txt:password}";
    schema.history.internal.consumer.sasl.mechanism: PLAIN
    schema.history.internal.consumer.security.protocol: SASL_SSL
    schema.history.internal.consumer.ssl.endpoint.identification.algorithm: https
    schema.history.internal.consumer.ssl.key.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
    schema.history.internal.consumer.ssl.keystore.location: /mnt/sslcerts/keystore.p12
    schema.history.internal.consumer.ssl.keystore.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
    schema.history.internal.consumer.ssl.truststore.location: /mnt/sslcerts/truststore.p12
    schema.history.internal.consumer.ssl.truststore.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
    schema.history.internal.kafka.bootstrap.servers: kafka.{{ $dot.Values.namespace }}.svc.cluster.local:9071
    schema.history.internal.kafka.topic: {{ $value.topicprefix }}.schemahistory
    schema.history.internal.producer.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule
      required username="${file:/mnt/secrets/credential/plain.txt:username}" password="${file:/mnt/secrets/credential/plain.txt:password}";
    schema.history.internal.producer.sasl.mechanism: PLAIN
    schema.history.internal.producer.security.protocol: SASL_SSL
    schema.history.internal.producer.ssl.endpoint.identification.algorithm: https
    schema.history.internal.producer.ssl.key.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
    schema.history.internal.producer.ssl.keystore.location: /mnt/sslcerts/keystore.p12
    schema.history.internal.producer.ssl.keystore.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
    schema.history.internal.producer.ssl.truststore.location: /mnt/sslcerts/truststore.p12
    schema.history.internal.producer.ssl.truststore.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
    schema.name.adjustment.mode: avro
    signal.consumer.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule
      required username="${file:/mnt/secrets/credential/plain.txt:username}" password="${file:/mnt/secrets/credential/plain.txt:password}";
    signal.consumer.sasl.mechanism: PLAIN
    signal.consumer.security.protocol: SASL_SSL
    signal.consumer.ssl.endpoint.identification.algorithm: https
    signal.consumer.ssl.key.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
    signal.consumer.ssl.keystore.location: /mnt/sslcerts/keystore.p12
    signal.consumer.ssl.keystore.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
    signal.consumer.ssl.truststore.location: /mnt/sslcerts/truststore.p12
    signal.consumer.ssl.truststore.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
    signal.enabled.channels: kafka
    signal.kafka.bootstrap.servers: kafka.{{ $dot.Values.namespace }}.svc.cluster.local:9071
    signal.kafka.groupId: kafka-signal
    signal.kafka.poll.timeout.ms: "100"
    signal.kafka.topic: {{ $value.topicprefix }}.signal
    table.include.list: ${file:/mnt/secrets/{{ $value.topicprefix | trim | kebabcase }}-cred/db-credentials.txt:table.include.list}
    tasks.max: "1"
    topic.creation.default.cleanup.policy: delete
    topic.creation.default.compression.type: lz4
    topic.creation.default.partitions: "3"
    topic.creation.default.replication.factor: "1"
    topic.prefix: {{ $value.topicprefix }}
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.basic.auth.credentials.source: USER_INFO
    value.converter.schema.registry.basic.auth.user.info: ${file:/mnt/secrets/connect-mds-client/bearer.txt:username}:${file:/mnt/secrets/connect-mds-client/bearer.txt:password}
    value.converter.schema.registry.ssl.truststore.location: /mnt/sslcerts/truststore.p12
    value.converter.schema.registry.ssl.truststore.password: ${file:/mnt/sslcerts/jksPassword.txt:jksPassword}
    value.converter.schema.registry.url: https://schemaregistry.{{ $dot.Values.namespace }}.svc.cluster.local:8081
    value.converter.schemas.enable: "true"

The above configurations may vary depending on whether the Oracle database is a CDB (Container Database) or not.  

In our most recent tests, we inserted a row into a specific table and the database transaction was committed, but Debezium only sent the corresponding event to Kafka about 30 minutes later. While monitoring, we did not see the change being written to the archive log; it appears to have remained only in the online redo log.

If there’s anything specific in the configuration you’d like me to include, please let me know. Looking forward to your insights!

Thanks,

Chris Cranford

Nov 7, 2024, 8:13:41 AM
to debe...@googlegroups.com
Hi -

The configuration looks solid: you've configured log.mining.query.filter.mode and are using the hybrid strategy on Debezium 3, so that's a plus. I think we need to look at your transaction volume on the database, the connector logs with some DEBUG logging enabled, and the connector metrics.

Would it be possible to enable DEBUG logging on the Oracle connector, restart the connector and perform your test, and share the full logs once the event is captured? I would recommend the following log configurations, in this order:

    io.debezium.connector.oracle=DEBUG
    io.debezium.connector.oracle.OracleValueConverters=INFO
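
For reference, assuming a plain Kafka Connect installation that reads a log4j properties file (the exact file name and location vary by distribution), these map to logger entries along the lines of:

    # e.g. config/connect-log4j.properties (location is distribution-specific)
    log4j.logger.io.debezium.connector.oracle=DEBUG
    log4j.logger.io.debezium.connector.oracle.OracleValueConverters=INFO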

I'd also recommend, if you can, capturing the following JMX metrics for the same time period:

    - QueueRemainingCapacity
    - NumberOfActiveTransactions
    - SwitchCounter
    - LastDurationOfFetchQueryInMilliseconds
    - TotalResultSetNextTimeInMilliseconds
    - TotalProcessedRows
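
If it is useful, here is a minimal, hypothetical sketch of how those attributes could be collected over JMX during the test window. It assumes the Connect worker exposes remote JMX (here on localhost:9010) and that the streaming metrics MBean is registered as debezium.oracle:type=connector-metrics,context=streaming,server=<topic.prefix>; confirm the exact object name for your version (for example with jconsole) and adjust the endpoint and server value to your environment:

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class DebeziumOracleMetricsDump {
        public static void main(String[] args) throws Exception {
            // Hypothetical JMX endpoint; requires the Connect worker to be started with remote JMX enabled.
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9010/jmxrmi");

            // Streaming metrics MBean; replace "my-topic-prefix" with the connector's topic.prefix value.
            ObjectName streamingMetrics = new ObjectName(
                    "debezium.oracle:type=connector-metrics,context=streaming,server=my-topic-prefix");

            String[] attributes = {
                    "QueueRemainingCapacity",
                    "NumberOfActiveTransactions",
                    "SwitchCounter",
                    "LastDurationOfFetchQueryInMilliseconds",
                    "TotalResultSetNextTimeInMilliseconds",
                    "TotalProcessedRows"
            };

            try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                for (String attribute : attributes) {
                    // Print each metric once; run periodically to correlate spikes with the DEBUG logs.
                    System.out.printf("%s = %s%n", attribute, mbs.getAttribute(streamingMetrics, attribute));
                }
            }
        }
    }

Capturing this output every minute or so alongside the DEBUG logs makes it much easier to line up latency spikes with specific mining iterations.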

Thanks,
-cc

Norberto da Silva Prado

Nov 8, 2024, 4:47:41 AM
to debezium
Hi,

Done as requested. Attached are the log and metrics. If you need more information, just let me know.

Thanks,
log.zip
Image 02.jpeg
Image 01.jpeg

Chris Cranford

Nov 8, 2024, 12:46:24 PM
to debe...@googlegroups.com
Hi -

That information was quite helpful. The latency in your system seems directly tied to when you have high bursts of changes and log switches on the Oracle database.

Walking through the timeline:

    - At 18:17, the mining range stride was around 300, with roughly 6s of latency and a batch size of 9000. The batch size was shrinking on each iteration, indicating the connector was catching up to near real-time.
    - At 18:18, the latency was down to about 6s, the batch size was 8000, and the mining range stride was still around 300.
    - At 18:18:16, things improved further: the mining range stride was below 200, latency was at 5.6s, and the batch size had fallen back to your configured default of 1000.
    - At 18:18:20, the mining stride grew larger, to about 750.
    - At 18:18:35, the stride was around 3000, and from 18:20 onward it continued to grow, eventually capping at a 50000 batch size at 18:49:15.

Just for reference, the log entries here that I am referring to are these:

    Updated batch size window, using batch size 499000
    Fetching results for SCN [32153446857738, 32153447355738]
    Processed in 0 ms. Lag: 46965.
    Counters{rows=0, stuckCount=12, dmlCount=0, ddlCount=0, insertCount=0, updateCount=0, deleteCount=0, commitCount=0, rollbackCount=0, tableMetadataCount=0}.

At 18:20, you had a high burst of changes in your redo logs, but your specific issue is most likely that you've set log.mining.batch.size.default to 1000, when the default is 20000. In high-burst situations this is extremely problematic because this setting also serves as the increment, so with it set to 1000 the connector can only grow the mining window, and therefore catch up, at a rate of 1000 SCNs per iteration. For example, growing the window from 1000 to your 500000 maximum takes on the order of 500 iterations at a 1000-SCN increment, versus roughly 25 at the default of 20000.

I would recommend dropping "log.mining.batch.size.default" from your connector configuration, using the built-in default, and seeing how the connector responds. Your latency will still spike with high-burst activity, but it should recover and drop faster.
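
For illustration, and assuming you keep your current min/max values, the mining batch settings would then look something like this, with the 20000 increment coming from the connector's built-in default rather than your configuration:

    log.mining.batch.size.min: "1000"
    log.mining.batch.size.max: "500000"
    # log.mining.batch.size.default deliberately omitted so the built-in 20000 applies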

Finally, I'll leave you and others with one piece of advice. If you notice the "Processed in {} ms" log entry is consistently high, one thing to consider is that your "query.fetch.size" may be too small for your data volume. In your logs there is only one instance where it spiked, to 11.8s at 17:44:20, rather than the sub-millisecond values you normally have. This can indicate too many database round trips caused by a query.fetch.size that is too small; the JMX metric TotalResultSetNextTimeInMilliseconds growing in spikes rather than steadily is often a good indicator that the fetch size needs adjusting.
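
As a rough illustration only (the right value depends on your row sizes and change volume), increasing the fetch size is a one-line change:

    query.fetch.size: "20000"    # illustrative value, not a recommendation; tune while watching TotalResultSetNextTimeInMilliseconds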

Hope that helps
-cc

Norberto da Silva Prado

Nov 8, 2024, 7:27:12 PM
to debezium
Hi,
We changed the parameter log.mining.batch.size.default, and tomorrow we will validate and share the results.

Meanwhile, I’d like to share one of the previous test cases where we observed what may be an anomaly: the SCN of transaction 0C001900D9412D00 differs from the SCN reported in Debezium’s DEBUG log. In Oracle it shows SCN 32154598853748, while the Debezium log shows SCN 32154598853749.

Attached are the logs and evidence.

Below is a summary of the test case.

-------------------------------------------
=> Test Case 03

- Transaction 01
08/11/2024 12:08:57 - Insert OS.NUMOS:44972558
<<<< SCN: 32154598853748 >>>>, XID: 0C001900D9412D00

-- Message Log
08/11/2024 12:12:11 - [DEBUG] 2024-11-08 15:12:11,891 [debezium-oracleconnector-oracle_source_com_acr-change-event-source-coordinator] io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor lambda$process$6 - All active transactions: 0c001900d9412d00 (<<<< 32154598853749 >>>>)

- Transaction 02
08/11/2024 12:17:16 - Insert NUMOS:44972560
08/11/2024 12:17:16 - Commit;
08/11/2024 12:17:17 - Update NUMOS:17945360
08/11/2024 12:17:17 - Commit;
08/11/2024 12:17:19 - Delete NUMOS:17941417
08/11/2024 12:17:19 - Commit;

-- Kafka
Topic: oracle_source_com_acr.OS, Last produced: 08/11/2024 12:18:56

- Transaction 01
08/11/2024 12:21:01 - Commit;
---------------------------------------------

Thanks,
log (case 03).zip
Dash02.jpeg
Dash03.jpeg
Dash01.jpeg

Chris Cranford

Nov 11, 2024, 11:59:53 AM
to debe...@googlegroups.com
Hi -

This is normal when you have left the "lob.enabled" configuration at its default, which is false.

Some time ago we accepted a community contribution that delays the creation of the "Transaction" record in the buffer until the first DML event for a table that is in the "table.include.list". So while a transaction may technically start at an earlier SCN in the redo logs, Debezium will trim off any irrelevant DML events at the start of the transaction if they would be discarded anyway due to the "table.include.list". Because of this, you may see the connector report that the transaction starts at a later SCN than the redo logs show. Please take a look at the INSERT at SCN 32154598853748: does it refer to a table that is not in your "table.include.list"?

Thanks,
-cc