Capture schema changes (DDL) from source to sink


Shehzad

Jun 21, 2021, 1:26:55 AM
to debezium
Hello,

I am using the following configuration, and I want to capture DDL changes on the source and apply the same changes to the sink.
Source: Postgres
{
    "name": "postgres-source",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "database.server.name": "dbserver1",
        "schema.include": "inventory",
        "tables.include": "customers",
        "include.schema.changes":"true",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
    }
}

Sink: Postgres
{
        "name": "postgres-sink", 
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "tasks.max": "1",
            "topics": "dbserver1.inventory.customers",    
            "connection.url": "jdbc:postgresql://postgres:5432/inventory2?user=postgres&password=postgres",
            "database.include": "inventory2",
            "database.history.kafka.bootstrap.servers": "kafka:9092",
            "database.history.kafka.topic": "schema-changes.inventory",
            "transforms": "route,unwrap",
            "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
            "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
            "transforms.route.replacement": "$3",
            "transforms.unwrap.drop.tombstones": "false",
            "dialect.name": "PostgreSqlDatabaseDialect",
            "auto.create": "true",
            "auto.evolve":"true",
            "insert.mode": "upsert",
            "pk.fields": "id",
            "pk.mode": "record_key",
            "delete.enabled": "true",
            "connector.type":"sink",
            "topic.creation.enable":"false"
       }
}

I ran the following DDL on the source. It added the new column there, and I can see it:
alter table inventory.customers add column test_column varchar(255) default 'test_column';

[Attachment: ddl.png]

But I am not able to see this new column in the sink database.
[Attachment: ddl2.png]
Am I missing something? Any help will be much appreciated.

Thanks!



Chris Cranford

Jun 21, 2021, 3:04:35 AM
to debe...@googlegroups.com, Shehzad
Hi Shehzad -

While the source database reflects the DDL change immediately, you won't see it replicated until a row is added or changed in the source table. DDL statements that add a column with a default value are not always treated by the database engine as "change events" that the connector would pick up and emit.
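
As a rough illustration of that point, here is a toy Python model. It is not Debezium code, and `ToyTable` and its methods are hypothetical names invented for this sketch. It assumes the change stream only ever carries row-level events, so an ALTER TABLE on its own gives the sink nothing to consume.

```python
class ToyTable:
    """Toy stand-in for a captured table; not how Debezium is implemented."""

    def __init__(self, columns):
        self.columns = list(columns)  # current table schema
        self.events = []              # row-level change events seen downstream

    def add_column(self, name):
        # DDL changes the schema but, in this model, emits no change event.
        self.columns.append(name)

    def insert(self, row):
        # DML emits an event that carries the schema as of this moment.
        event = {"op": "c", "schema": list(self.columns), "after": dict(row)}
        self.events.append(event)
        return event


table = ToyTable(["id", "first_name"])
table.add_column("test_column")
print(len(table.events))  # 0: nothing was emitted for the DDL alone

event = table.insert({"id": 1, "first_name": "Ann", "test_column": "test_column"})
print(event["schema"])    # ['id', 'first_name', 'test_column']
```

In this model the sink first learns about `test_column` from the insert event, not from the ALTER TABLE itself.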

HTH,
Chris

s.hashmi44

Jun 21, 2021, 3:27:11 AM
to Chris Cranford, debe...@googlegroups.com
Thanks for the reply, Chris.

Can you please confirm how Debezium behaves for the following DDL changes on the source?
  • Drop an existing column
  • Change column type
  • Rename column

Will these changes be reflected in the sink only once a row is added or changed in the source table?

Thanks,
Shehzad

Chris Cranford

Jun 21, 2021, 4:10:27 AM
to s.hashmi44, debe...@googlegroups.com
Hi Shehzad -

Let me preface this by saying that this behavior is driven by the database in question, not by Debezium specifically. When you issue a DDL statement, most databases don't record an event for each pre-existing record that the change might alter; instead they simply record that a DDL change occurred and what it consisted of, and that's it. For Postgres, however, we don't support schema history, so all that gets captured are your typical data change events, i.e. insert / update / delete.

To answer your question, the behavior for Postgres is the same as for adding a column: no changes are recorded in the WAL for existing records, so Debezium is never given an event to process. Only once a record is inserted, updated, or deleted will the connector capture the change and emit an event to Kafka. The sink connector then consumes that event and applies whatever schema evolution it supports.
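
To make "whatever schema evolution it supports" more concrete, here is a small Python sketch of a sink that only adds missing columns. This is an assumption about the sink (it matches how the JDBC sink's `auto.evolve` option is documented, but check the documentation for your version). The function `evolve_add_only` is a hypothetical name, and type changes are not modeled at all.

```python
def evolve_add_only(sink_columns, record_fields):
    """Return the sink columns after one record, adding missing columns only."""
    evolved = list(sink_columns)  # copy so the caller's list is not modified
    for field in record_fields:
        if field not in evolved:
            evolved.append(field)
    return evolved


sink = ["id", "email", "test_column"]

# Source dropped test_column: the next record simply lacks it; the sink keeps it.
after_drop = evolve_add_only(sink, ["id", "email"])

# Source renamed email to contact_email: the sink sees an unknown field and adds it.
after_rename = evolve_add_only(sink, ["id", "contact_email", "test_column"])

print(after_drop)    # ['id', 'email', 'test_column']
print(after_rename)  # ['id', 'email', 'test_column', 'contact_email']
```

Under that assumption, a dropped or renamed column on the source would leave the old column in place on the sink, so those changes would have to be applied to the sink table by hand.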

HTH,
Chris