Increase the capacity of sink connectors


Vladislav P

Nov 24, 2025, 2:41:06 AM
to debezium
Hello everyone. 
How can I improve the speed at which a sink connector reads from Kafka?
I launched the source connector, and it produced 40 million records into Kafka in 1.5 days.
Then I started the sink connector, and it is very slow at transferring them from Kafka to PostgreSQL. (It has been loading for 1.5 days; the source connector was stopped for 1 of those days.)
There is 1 partition in Kafka.
It seems you can tune the following settings on the sink connector:
- batch.size
- consumer.max.poll.records
but somehow they don't really help (see the sketch just below for where they are set). The sink connector is lagging behind the source: after 40 minutes the backlog ("Consumer Lag") was 1.7 million records.
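
For reference, roughly where those settings sit in the sink config (the connector class, topic, and values here are only illustrative, and the `consumer.override.` prefix assumes the Connect worker allows per-connector client overrides):

```
{
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "SCHEMA.TABLE",
    "connection.url": "jdbc:postgresql://postgres:5432/targetdb",
    "insert.mode": "upsert",
    "primary.key.mode": "record_key",

    "batch.size": 500,
    "consumer.override.max.poll.records": 500
}
```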
The source connector has the following settings:

```
{
    "topic.creation.enable": "true",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1,

    "log.mining.strategy": "online_catalog",
    "log.mining.query.filter.mode": "in",
    "log.mining.batch.size.max": 6000000,
    "log.mining.batch.size.default": 1200000,
    "log.mining.transaction.retention.ms": "1800000",

    "max.queue.size": 393216,
    "max.batch.size": 98304,
    "poll.interval.ms": 5,
}
```

Chris Cranford

Nov 24, 2025, 4:17:36 AM
to debe...@googlegroups.com
Hi -

I'm afraid your options are going to be severely limited because the topic has a single partition.

If the topic had at least 2 partitions, you could scale the sink with 2 tasks, each reading from one partition. This would double your current throughput. If you had a sufficient number of partitions and a smaller number of sink tasks, the partitions would be evenly distributed across the tasks, allowing each task to consume from one or more partitions. This is the best way to have sink connectors scale with Kafka. But when the topic has a single partition, you're limited to just a single task. Even if you set the sink to use 5 tasks, only one will ever be busy, because a partition can only be read by one task at any given moment.
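
As a sketch of the sink side of that pairing (connector class and topic name are placeholders), you would match `tasks.max` to the partition count. Note that `topic.creation.default.partitions` on the source only applies when Connect creates the topic, so an existing single-partition topic would have to be altered or re-created with more partitions:

```
{
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "topics": "SCHEMA.TABLE",
    "tasks.max": "5"
}
```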

In addition, this is precisely why the source connector took 1.5 days to synchronize so few records.

A single partition is great for very specific corner cases, but as you can see it has a significant impact on both source and sink task performance. It's one of those situations where, unless you specifically need a single partition, you should ideally have multiple partitions for concurrent reads/writes.

-cc

Vladislav P

Nov 24, 2025, 5:05:45 AM
to debezium
Thanks for the reply. Regarding the partitions, I tried to set the following settings:

"topic.creation.enable": "true",
"topic.creation.default.replication.factor": 1,
"topic.creation.default.partitions": 10,

It worked fine, although I saw a WARNING in the logs:
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. Going to request metadata update now  [org.apache.kafka.clients.producer.internals.Sender]

How critical is it?
It's just that right now we don't have the ability to scale Kafka, and it's running on 1 node.

Chris Cranford

Nov 24, 2025, 9:42:03 AM
to debe...@googlegroups.com
If you only have 1 Kafka broker node and you are seeing this in the Kafka Connect logs, then I would suggest taking a look at the broker. Either the broker is unhealthy, hasn't fully started, or there is some other problem that is confusing the broker about why it isn't the "leader" in a single-node setup.

-cc

Vladislav P

Nov 24, 2025, 1:17:46 PM
to debezium
Got it, thanks.

Now I have set up 5 partitions and set `tasks.max = 5`. Reads really are much faster, just great. But there is a caveat: PostgreSQL has a limit of 100 connections ("max_connections").
And with `tasks.max = 5` I see a WARNING: "FATAL: sorry, too many clients already".
If I run "SELECT count(*) FROM pg_stat_activity" I see 106.
Sometimes the SELECT just doesn't work at all.
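
A breakdown over the standard pg_stat_activity view, along these lines, shows which clients are holding those sessions:

```
SELECT application_name, usename, state, count(*)
FROM pg_stat_activity
GROUP BY application_name, usename, state
ORDER BY count(*) DESC;
```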

I would like to know how the number of connections is calculated. If `tasks.max = 5`, will the maximum be 5 * 32, where 32 is "connection.pool.max_size"?



Vladislav P

Nov 24, 2025, 5:13:53 PM
to debezium
Chris, could you give me some advice? I still see this in the logs and metrics sometimes:
"Committedthrough = from 1000 to 2000"
"LagFromSourceInMilliseconds = 12000000"
The OffsetScn lags significantly behind the database.
I restarted the connector several times, but the values persisted.
In the logs I also saw a transaction hanging for some reason (despite the fact that I have "log.mining.transaction.retention.ms": 1800000 set):

```
2025-11-24T17:52:06,984 WARN   Oracle||streaming  Offset SCN 6359371671095 has not changed in 25 mining session iterations. This indicates long running transaction(s) are active. Commit SCNs {1=6359376011095, 2=6359376010970}.   [io.debezium.connector.oracle.logminer.OffsetActivityMonitor]
2025-11-24T18:44:53,290 WARN   Oracle||streaming  Offset SCN 6359371671095 has not changed in 25 mining session iterations. This indicates long running transaction(s) are active. Commit SCNs {1=6359377011092, 2=6359377011075}.   [io.debezium.connector.oracle.logminer.OffsetActivityMonitor]
2025-11-24T19:15:48,108 WARN   Oracle||streaming  Offset SCN 6359371671095 has not changed in 25 mining session iterations. This indicates long running transaction(s) are active. Commit SCNs {1=6359377408661, 2=6359377408648}.   [io.debezium.connector.oracle.logminer.OffsetActivityMonitor]
```

After a few hours, it stops and the connector starts to catch up.
"Committedthrough = from 6000 to 7000"
"LagFromSourceInMilliseconds = tends to 0".

What could be the reason for this?
Do you perhaps have examples of queries that can be used to find these transactions, or to understand what they are? (Oracle)


Chris Cranford

Nov 25, 2025, 6:43:58 AM
to debe...@googlegroups.com
Hi,

In the JDBC sink, if you do not customize `connection.pool.max_size`, then yes, the maximum number of sessions the JDBC sink will open is 5 * 32, or 160 connections. So you will ideally want to reduce `connection.pool.max_size` to something more reasonable, perhaps 8, or look at scaling the PostgreSQL database so that it supports more than 100 connections. If you do the latter, be aware that you should also consult a PostgreSQL sizing guide to understand which other options, such as shared_buffers, should be increased and by how much based on the maximum connection value.
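
As a rough sketch of the first option (the 8 here is just an example value): with 5 tasks and a pool of 8 connections per task, the sink tops out at 5 * 8 = 40 connections, which leaves headroom under a 100-connection limit for other clients:

```
{
    "tasks.max": "5",
    "connection.pool.max_size": 8
}
```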

-cc

Chris Cranford

Nov 25, 2025, 9:21:49 AM
to debe...@googlegroups.com
Hi -

I believe you misunderstand how "log.mining.transaction.retention.ms" works. This isn't based on the current_time - transaction_start_time. It is instead based on the current_event_time - transaction_start_time. This distinction is important, because it allows you to safely stop the connector and resume the connector some minutes or hours later without risk that data will be discarded simply because you had to take a downtime window.

Moreover, if you look at the Commit SCN values (6359377408661 and 6359377408648), we can see that over time they're changing, which means we're processing data. If we compare them with the Offset SCN, they differ by 5,737,566 and 5,737,553 respectively. A large transaction can easily span several million SCNs depending on what the transaction does.

But what I find far more telling is the change in Commit SCN during these windows:

    Redo Thread 1 Variance 1,397,566 in ~1.5 hours
    Redo Thread 2 Variance 1,397,678 in ~1.5 hours

I don't recall what Debezium version you are currently on, but a few recent improvements that might be worthwhile to know about:

    DBZ-9664 - Fixes an issue where the low-watermark calculation was too conservative when lob.enabled=false (the default) [1] (Debezium 3.2.5.Final, 3.3.2.Final, 3.4.Beta1+)
    DBZ-9661 - Changes the LogMiner query to always exclude all `MLOG$%` tables if you use materialized view refreshes (mainly helpful if you don't set `log.mining.query.filter.mode`) [2] (Debezium 3.4.Beta1+)
    DBZ-9660 - Introduce configurable `internal.log.mining.hash.area.size` and `internal.log.mining.sort.area.size` options [3] (Debezium 3.4.Beta1+)

We just released Debezium 3.2.5.Final, and 3.3.2.Final is scheduled for later this week. If you're actively on Debezium 3.2, I'd suggest trying the update to see if the fix in DBZ-9664 helps. If you're in a position where you can update to the latest snapshot build, then I might suggest you grab the snapshot build of Debezium 3.4 [4] and use it. This will be the basis for the Beta1 build tomorrow, which includes every one of these fixes.

One general note about DBZ-9660 for everyone, and this is highly important. Oracle sessions set these to default values that are designed for most use cases. While a session can adjust them, doing so can have negative impacts if done incorrectly. Setting these too small will degrade Debezium's performance significantly; setting them too large can impact other database processes. It's generally best to find out from the DBA team what the default sizes (in bytes) are for these settings in your environment, and agree with them on a reasonable amount of extra memory the LogMiner session can consume beyond the default. These options are intended for extreme tuning cases; they should be used as a last resort, and only when you understand the impact on the source system.

Lastly, to your question: my suggestion is to use my query tool [5] based on the OffsetScn to get details about the transactions that are in progress before and around that SCN position.
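
If you want a quick look directly from SQL in the meantime, a rough sketch against the standard V$ views (not Debezium-specific; the literal below is the stuck Offset SCN from your logs) is:

```
-- Sessions owning transactions that started at or before the stuck Offset SCN
SELECT s.sid, s.serial#, s.username, s.program, s.machine,
       t.start_scn, t.start_time, t.status, t.used_urec
FROM   v$transaction t
JOIN   v$session s ON s.taddr = t.addr
WHERE  t.start_scn <= 6359371671095
ORDER  BY t.start_scn;
```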

Thanks,
-cc

[1]: https://issues.redhat.com/browse/DBZ-9664
[2]: https://issues.redhat.com/browse/DBZ-9661
[3]: https://issues.redhat.com/browse/DBZ-9660
[4]: https://central.sonatype.com/repository/maven-snapshots/io/debezium/debezium-connector-oracle/3.4.0-SNAPSHOT/debezium-connector-oracle-3.4.0-20251124.120214-216-plugin.tar.gz
[5]: https://github.com/Naros/debezium-oracle-query-tool

Vladislav P

Dec 9, 2025, 3:10:46 AM
to debezium
Hi, Chris.
Have you encountered a deadlock problem when "topic.creation.default.partitions": 5 is set on the source connector and "tasks.max" is 5 (or 1) on the sink connector?

Right now my connector is constantly failing with this error:
```
Caused by: org.postgresql.util.PSQLException: ERROR: deadlock detected
  Detail: Process 541247 waits for ShareLock on transaction 155893555; blocked by process 547114.
  Process 547114 waits for ShareLock on transaction 155893558; blocked by process 541247.
```

At the same time, there is a possibility that records with the same ID will end up in different partitions. The source connector has "message.key.columns": "SCHEMA.TABLE:ID,LASTDATE" (LASTDATE is a time column).

Chris Cranford

Dec 9, 2025, 10:00:09 AM
to debe...@googlegroups.com
Hi -

This can occur if you have two transactions that modify many rows in a single data page, or in a table where row-level locks are escalated to data-page or table-level locks.

Are you sure this is related to the `tasks.max` setting on the sink?
Have you checked to see what process 547114 is in this case, or which process created transaction 155893555?
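
A sketch of the kind of check that helps here, using only the standard pg_locks and pg_stat_activity views (it has to be run while the sessions are still blocked, since the deadlock victim disappears once PostgreSQL resolves it):

```
-- Sessions currently waiting on locks, and what they are running
SELECT l.pid, l.locktype, l.mode, l.granted,
       a.state, a.wait_event_type, a.wait_event,
       a.application_name, a.query
FROM   pg_locks l
JOIN   pg_stat_activity a ON a.pid = l.pid
WHERE  NOT l.granted
ORDER  BY l.pid;
```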

-cc

Vladislav P

Dec 9, 2025, 11:20:12 AM
to debezium
This is most likely not directly related to `tasks.max`, since I have now launched the sink connector with `tasks.max=1` and after a while it crashed with the same error. In `pg_stat_activity`, all of the activity relates to the Debezium connector.

I also looked into the Kafka topic. There are a lot of messages with the same ID where only the LASTDATE changes, and they end up in different partitions:

partition 1: ID = 121, LASTDATE: '2025-12-09 16:11:56'
partition 2: ID = 121, LASTDATE: '2025-12-09 16:16:23'


Could this be the reason? It seems it is possible to control how messages are distributed across partitions; could you point me to the settings? If I distribute messages into partitions based on the ID column alone, could that help with this problem?


Chris Cranford

Dec 9, 2025, 12:47:36 PM
to debe...@googlegroups.com
Hi, 

So, looking into this a bit more, this isn't Debezium specific but rather a side effect of how PostgreSQL operates. Debezium uses `INSERT ON CONFLICT` to handle upsert scenarios. Unfortunately, PostgreSQL executes these statements independently, and if the same ID is updated in the same batch but with different LASTDATE values, you can hit a conflict with the locks on the composite index.

One way we could work around this limitation would be to identify when the deadlock exception is raised and, from there, roll back and apply the changes in the buffer one by one in separate transactions rather than as one large batch. This would address your deadlock issue, as the inserts/updates would be applied separately and in order, while still allowing the connector to use batch operations where possible for maximum efficiency.

Could you please raise a GitHub issue at https://github.com/debezium/dbz?

In the meantime, you could set `batch.size` to 1 to eliminate this case and apply each change in its own transaction scope.

Thanks,
-cc

Vladislav P

Dec 10, 2025, 5:27:35 AM
to debezium
I tried to describe the problem: https://github.com/debezium/dbz/issues/32


Vladislav P

Dec 14, 2025, 9:57:47 AM
to debezium
Hi Chris.
Do I understand correctly that if I have multiple partitions in the Kafka topic, then I won't be able to get rid of the deadlock problem?
So even when the sink connector has "tasks.max = 1" set, it can still read from several partitions? Has anyone else encountered such a problem?

It seems the only solution for me right now is to work with a single-partition topic. I've already done this and it works, but event processing has gotten worse.


Chris Cranford

Dec 15, 2025, 12:22:28 AM
to debe...@googlegroups.com
Hi -

This should have nothing to do with the Kafka topic partitions. 

The issue is related to how the JDBC sink buffers and flushes records to the target table in batches. When a table uses a composite key and multiple rows in the same batch are in close proximity to one another in the index (e.g. the same first column value but different second column values), this happens because of how PostgreSQL executes the batched `INSERT ON CONFLICT` statements. You can theoretically face this issue regardless of the `tasks.max` value.

In your case, the likely reason a single-partition topic just works is pure coincidence. If your `batch.size` is set to 500, then as long as you never read two events from the topic with the same `ID` but different `LASTDATE` values within that 500-event window, it will just work. But as soon as events with the same `ID` but different `LASTDATE` values are flushed together within that batch window, they can trigger the composite-index deadlock. This is simply due to how PostgreSQL executes JDBC batches that use `INSERT ON CONFLICT` when not using a value list.
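
To illustrate the shape of what gets flushed (table and column names below are placeholders, and the exact SQL the sink generates may differ):

```
-- Two upserts in the same JDBC batch for the same ID but different LASTDATE values:
INSERT INTO myschema.mytable (id, lastdate, payload)
VALUES (121, '2025-12-09 16:11:56', 'a')
ON CONFLICT (id, lastdate) DO UPDATE SET payload = EXCLUDED.payload;

INSERT INTO myschema.mytable (id, lastdate, payload)
VALUES (121, '2025-12-09 16:16:23', 'b')
ON CONFLICT (id, lastdate) DO UPDATE SET payload = EXCLUDED.payload;

-- Executed as independent batch entries by concurrent flushes, the locks on
-- adjacent composite-index entries can be acquired in opposite orders, which
-- is what surfaces as the deadlock.
```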

Unfortunately, whether you use 1 partition or set `batch.size` to 1, performance will be much worse. This is why we raised the GitHub issue: to try and develop a solution that lets the connector operate at optimal performance and only fall back to flushing changes individually when absolutely necessary for these use cases.

-cc

Vladislav P

Dec 15, 2025, 3:16:43 AM
to debezium
We tried removing the "LASTDATE" field from the primary key so that records with the same ID would end up in the same partition, but I still noticed a deadlock.

Could setting "use.reduction.buffer": true help solve this problem?
Does this setting merge data from different sink connector tasks?

Chris Cranford

Dec 15, 2025, 5:19:03 AM
to debe...@googlegroups.com
Hi -

The reduction buffer is task-scoped, so no, it won't merge data from different sink tasks.

If you've reduced the composite key to a single column, do you still get the deadlock when "tasks.max" is set to "1", regardless of whether the topic has 1 or more partitions?

-cc

Vladislav P

Dec 15, 2025, 5:33:02 AM
to debezium
Hi

Yes, when I set `tasks.max: 1` the connector also crashed and I saw a deadlock in the logs.
