Increase the capacity of sink connectors


Vladislav P

Nov 24, 2025, 2:41:06 AM (9 days ago)
to debezium
Hello everyone. 
How can I improve the read speed of a sink connector consuming from Kafka?
I launched the source connector, and it wrote about 40 million records to Kafka in 1.5 days.
Then I started the sink connector, and it is transferring them from Kafka to Postgres very slowly. (It has been loading for 1.5 days; the source connector has been stopped for 1 day.)
There is 1 partition in Kafka.
It seems the following settings can be tuned on the sink connector:
- batch.size
- consumer.max.poll.records
but somehow they don't really help. The sink connector is lagging behind the source: after 40 minutes the backlog ("Consumer Lag") was 1.7 million records.
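For reference, those sink-side settings go roughly here (an illustrative sketch rather than my exact config, assuming the Debezium JDBC sink; the consumer.override.* form also requires that the Connect worker allows per-connector client overrides):

```
{
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "batch.size": 500,
    "consumer.override.max.poll.records": 500
}
```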
The source connector has the following settings:

```
{
    "topic.creation.enable": "true",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1,

    "log.mining.strategy": "online_catalog",
    "log.mining.query.filter.mode": "in",
    "log.mining.batch.size.max": 6000000,
    "log.mining.batch.size.default": 1200000,
    "log.mining.transaction.retention.ms": "1800000",

    "max.queue.size": 393216,
    "max.batch.size": 98304,
    "poll.interval.ms": 5,
}
```

Chris Cranford

Nov 24, 2025, 4:17:36 AM (9 days ago)
to debe...@googlegroups.com
Hi -

I'm afraid your options are going to be severely limited because the topic has a single partition.

If a topic had at least 2 partitions, you could scale the sink with 2 tasks, one reading from each partition. This would double your current throughput. If you had a sufficient number of partitions and a smaller number of sink tasks, the partitions would be evenly distributed across the tasks, allowing each task to consume from one or more partitions. This is the best way to scale sink connectors with Kafka. But when the topic has a single partition, you're limited to just a single task. Even if you set the sink to use 5 tasks, only one will ever be busy, because a single partition can only be consumed by one task at any given moment in time.
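For illustration, the idea in connector-config terms is roughly this (the partition and task counts are just example values): let the source create topics with several partitions, and give the sink a matching, or smaller, number of tasks.

Source connector (excerpt):
```
{
    "topic.creation.enable": "true",
    "topic.creation.default.partitions": 5
}
```

Sink connector (excerpt):
```
{
    "tasks.max": "5"
}
```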

In addition, this is precisely why the source connector took 1.5 days to synchronize so few records.

A single partition is great for very specific corner cases, but as you can see it has a significant impact on both source and sink task performance. Unless you specifically need a single partition, you should ideally use multiple partitions to allow concurrent reads and writes.

-cc

Vladislav P

Nov 24, 2025, 5:05:45 AM (9 days ago)
to debezium
Thanks for the reply. Regarding the partitions, I tried to set the following settings:

"topic.creation.enable": "true",
"topic.creation.default.replication.factor": 1,
"topic.creation.default.partitions": 10,

It worked fine, although I saw a WARNING in the logs:

```
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now  [org.apache.kafka.clients.producer.internals.Sender]
```

How critical is it?
It's just that right now we don't have the ability to scale Kafka, and it's running on a single node.
On Monday, November 24, 2025 at 13:17:36 UTC+4, Chris Cranford wrote:

Chris Cranford

Nov 24, 2025, 9:42:03 AM (9 days ago)
to debe...@googlegroups.com
If you only have 1 Kafka broker node and you are seeing this in the Kafka Connect logs, then I would suggest taking a look at the broker. Either the broker is unhealthy, hasn't fully started, or there is some other problem that is confusing the broker about why it isn't the "leader" in a single-node setup.

-cc

Vladislav P

Nov 24, 2025, 1:17:46 PM (9 days ago)
to debezium
Got it, thanks.

Now I have set up 5 partitions and set tasks.max = 5. Reads are really much faster, just great. But there is a caveat: Postgres has a limit of 100 connections ("max_connections").
And with "tasks.max = 5" I see a WARNING: "FATAL: sorry, too many clients already".
If I run "SELECT count(*) FROM pg_stat_activity" I see 106.
Sometimes the SELECT just doesn't work.

I would like to know how the number of connections is calculated. If "tasks.max = 5", is the maximum number 5 * 32, where 32 = "connection.pool.max_size"?


On Monday, November 24, 2025 at 18:42:03 UTC+4, Chris Cranford wrote:

Vladislav P

Nov 24, 2025, 5:13:53 PM (9 days ago)
to debezium
Chris, could you give me some advice? I still sometimes see the following in the logs and metrics:
"Committedthrough = from 1000 to 2000"
"LagFromSourceInMilliseconds = 12000000"
The OffsetScn lags significantly behind the database.
I restarted the connector several times, but the values persisted.
In the logs I also saw a transaction hanging for some reason (even though I have "log.mining.transaction.retention.ms": 1800000):

```
2025-11-24T17:52:06,984 WARN   Oracle||streaming  Offset SCN 6359371671095 has not changed in 25 mining session iterations. This indicates long running transaction(s) are active. Commit SCNs {1=6359376011095, 2=6359376010970}.   [io.debezium.connector.oracle.logminer.OffsetActivityMonitor]
2025-11-24T18:44:53,290 WARN   Oracle||streaming  Offset SCN 6359371671095 has not changed in 25 mining session iterations. This indicates long running transaction(s) are active. Commit SCNs {1=6359377011092, 2=6359377011075}.   [io.debezium.connector.oracle.logminer.OffsetActivityMonitor]
2025-11-24T19:15:48,108 WARN   Oracle||streaming  Offset SCN 6359371671095 has not changed in 25 mining session iterations. This indicates long running transaction(s) are active. Commit SCNs {1=6359377408661, 2=6359377408648}.   [io.debezium.connector.oracle.logminer.OffsetActivityMonitor]
```

After a few hours the warnings stop and the connector starts to catch up:
"Committedthrough = from 6000 to 7000"
"LagFromSourceInMilliseconds = tends to 0"

What could be the reason for this?
Do you perhaps have example queries that could be used to find these transactions or understand what they are? (Oracle)

On Monday, November 24, 2025 at 22:17:46 UTC+4, Vladislav P wrote:

Chris Cranford

Nov 25, 2025, 6:43:58 AM (8 days ago)
to debe...@googlegroups.com
Hi,

In the JDBC sink, if you do not customize `connection.pool.max_size`, then yes, the maximum number of connections the JDBC sink will open is 5 * 32, or 160. So ideally you will want to reduce `connection.pool.max_size` to something more reasonable, perhaps 8, or look at scaling the PostgreSQL database so that it supports more than 100 connections. If you do the latter, be aware that you should also consult a PostgreSQL sizing guide to understand which other options, such as shared_buffers, should be increased and by how much based on the maximum connection value.
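As a rough sketch (the values are illustrative only), with 5 tasks and the pool capped at 8, the sink would open at most 5 * 8 = 40 connections, comfortably under the 100-connection limit:

```
{
    "tasks.max": "5",
    "connection.pool.max_size": 8
}
```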

-cc

Chris Cranford

Nov 25, 2025, 9:21:49 AM (8 days ago)
to debe...@googlegroups.com
Hi -

I believe you misunderstand how "log.mining.transaction.retention.ms" works. It isn't based on current_time - transaction_start_time; it is instead based on current_event_time - transaction_start_time. This distinction is important, because it allows you to safely stop the connector and resume it some minutes or hours later without the risk that data will be discarded simply because you had to take a downtime window.
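For example (an illustrative scenario): if the most recent event the mining session has read carries a timestamp of 12:00 and a still-open transaction started at 11:20, that transaction's age for retention purposes is 40 minutes, regardless of how much wall-clock time has passed while the connector was stopped.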

Moreover, if you look at the Commit SCN values (6359377408661 and 6359377408648), we can see that they are changing over time, which means we're processing data. Comparing them with the Offset SCN, they are 5,737,566 and 5,737,553 apart, respectively. A large transaction can easily span several million SCNs depending on what the transaction does.

But what I see as far more telling is the variance in Commit SCN during these windows:

    Redo Thread 1 Variance 1,397,566 in ~1.5 hours
    Redo Thread 2 Variance 1,397,678 in ~1.5 hours

I don't recall what Debezium version you are currently on, but here are a few recent improvements that might be worth knowing about:

    DBZ-9664 - Fixes an issue where the low-watermark calculation was too conservative when lob.enabled=false (the default) [1] (Debezium 3.2.5.Final, 3.3.2.Final, 3.4.Beta1+)
    DBZ-9661 - LogMiner query to always exclude all `MLOG$%` tables if you use materialized view refreshes (mainly helpful if you don't set `log.mining.query.filter.mode`) [2] (Debezium 3.4.Beta1+)
    DBZ-9660 - Introduce configurable `internal.log.mining.hash.area.size` and `internal.log.mining.sort.area.size` options [3] (Debezium 3.4.Beta1+)

We just released Debezium 3.2.5.Final, and 3.3.2.Final is scheduled for later this week. If you're currently on Debezium 3.2, I'd suggest updating to see if the fix in DBZ-9664 helps. If you're in a position to use the latest snapshot build, then I might suggest you grab the snapshot build of Debezium 3.4 [4] and use it. It will be the basis for tomorrow's Beta1 build, which includes all of these fixes.

One general note about DBZ-9660 for everyone, and this is highly important. Oracle sessions set these values to defaults that are designed for most use cases. While a session can adjust them, doing so incorrectly can have negative impacts. Setting these values too small will degrade Debezium's performance significantly; setting them too large can impact other database processes. It's generally best to find out from the DBA team what the default sizes (in bytes) are for these settings in your environment, and agree with them on a reasonable amount of extra memory the LogMiner session can consume beyond the default. These options are intended for extreme tuning cases, and it's recommended that they be used only as a last resort and with a clear understanding of the impact on the source system.
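For example, the new options would go into the source connector config roughly like this (the values here are purely illustrative; as noted above, agree on actual sizes with your DBA team first):

```
{
    "internal.log.mining.hash.area.size": 1048576,
    "internal.log.mining.sort.area.size": 1048576
}
```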

Lastly, to your question: my suggestion is to use my query tool [5], based on the OffsetScn, to get details about the transactions that are in progress before and around that SCN position.

Thanks,
-cc

[1]: https://issues.redhat.com/browse/DBZ-9664
[2]: https://issues.redhat.com/browse/DBZ-9661
[3]: https://issues.redhat.com/browse/DBZ-9660
[4]: https://central.sonatype.com/repository/maven-snapshots/io/debezium/debezium-connector-oracle/3.4.0-SNAPSHOT/debezium-connector-oracle-3.4.0-20251124.120214-216-plugin.tar.gz
[5]: https://github.com/Naros/debezium-oracle-query-tool