Slow processing using Debezium Server to send messages to pubsub


Nathan Smit

unread,
Nov 23, 2021, 2:51:06 AM
to debezium
I'm currently working on fetching data from Oracle using LogMiner and pushing it to Pub/Sub. Initially I was using Kafka Connect to push messages to Pub/Sub; however, we would like to have a Kafka-less environment for this.

We've set up Debezium Server and everything is working; however, messages seem to be getting processed very slowly. Attached is an image from Pub/Sub which shows that it's receiving about 300 messages, then waiting a beat, then 300 more. Overall it's taking a very long time to process rows (about 5,000 rows in 15 minutes during the snapshot). Strangely, I didn't see this behaviour when using Google's Kafka-to-Pubsub connector.

Is there something I'm getting wrong in the config that is causing this?  I've pasted our config below.

debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=projectid
debezium.sink.pubsub.ordering.enable=false
debezium.sink.pubsub.null.key=null

debezium.source.connector.class=io.debezium.connector.oracle.OracleConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.server.name=server1
debezium.source.database.hostname=hostname
debezium.source.database.port=1521
debezium.source.database.user=user
debezium.source.database.password=password

debezium.source.database.dbname=dbname
debezium.source.database.out.server.name=dbzxout
debezium.source.database.connection.adapter=logminer
debezium.source.database.tablename.case.insensitive=true
debezium.source.table.include.list=lookup.branch_test_limited,lookup.branch_test,pick.pick_test
debezium.source.database.oracle.version=11

debezium.format.key=json
debezium.format.value=json

debezium.transforms=RerouteData,RerouteSchema
debezium.transforms.RerouteSchema.type=io.debezium.transforms.ByLogicalTableRouter
debezium.transforms.RerouteSchema.topic.regex=server1
debezium.transforms.RerouteSchema.topic.replacement=server1_schema_nathan_testing
debezium.transforms.RerouteData.type=io.debezium.transforms.ByLogicalTableRouter
debezium.transforms.RerouteData.topic.regex=glodc_schema.([^.]+)\.([^.]+)
debezium.transforms.RerouteData.topic.replacement=server1_cdc_data_nathan_testing


debezium.source.database.history.file.filename=data/file_database_history
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory

quarkus.http.port=8080
quarkus.log.level=INFO
quarkus.log.console.json=true



[Attachment: unacked.png]

Nathan Smit

unread,
Nov 25, 2021, 6:29:08 AM
to debezium
After some investigation here, the issue seems to be latency on the Pub/Sub side. I had a look at Google's Kafka-to-Pubsub connector, and the way this is mitigated there is by using batching/concurrency/retries. Is this something that could be added to the PubSubChangeConsumer? I made the changes below as a test, with hardcoded values, and recompiled. The performance was significantly improved. It'd be great to have some additional config for this in Debezium to allow more fine-tuning of the sink.

        // Imports needed for these settings:
        // import java.io.IOException;
        // import org.threeten.bp.Duration;  // the gax builders use threeten-bp Durations
        // import com.google.api.gax.batching.BatchingSettings;
        // import com.google.api.gax.core.ExecutorProvider;
        // import com.google.api.gax.core.InstantiatingExecutorProvider;
        // import com.google.api.gax.retrying.RetrySettings;
        // import com.google.cloud.pubsub.v1.Publisher;
        // import io.debezium.DebeziumException;

        // Batch messages instead of publishing one at a time: flush after
        // 100 ms, 100 messages, or ~1 MB, whichever comes first.
        BatchingSettings.Builder batchingSettings = BatchingSettings.newBuilder()
                .setDelayThreshold(Duration.ofMillis(100))
                .setElementCountThreshold(100L)
                .setRequestByteThreshold(1000000L);

        // More publisher threads for higher concurrency.
        ExecutorProvider executorProvider =
                InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(40).build();

        publisherBuilder = (t) -> {
            try {
                return Publisher.newBuilder(t)
                        .setEnableMessageOrdering(orderingEnabled)
                        .setBatchingSettings(batchingSettings.build())
                        // Retry with exponential backoff instead of failing on transient latency.
                        .setRetrySettings(
                                RetrySettings.newBuilder()
                                        .setTotalTimeout(Duration.ofMillis(60000))
                                        .setMaxRpcTimeout(Duration.ofMillis(10000))
                                        .setInitialRetryDelay(Duration.ofMillis(5))
                                        .setRetryDelayMultiplier(2)
                                        .setMaxRetryDelay(Duration.ofMillis(Long.MAX_VALUE))
                                        .setInitialRpcTimeout(Duration.ofSeconds(10))
                                        .setRpcTimeoutMultiplier(2)
                                        .build())
                        .setExecutorProvider(executorProvider)
                        .build();
            }
            catch (IOException e) {
                throw new DebeziumException(e);
            }
        };
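
To make the retry behaviour concrete: with an initial delay of 5 ms and a multiplier of 2, the retry delays double until the 60 s total timeout is used up. Here's a small standalone sketch of that schedule (not Debezium or gax code — a simplification that ignores the jitter and RPC-timeout interplay the real gax library applies):

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffSketch {
    // Each retry delay is the previous one times the multiplier, capped at
    // maxDelayMs; retries stop once the cumulative delay would exceed the
    // total timeout.
    static List<Long> schedule(long initialDelayMs, double multiplier,
                               long maxDelayMs, long totalTimeoutMs) {
        List<Long> delays = new ArrayList<>();
        long delay = initialDelayMs;
        long elapsed = 0;
        while (elapsed + delay <= totalTimeoutMs) {
            delays.add(delay);
            elapsed += delay;
            delay = Math.min((long) (delay * multiplier), maxDelayMs);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Values from the snippet above: 5 ms initial, x2 multiplier,
        // effectively uncapped per-retry delay, 60 s total timeout.
        System.out.println(schedule(5, 2.0, Long.MAX_VALUE, 60_000));
    }
}
```

So a transiently slow publish gets 13 attempts (5 ms, 10 ms, 20 ms, … up to ~20 s apart) before the total timeout gives up, rather than one long blocking call.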

Chris Cranford

unread,
Nov 30, 2021, 4:03:32 AM
to debe...@googlegroups.com, Nathan Smit
Hi Nathan -

I would recommend that you raise a Jira and feel free to submit a PR with your changes so we can discuss them there.  We certainly welcome any contributions!

Thanks,
Chris

Nathan Smit

unread,
Nov 30, 2021, 4:56:59 AM
to debezium
Hey Chris,

Thanks a lot for the feedback. I actually created a Jira ticket and PR based on feedback I got in the chat.

I didn't realise initially that commits need the ticket number prefixed, so hopefully I didn't mess that up. I think in practice you might want the configuration to work such that you can choose which settings to enable/disable, but what I've done in the PR works pretty well for us. We're based in South Africa, though, so I imagine latency is a bigger problem for us than it might be for other users.

Adam Whitmore

unread,
Jan 10, 2022, 5:20:53 AM
to debezium
Nathan - would any of the additional config options you've added allow for setting the max timeout for a single transaction? I've seen "Network channel is closed" gRPC errors when processing transactions that took over an hour.

Thanks,
Adam

Nathan Smit

unread,
Jan 13, 2022, 9:29:48 AM
to debezium
Hey Adam.  To be honest, I've never seen a "Network Channel is closed" error.  I think it's worth trying out those configs and seeing if it helps, though.  In theory, you should hit one of the timeouts and then there'd be a retry.

Gunnar Morling

unread,
Jan 13, 2022, 10:27:53 AM
to debe...@googlegroups.com
Hey all,

Just wanted to let you know that Nathan's PR just got merged (thanks for that work, Nathan!) and these options will be available as of the upcoming 1.9.0.Alpha1 release (or the nightly builds until then).

Best,

--Gunnar
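
For anyone finding this thread later: once on 1.9.0.Alpha1 or newer, the batching and retry knobs from Nathan's snippet become sink properties in the Debezium Server config. The exact property names are in the Debezium Server documentation for your release; the fragment below is illustrative only, mirroring the hardcoded values used earlier in the thread:

```
# Illustrative property names -- check the Debezium Server Pub/Sub sink
# docs for your release for the exact keys.
debezium.sink.pubsub.batch.delay.threshold.ms=100
debezium.sink.pubsub.batch.element.count.threshold=100
debezium.sink.pubsub.batch.request.byte.threshold=1000000
debezium.sink.pubsub.retry.total.timeout.ms=60000
debezium.sink.pubsub.retry.initial.delay.ms=5
debezium.sink.pubsub.retry.delay.multiplier=2
debezium.sink.pubsub.retry.max.rpc.timeout.ms=10000
```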

