How to connect Debezium Server to GCP Pub/Sub

nacim redam

Nov 27, 2024, 11:27:16 PM
to debezium
Hi,

I am starting to use Debezium instead of Oracle GoldenGate (OGG), and it fits my business ecosystem perfectly.

I would like to connect my Debezium Server to GCP Pub/Sub in order to send changes from the source Oracle database to a Pub/Sub topic, but I found nothing in the configuration for setting the hostname or IP address used to communicate with Pub/Sub. Could you please explain how I can authenticate to Google Pub/Sub using Debezium?


Thanks in advance

Sincerely,
Nacim

jiri.p...@gmail.com

Nov 27, 2024, 11:32:35 PM
to debezium
Hi,

There should be no need for a host/port. The endpoint connection is handled by the Google SDK; you just need the project ID.
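Besides the endpoint, the Google SDK also resolves credentials on its own through Application Default Credentials (ADC): it checks the GOOGLE_APPLICATION_CREDENTIALS path first, then the file created by gcloud auth application-default login, and, when running on GCP, the metadata server. As a quick sanity check, here is a minimal sketch of a hypothetical stdlib-only helper (AdcFileCheck, not part of Debezium or the SDK) that mimics only the two file-based steps, assuming the Linux/macOS location of the gcloud file:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;

// Hypothetical diagnostic, NOT part of Debezium or the Google SDK. It mimics only the
// two file-based steps of Application Default Credentials (ADC) lookup.
final class AdcFileCheck {

    /** Returns the credentials file ADC would try, or empty if none is readable. */
    static Optional<Path> resolveCredentialsFile(Map<String, String> env, String userHome) {
        // Step 1: an explicit GOOGLE_APPLICATION_CREDENTIALS path takes precedence.
        // (The real lookup reports an error if this path is unusable instead of moving on.)
        String explicit = env.get("GOOGLE_APPLICATION_CREDENTIALS");
        if (explicit != null && !explicit.isEmpty()) {
            Path path = Paths.get(explicit);
            return Files.isReadable(path) ? Optional.of(path) : Optional.empty();
        }
        // Step 2: the file written by `gcloud auth application-default login`
        // (Linux/macOS location assumed).
        Path wellKnown = Paths.get(userHome, ".config", "gcloud", "application_default_credentials.json");
        return Files.isReadable(wellKnown) ? Optional.of(wellKnown) : Optional.empty();
    }

    public static void main(String[] args) {
        Optional<Path> file = resolveCredentialsFile(System.getenv(), System.getProperty("user.home"));
        System.out.println(file
                .map(p -> "Credentials file visible to this process: " + p)
                .orElse("No readable credentials file found for this process"));
    }
}
```

Run it as the same user and from the same shell session that starts Debezium Server; if it finds no readable file, the SDK will not be able to use your service account key either.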

Jiri

nacim redam

Jan 23, 2025, 6:01:30 AM
to debezium
Hi,

Thanks for your reply. I set the GOOGLE_APPLICATION_CREDENTIALS variable to my service account's JSON key file.
When I tried to run Debezium, I got a timeout:
22/01/2025 17:27:01 do1yyh1i.noe.edf.fr (1455161) (31) ERROR [io.debezium.embedded.async.AsyncEmbeddedEngine] (pool-7-thread-1) Engine has failed with : java.util.concurrent.ExecutionException: io.debezium.DebeziumException: java.util.concurrent.TimeoutException: Waited 30000 milliseconds (plus 311048 nanoseconds delay) for ListFuture@4ea47269[status=PENDING, info=[futures=[com.google.api.core.AbstractApiFuture$InternalSettableFuture@5717b972[status=PENDING]]]]
        at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
        at io.debezium.embedded.async.AsyncEmbeddedEngine.runTasksPolling(AsyncEmbeddedEngine.java:489)
        at io.debezium.embedded.async.AsyncEmbeddedEngine.run(AsyncEmbeddedEngine.java:220)
        at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:180)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: io.debezium.DebeziumException: java.util.concurrent.TimeoutException: Waited 30000 milliseconds (plus 311048 nanoseconds delay) for ListFuture@4ea47269[status=PENDING, info=[futures=[com.google.api.core.AbstractApiFuture$InternalSettableFuture@5717b972[status=PENDING]]]]
        at io.debezium.server.pubsub.PubSubChangeConsumer.handleBatch(PubSubChangeConsumer.java:274)
        at io.debezium.embedded.async.ParallelSmtAndConvertBatchProcessor.processRecords(ParallelSmtAndConvertBatchProcessor.java:56)
        at io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords.doCall(AsyncEmbeddedEngine.java:1191)
        at io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords.doCall(AsyncEmbeddedEngine.java:1172)
        at io.debezium.embedded.async.RetryingCallable.call(RetryingCallable.java:47)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
        ... 3 more
Caused by: java.util.concurrent.TimeoutException: Waited 30000 milliseconds (plus 311048 nanoseconds delay) for ListFuture@4ea47269[status=PENDING, info=[futures=[com.google.api.core.AbstractApiFuture$InternalSettableFuture@5717b972[status=PENDING]]]]
        at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:533)
        at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:118)
        at com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:75)
        at io.debezium.server.pubsub.PubSubChangeConsumer.handleBatch(PubSubChangeConsumer.java:271)
        ... 10 more

Debezium processed the transactions in the database, but no messages were pushed to my Pub/Sub topic, even after setting debezium.sink.pubsub.wait.message.delivery.timeout.ms=36000000.
For your information, publishing works fine with gcloud: the messages are pushed to my Pub/Sub topic.
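For context, the trace suggests the sink combines the publish futures of a batch into one future (the ListFuture above) and waits on it with a bounded get(). The stdlib-only analogue below is an assumption about that mechanism, not Debezium's actual code, and BatchDeliveryWait/waitForBatch are hypothetical names; it shows why an endpoint that cannot be reached can surface as a TimeoutException with futures still PENDING rather than as an explicit connection error.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Stdlib-only analogue of the wait seen in the trace (an assumption, not Debezium's code):
// one future per published message, combined, then a bounded get().
final class BatchDeliveryWait {

    /** Returns true if every delivery completed within timeoutMs, false if the wait timed out. */
    static boolean waitForBatch(List<CompletableFuture<String>> deliveries, long timeoutMs) {
        CompletableFuture<Void> all = CompletableFuture.allOf(deliveries.toArray(new CompletableFuture<?>[0]));
        try {
            all.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // at least one delivery is still PENDING
        } catch (ExecutionException e) {
            throw new IllegalStateException("a publish failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        // Deliveries acknowledged by a reachable server (simulated message IDs).
        List<CompletableFuture<String>> acked = List.of(
                CompletableFuture.completedFuture("msg-1"),
                CompletableFuture.completedFuture("msg-2"));
        // A delivery that never completes, e.g. because the endpoint cannot be reached.
        List<CompletableFuture<String>> stuck = List.of(new CompletableFuture<String>());

        System.out.println("reachable endpoint, batch delivered: " + waitForBatch(acked, 100));
        System.out.println("unreachable endpoint, batch delivered: " + waitForBatch(stuck, 100));
    }
}
```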

Could you explain what is wrong in my Debezium configuration:
quarkus.log.console.format=%d{dd/MM/yyyy HH:mm:ss} %H (%i) (%t{id}) %-5p [%c] (%t) %s%e%n
quarkus.log.min-level=TRACE
quarkus.log.level=DEBUG
quarkus.log.console.enable=true
quarkus.log.console.level=DEBUG
quarkus.log.console.json=false

quarkus.log.file.enable=true
quarkus.log.file.json=false
quarkus.log.file.path=/var/projects/datastream/debezium-server3/log/instance/debezium-server-instance.log
quarkus.log.file.format=%d{dd/MM/yyyy HH:mm:ss} %H (%i) (%t{id}) %-5p [%c] (%t) %s%e%n
quarkus.log.file.level=DEBUG
quarkus.log.file.rotation.max-file-size=100M
quarkus.log.file.rotation.max-backup-index=10
quarkus.log.file.rotation.file-suffix=_yyyyMMdd.gz
quarkus.log.file.rotation.rotate-on-boot=true

debezium.source.connector.class=io.debezium.connector.oracle.OracleConnector
debezium.source.offset.storage.file.filename=/var/projects/datastream/debezium-server3/data/instance/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=DCE***********
debezium.source.database.port=15**
debezium.source.database.user=dbzuser
debezium.source.database.dbname=ANSIBLE
debezium.source.topic.prefix=ANSIBLE
debezium.source.table.include.list=USER1.TEST

debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=prj-dma-bz3-exp-app-63858
debezium.sink.pubsub.wait.message.delivery.timeout.ms=3600000
# The address of the pubsub emulator. Only to be used in a dev or test environment with the pubsub emulator.
# debezium.sink.pubsub.address=

# persistence via local files
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.source.schema.history.internal.file.filename=/var/projects/datastream/debezium-server3/data/instance/schema_history_dbzsink1.dat
debezium.source.offset.storage.file.filename=/var/projects/datastream/debezium-server3/data/instance/offsets_dbzsink1.dat

# During a database snapshot, the connector records the table schemas only for the tables from which Debezium captures change events
debezium.source.schema.history.internal.store.only.captured.tables.ddl=true
# The connector records schema structures only for tables in the logical database and schema from which Debezium captures change events.
debezium.source.schema.history.internal.store.only.captured.databases.ddl=true

Thanks in advance.

BR,
Nacim

Nathan Smit

Jan 23, 2025, 7:09:22 AM
to debezium
Hey there, this reads to me as though Debezium can't reach your Pub/Sub topic. If your project ID is fine and you have a service account with appropriate permissions, I guess the next things to try are:
  • Make sure no outgoing ports are blocked and that no other firewall rules are preventing you from reaching Pub/Sub
  • In the run.sh file, you could add a line near the top, something like export GOOGLE_APPLICATION_CREDENTIALS="/opt/credentials/my-service-account.json", just to make sure the session you're running Debezium in has the environment variable set correctly (I'm not sure whether you're using Docker or running run.sh for Debezium Server yourself, though)
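To check the first point from the Debezium host itself, a minimal probe like the one below may help. It is a hypothetical helper (PubSubReachability, not part of Debezium) that opens a direct TCP connection to pubsub.googleapis.com:443, the default public Pub/Sub endpoint, deliberately bypassing any configured proxy; if it fails while gcloud works, egress is probably only possible through a proxy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.Socket;
import java.util.Optional;

// Hypothetical reachability probe, not part of Debezium: a direct TCP connect that
// deliberately bypasses any configured proxy (Proxy.NO_PROXY).
final class PubSubReachability {

    static final String HOST = "pubsub.googleapis.com"; // default public Pub/Sub endpoint
    static final int PORT = 443;

    /** Returns empty if the TCP connect succeeded, otherwise the reason it failed. */
    static Optional<String> connectFailure(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket(Proxy.NO_PROXY)) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return Optional.empty();
        } catch (IOException e) {
            // Covers DNS failures, refused connections and connect timeouts.
            return Optional.of(e.getClass().getSimpleName() + ": " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        Optional<String> failure = connectFailure(HOST, PORT, 5_000);
        System.out.println(failure
                .map(reason -> "Direct TCP connect to " + HOST + ":" + PORT + " failed: " + reason)
                .orElse("Direct TCP connect to " + HOST + ":" + PORT + " succeeded"));
    }
}
```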
For what it's worth, we use the configuration below for Pub/Sub. Your configuration looks fine, though.

# Sink Configuration
debezium.sink.type=pubsub
debezium.sink.pubsub.project.id={gcp-project}
debezium.sink.pubsub.ordering.enabled=false
debezium.sink.pubsub.flowcontrol.enabled=false
debezium.sink.pubsub.retry.total.timeout.ms=900000
debezium.sink.pubsub.retry.max.rpc.timeout.ms=900000
debezium.sink.pubsub.retry.initial.delay.ms=120000
debezium.sink.pubsub.retry.delay.multiplier=1.2
debezium.sink.pubsub.retry.max.delay.ms=99999999
debezium.sink.pubsub.retry.initial.rpc.timeout.ms=120000
debezium.sink.pubsub.retry.rpc.timeout.multiplier=1.2
debezium.sink.pubsub.wait.message.delivery.timeout.ms=900000
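As a rough reading of the retry settings above, the sketch below lists the nominal delays before each retry that would fit inside retry.total.timeout.ms. It assumes the sink hands these properties to the Google client's exponential retry settings (initial delay, multiplier, max delay, total timeout), and it ignores jitter and the time spent inside each RPC, so real retries will be fewer; RetrySchedule is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Back-of-the-envelope sketch (hypothetical, not Debezium code) of a nominal exponential
// backoff schedule; jitter and time spent inside each RPC are ignored.
final class RetrySchedule {

    /** Nominal delays in ms before each retry, stopping once the next one would exceed totalTimeoutMs. */
    static List<Long> delays(long initialDelayMs, double multiplier, long maxDelayMs, long totalTimeoutMs) {
        List<Long> result = new ArrayList<>();
        double delay = initialDelayMs;
        long elapsed = 0;
        while (true) {
            long next = Math.round(Math.min(delay, maxDelayMs)); // cap each delay at maxDelayMs
            if (next <= 0 || elapsed + next > totalTimeoutMs) {
                break; // no progress possible, or the total budget would be exceeded
            }
            result.add(next);
            elapsed += next;
            delay *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // retry.initial.delay.ms, retry.delay.multiplier, retry.max.delay.ms, retry.total.timeout.ms
        List<Long> schedule = delays(120_000, 1.2, 99_999_999L, 900_000);
        System.out.println(schedule.size() + " retries, delays (ms): " + schedule);
    }
}
```

With these values it yields five delays (120 s, 144 s, 172.8 s, 207.36 s and 248.832 s, about 893 s in total) before the 900 s budget is used up.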

nacim redam

Jan 27, 2025, 9:40:20 AM
to debezium
Hi,

Thanks for your reply. I have set up a CNTLM proxy on my Debezium Server VM. With it, I am able to publish to Pub/Sub using gcloud by exporting the https_proxy/http_proxy environment variables, but the same fails with Debezium and I get the [status=PENDING]]]] error. This points to a network issue, so I think Debezium does not pick up my https_proxy/http_proxy variables.

How do I put the proxy configuration in Debezium Server? Is there any option in Java to use a proxy?
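On the Java side, as far as I know, the JDK's default ProxySelector ignores the https_proxy/http_proxy environment variables that gcloud honors and reads the https.proxyHost/https.proxyPort system properties instead, and gRPC-Java (which the Pub/Sub client uses) consults that ProxySelector by default. The hypothetical ProxyCheck class below (not part of Debezium) prints what the JVM would choose for the Pub/Sub endpoint.

```java
import java.net.Proxy;
import java.net.ProxySelector;
import java.net.URI;
import java.util.List;

// Hypothetical helper, not part of Debezium: shows which proxy the JVM's default
// ProxySelector picks for the Pub/Sub endpoint, based on the https.proxyHost and
// https.proxyPort system properties (environment variables such as https_proxy are not read).
final class ProxyCheck {

    static final String PUBSUB_URI = "https://pubsub.googleapis.com:443";

    static List<Proxy> proxiesFor(String uri) {
        return ProxySelector.getDefault().select(URI.create(uri));
    }

    public static void main(String[] args) {
        System.out.println("https.proxyHost=" + System.getProperty("https.proxyHost")
                + ", https.proxyPort=" + System.getProperty("https.proxyPort"));
        System.out.println("Proxies for " + PUBSUB_URI + ": " + proxiesFor(PUBSUB_URI));
    }
}
```

Running java -Dhttps.proxyHost=127.0.0.1 -Dhttps.proxyPort=3128 ProxyCheck.java (Java 11+ single-file launch) should list an HTTP proxy instead of DIRECT; 127.0.0.1:3128 is a placeholder for wherever CNTLM listens. For Debezium Server, the same -D flags would go on its java command line, for example via JAVA_OPTS, assuming your run.sh passes that variable through (check your copy of the script).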

Thanks in advance.

BR,
Nacim
