Currently I'm looking into setting up Debezium to capture changes in a Postgres database on GCP Cloud SQL. We are using Debezium mainly to stream new changes as they happen.
The challenge I'm facing is that we need multiple connectors on the same database, one connector per schema, to have a bit more redundancy when deploying to production. I've tried several configurations (including setting the publication name and the schema include list), but I keep running into the issue that the replication slots are not consumed correctly, causing the WAL files to pile up until they either massively expand the storage or crash the server.
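For context, the per-schema setup I was experimenting with looked roughly like this (a sketch; dbz_schema1 and schema1 are placeholder names, and FOR TABLES IN SCHEMA requires Postgres 15+, so on older versions the tables have to be listed explicitly):

-- One publication per schema (names are placeholders).
-- On Postgres < 15, use: CREATE PUBLICATION dbz_schema1 FOR TABLE schema1.t1, schema1.t2;
CREATE PUBLICATION dbz_schema1 FOR TABLES IN SCHEMA schema1;

-- Debezium creates the slot on startup, but it can also be created
-- manually with the pgoutput plugin so the connector picks it up:
SELECT pg_create_logical_replication_slot('dbz_schema1', 'pgoutput');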
This is the latest iteration (it still needs to be tested, but I wanted to check whether I'm on the right track before implementing it). The idea I'm currently thinking of is to have each connector consume all changes and use the Filter SMT to only pass on the messages belonging to that connector's schema:
{
  "name": "postgresql-connector-{schema}",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "HOST_HOLDER",
    "database.port": "5432",
    "database.user": "REPLICATION_USER_PLACEHOLDER",
    "database.password": "PLACEHOLDER",
    "database.dbname": "DB",
    "database.server.name": "DB",
    "plugin.name": "pgoutput",
    "snapshot.mode": "never",
    "event.processing.failure.handling.mode": "warn",
    "slot.name": "dbz_{schema}",
    "slot.drop.on.stop": "true",
    "transforms": "filter",
    "transforms.filter.type": "io.debezium.transforms.Filter",
    "transforms.filter.language": "jsr223.groovy",
    "transforms.filter.condition": "topic.startsWith('DB.{schema}')"
  }
}

A few notes on the config: "snapshot.mode" is set to "never" to prevent it from taking snapshots, as we only want to stream new changes; "event.processing.failure.handling.mode" set to "warn" logs the offset of a problematic event, skips that event, and continues processing. {schema} is a placeholder we template per connector, which also gives each connector a unique name and slot name (Kafka Connect requires unique connector names, and Postgres requires unique slot names).
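To verify while testing whether the slots are actually being consumed, I'm planning to watch the retained WAL per slot with a query along these lines (a sketch using standard Postgres catalog views; confirmed_flush_lsn is only populated for logical slots):

-- Check how far each replication slot lags behind the current WAL position.
-- A steadily growing value means the slot is not being consumed.
SELECT
    slot_name,
    active,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS retained_wal
FROM pg_replication_slots;

If retained_wal keeps growing for a slot, that connector is not acknowledging its position and the WAL pileup will recur.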