ways to improve solution speed


Restwin E

May 28, 2024, 9:18:37 AM
to debezium
Hi everyone!
Can you please tell me which parameters to look at in order to improve the throughput of the Debezium connector?

We have a 40 TB Oracle DB with about 40 million transactions per day on average, of which we need only 15 million. For that we use filtering. This is the filter condition, which I built thanks to tips from this community:
"transforms.filter.condition": "value.op == \"d\" ? (value.before.schema().field(\"VERSION\") != null ? (value.before.VERSION == \"0\") : value.before.schema().field(\"CLIENT_VERSION\") != null ? value.before.CLIENT_VERSION == \"0\" : value.before.schema().field(\"SUBSCRIBER_VERSION\") != null ? value.before.SUBSCRIBER_VERSION == \"0\" : value.before.schema().field(\"TARIFFPLAN_VERSION\") != null ? value.before.TARIFFPLAN_VERSION == \"0\" : true) : (value.after.schema().field(\"VERSION\") != null ? value.after.VERSION == \"0\" : value.after.schema().field(\"CLIENT_VERSION\") != null ? value.after.CLIENT_VERSION == \"0\" : value.after.schema().field(\"SUBSCRIBER_VERSION\")  != null ? value.after.SUBSCRIBER_VERSION == \"0\" : value.after.schema().field(\"TARIFFPLAN_VERSION\") != null ? value.after.TARIFFPLAN_VERSION == \"0\" : true)",

We used to use several connectors with simple filtering like:
"transforms.filter.condition":"key.VERSION == \"0\"",

But the problem is that during load testing we saw different processing times for a single object (spread across several tables) in different connectors: after creating 100 thousand objects in the database, some of them came through immediately, and some only after 5 minutes.
When everything was combined into one connector, the same number of objects reached Kafka within 10 minutes. This may be critical in production, where, as I said, the volumes are higher.

We raised the resource limits to
OFFSET_FLUSH_TIMEOUT_MS: 60000
OFFSET_FLUSH_INTERVAL_MS: 15000
HEAP_OPTS: '-Xms2g -Xmx2g'

And set the connector parameters to the following values:
"max.batch.size": 32768,
"max.queue.size": 131072,


What can we do? Or is it all about filtering?

All parameters:
{
    "name": "single_0105",
    "config": {
        "connector.class": "io.debezium.connector.oracle.OracleConnector",
        "tasks.max": "1",
        "database.url": "jdbc:oracle:thin:@()",
        "database.port": "",
        "database.user": "",
        "database.password": "",
        "database.dbname": "stagedb",
        "database.connection.adapter": "logminer",
        "log.mining.strategy": "online_catalog",
        "schema.history.internal.kafka.bootstrap.servers": "kafka01:9092,kafka02:9092,kafka03:9092",
        "schema.history.internal.kafka.topic": "IMDB_MDM_HISTORY",
        "schema.history.internal.store.only.captured.tables.ddl": "true",
        "schema.history.internal.store.only.captured.databases.ddl": "true",
        "schema.include.list": "MDM",
        "table.include.list": "[67 tables]",
        "topic.prefix": "IMDB",
        "topic.delimiter": "_",
        "topic.creation.default.replication.factor": "3",
        "topic.creation.default.partitions": "5",
        "topic.creation.inventory.delete.retention.ms": 259200000,
        "max.batch.size": 32768,
        "max.queue.size": 131072,
        "snapshot.mode": "schema_only",
        "snapshot.locking.mode": "none",
        "skipped.operations": "t",
        "unavailable.value.placeholder": "__debezium_unavailable_value",
        "decimal.handling.mode": "string",
        "time.precision.mode": "connect",
        "include.schema.changes": "false",
        "transforms": "filter",
        "transforms.filter.predicate": "MessageClient",
        "transforms.filter.type": "io.debezium.transforms.Filter",
        "transforms.filter.language": "jsr223.groovy",
        "transforms.filter.condition": "value.op == \"d\" ? (value.before.schema().field(\"VERSION\") != null ? (value.before.VERSION == \"0\") : value.before.schema().field(\"CLIENT_VERSION\") != null ? value.before.CLIENT_VERSION == \"0\" : value.before.schema().field(\"SUBSCRIBER_VERSION\") != null ? value.before.SUBSCRIBER_VERSION == \"0\" : value.before.schema().field(\"TARIFFPLAN_VERSION\") != null ? value.before.TARIFFPLAN_VERSION == \"0\" : true) : (value.after.schema().field(\"VERSION\") != null ? value.after.VERSION == \"0\" : value.after.schema().field(\"CLIENT_VERSION\") != null ? value.after.CLIENT_VERSION == \"0\" : value.after.schema().field(\"SUBSCRIBER_VERSION\") != null ? value.after.SUBSCRIBER_VERSION == \"0\" : value.after.schema().field(\"TARIFFPLAN_VERSION\") != null ? value.after.TARIFFPLAN_VERSION == \"0\" : true)",
        "predicates": "MessageClient",
        "predicates.MessageClient.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
        "predicates.MessageClient.pattern": "IMDB_MDM_.*",
        "transforms.filter.null.handling.mode": "drop",
        "internal.log.mining.read.only": "true",
        "tombstones.on.delete": "false"
    }
}

Chris Cranford

May 29, 2024, 9:04:46 AM
to debe...@googlegroups.com
Hi Restwin -

One of the main influences on capture performance with Oracle is the combination of redo log sizes and disk I/O speed.  In both cases, improving this will require some conversations with the DBA to see whether there are ways to improve things on the Oracle side.  However, there are still a few knobs we can turn in the connector configuration to see whether we observe any better outcomes.

Another consideration is setting "log.mining.query.filter.mode" to a value of "in".  This can only be done if the "table.include.list" does not have any regular expressions.  In this case, the database query will apply filters at the database level, reducing the network traffic of changes sent to the connector. 
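As a sketch only (the table names here are hypothetical placeholders for your actual captured tables), the database-side filtering could look like this in the connector config:

```json
{
    "log.mining.query.filter.mode": "in",
    "table.include.list": "MDM.CLIENT,MDM.SUBSCRIBER,MDM.TARIFFPLAN"
}
```

With the "in" mode, the LogMiner query builds an IN clause from the literal table names, so changes for non-captured tables are discarded by the database itself instead of being shipped to the connector and filtered there.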

I'd also look at setting the "log.mining.batch.size.*" settings based on your load.  These control the size of chunks that LogMiner reads per iteration.  By default it starts using a size of 10k and can grow up to 100k.  If you have a higher volume of data, it may make sense to increase this, but I would examine the BatchSize JMX metric to see if you're even reaching the 100k mark before adjusting.  If you are sitting at 100k for extended periods of time, increasing this could help bandwidth.
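For example (the numbers below are purely illustrative, not recommendations; check the BatchSize JMX metric first as described above before committing to any values):

```json
{
    "log.mining.batch.size.min": 1000,
    "log.mining.batch.size.default": 20000,
    "log.mining.batch.size.max": 200000
}
```

The connector grows or shrinks the mining batch size between the min and max bounds depending on how far behind the mining session is falling.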

Additionally, I would also consider adjusting the "query.fetch.size".  By default this starts at 10k, and this controls the number of rows that LogMiner must return before we can process a batch of changes from the LogMiner query.  I would look at the LastDurationOfFetchQueryInMilliseconds and MaxDurationOfFetchQueryInMilliseconds and adjust accordingly.  Note that using database filtering with "log.mining.query.filter.mode" can make these values look worse than they actually are.  For example, if you leave this the default of 10k, and you have a transaction with 1M changes for a non-captured table, LogMiner will need to read across that large transaction, which will take time, in order to collect the number of rows to meet the 10k fetch size request.
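A hedged example, assuming the fetch-duration metrics show the connector spending a long time waiting on each LogMiner fetch (the value is illustrative):

```json
{
    "query.fetch.size": 20000
}
```

A larger fetch size means fewer round trips per batch of LogMiner rows, at the cost of more memory per fetch.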

Finally, I would also take a look at the QueueRemainingCapacity JMX metric.  If this value is reaching 0 regularly, you may want to adjust "max.batch.size" and "max.queue.size" to increase the throughput and the internal queue buffering done at the Kafka Connect level, to guarantee that slow delivery to Kafka does not impact the read operations from LogMiner.  If this JMX metric reaches 0, the read operations will block until space becomes available.  I see you've already adjusted these, but I'd double-check that your existing values are high enough to avoid blocking on the buffer.
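For instance, a sketch that roughly doubles your current values (illustrative only; "max.queue.size" should stay several times larger than "max.batch.size", and "max.queue.size.in.bytes" can optionally cap the queue by memory rather than record count):

```json
{
    "max.batch.size": 65536,
    "max.queue.size": 262144,
    "max.queue.size.in.bytes": 536870912
}
```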

Hope that helps.
Chris


Restwin E

May 31, 2024, 12:07:58 PM
to debezium
Hi Chris!
Thank you for your detailed answer!

We will try these tips and if there are any questions, I will write here.
As I understand the tips, first we need to set up the metrics monitoring correctly :) So far we haven't managed to set everything up; for example, we don't yet know the values of the listed metrics.

Regarding the last point, I'm fairly sure it's fine, because before we set the "max.batch.size" and "max.queue.size" parameters there were corresponding errors from Kafka; there are no errors now.

On Wednesday, May 29, 2024 at 16:04:46 UTC+3, Chris Cranford wrote: