Hello,
I am using the following connector configurations, and I want to capture DDL changes made on the source database and apply the same changes to the sink database.
Source: Postgres
{
"name": "postgres-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"schema.include": "inventory",
"tables.include": "customers",
"include.schema.changes":"true",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}
Sink: Postgres
{
"name": "postgres-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "dbserver1.inventory.customers",
"connection.url": "jdbc:postgresql://postgres:5432/inventory2?user=postgres&password=postgres",
"database.include": "inventory2",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"transforms": "route,unwrap",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.route.replacement": "$3",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve":"true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_key",
"delete.enabled": "true",
"connector.type":"sink",
"topic.creation.enable":"false"
}
}
I ran the following DDL statement on the source database. It added the new column there, and I can see it in the source table:
alter table inventory.customers add column test_column varchar(255) default 'test_column';

However, the new column does not appear in the sink database.

Am I missing something? Any help will be much appreciated.
Thanks!