Hi everyone!
Can you please tell me which parameters to pay attention to in order to improve the speed of the Debezium connector?
We have a 40 TB Oracle DB with about 40 million transactions per day on average, of which we need only 15 million.
To select them we use filtering. This is the filter condition, which I put together thanks to tips from this community:
"transforms.filter.condition": "value.op == \"d\" ? (value.before.schema().field(\"VERSION\") != null ? (value.before.VERSION == \"0\") : value.before.schema().field(\"CLIENT_VERSION\") != null ? value.before.CLIENT_VERSION == \"0\" : value.before.schema().field(\"SUBSCRIBER_VERSION\") != null ? value.before.SUBSCRIBER_VERSION == \"0\" : value.before.schema().field(\"TARIFFPLAN_VERSION\") != null ? value.before.TARIFFPLAN_VERSION == \"0\" : true) : (value.after.schema().field(\"VERSION\") != null ? value.after.VERSION == \"0\" : value.after.schema().field(\"CLIENT_VERSION\") != null ? value.after.CLIENT_VERSION == \"0\" : value.after.schema().field(\"SUBSCRIBER_VERSION\") != null ? value.after.SUBSCRIBER_VERSION == \"0\" : value.after.schema().field(\"TARIFFPLAN_VERSION\") != null ? value.after.TARIFFPLAN_VERSION == \"0\" : true)",
We used to use several connectors, each with a simple filter like:
"transforms.filter.condition":"key.VERSION == \"0\"",
But the problem was that during load testing the processing time for a single object (which spans several tables) differed between connectors: after creating 100 thousand objects in the database, some of them came through immediately and others only after 5 minutes.
When everything was combined into one connector, the same number of objects reached Kafka in 10 minutes. This may be critical in production, where, as I said, the volumes are higher.
We raised the resource limits to:
OFFSET_FLUSH_TIMEOUT_MS: 60000
OFFSET_FLUSH_INTERVAL_MS: 15000
HEAP_OPTS: '-Xms2g -Xmx2g'
And we set the connector parameters to the following values:
"max.batch.size": 32768,
"max.queue.size": 131072,
What else can we do? Or is it all down to the filtering?
All parameters:
{
"name":"single_0105",
"config":{
"connector.class":"io.debezium.connector.oracle.OracleConnector",
"tasks.max":"1",
"database.url":"jdbc:oracle:thin:@()",
"database.port":"",
"database.user":"",
"database.password":"",
"database.dbname": "stagedb",
"database.connection.adapter": "logminer",
"log.mining.strategy": "online_catalog",
"schema.history.internal.kafka.bootstrap.servers":"kafka01:9092,kafka02:9092,kafka03:9092",
"schema.history.internal.kafka.topic":"IMDB_MDM_HISTORY",
"schema.history.internal.store.only.captured.tables.ddl": "true",
"schema.history.internal.store.only.captured.databases.ddl": "true",
"schema.include.list":"MDM",
"table.include.list":"[67 tables]",
"topic.prefix": "IMDB",
"topic.delimiter": "_",
"topic.creation.default.replication.factor":"3",
"topic.creation.default.partitions":"5",
"topic.creation.inventory.delete.retention.ms": 259200000,
"max.batch.size": 32768,
"max.queue.size": 131072,
"snapshot.mode":"schema_only",
"snapshot.locking.mode":"none",
"skipped.operations":"t",
"unavailable.value.placeholder":"__debezium_unavailable_value",
"decimal.handling.mode": "string",
"time.precision.mode":"connect",
"include.schema.changes": "false",
"transforms":"filter",
"transforms.filter.predicate":"MessageClient",
"transforms.filter.type":"io.debezium.transforms.Filter",
"transforms.filter.language":"jsr223.groovy",
"transforms.filter.condition":"value.op == \"d\" ? (value.before.schema().field(\"VERSION\") != null ? (value.before.VERSION == \"0\") : value.before.schema().field(\"CLIENT_VERSION\") != null ? value.before.CLIENT_VERSION == \"0\" : value.before.schema().field(\"SUBSCRIBER_VERSION\") != null ? value.before.SUBSCRIBER_VERSION == \"0\" : value.before.schema().field(\"TARIFFPLAN_VERSION\") != null ? value.before.TARIFFPLAN_VERSION == \"0\" : true) : (value.after.schema().field(\"VERSION\") != null ? value.after.VERSION == \"0\" : value.after.schema().field(\"CLIENT_VERSION\") != null ? value.after.CLIENT_VERSION == \"0\" : value.after.schema().field(\"SUBSCRIBER_VERSION\") != null ? value.after.SUBSCRIBER_VERSION == \"0\" : value.after.schema().field(\"TARIFFPLAN_VERSION\") != null ? value.after.TARIFFPLAN_VERSION == \"0\" : true)",
"predicates":"MessageClient",
"predicates.MessageClient.type":"org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.MessageClient.pattern":"IMDB_MDM_.*",
"transforms.filter.null.handling.mode":"drop",
"internal.log.mining.read.only":"true",
"tombstones.on.delete": "false"
}
}