Hi Chris,
Just to clarify, we are not using the signal table but the signal topic approach.
I hope I've masked everything sensitive.
{
"name": "MySQL",
"config":
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"topic.creation.default.partitions": "3",
"topic.creation.default.replication.factor": "3",
"topic.creation.default.cleanup.policy": "delete",
"
topic.creation.default.retention.ms": "94670856000",
"tasks.max": "1",
"database.history.kafka.topic": "<server_name>_SCHEMA_CHANGES",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"snapshot.fetch.size": "10000",
"delete.handling.mode": "rewrite",
"decimal.handling.mode": "double",
"database.history.skip.unparseable.ddl": "true",
"database.history.store.only.captured.tables.ddl": "true",
"errors.retry.timeout": "300000",
"database.hostname": "${aws:<secret_name>:database-host-name}",
"database.port": "${aws:<secret_name>:database-port}",
"database.user": "${aws:<secret_name>:database-user}",
"database.password": "${aws:<secret_name>:database-password}",
"
database.server.id": "<server_id>",
"
database.server.name": "<server_name>",
"database.include.list": "rcs_ua",
"database.history.kafka.bootstrap.servers": "${aws:<secret_name>:kafka-brokers}",
"value.converter.schema.registry.url": "
http://schema-registry.kafka:8081",
"key.converter.schema.registry.url": "
http://schema-registry.kafka:8081",
"table.include.list": "rcs_ua.material,rcs_ua.orders",
"snapshot.mode": "when_needed",
"signal.kafka.topic": "dataplatform-roots-dbz-signal",
"signal.kafka.bootstrap.servers": "${aws:<secret_name>:kafka-brokers}",
"read.only": "true",
"transforms": "unwrap,RemoveString",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode" : "rewrite",
"transforms.unwrap.add.fields": "op,ts_ms,source.ts_ms",
"transforms.RemoveString.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.RemoveString.regex": "(.*)_ua(.*)",
"transforms.RemoveString.replacement": "$1$2",
"database.history.consumer.security.protocol": "SASL_SSL",
"database.history.consumer.sasl.mechanism": "SCRAM-SHA-512",
"database.history.consumer.sasl.jaas.config": "${aws:<secret_name>:jaas-auth}",
"database.history.producer.security.protocol": "SASL_SSL",
"database.history.producer.sasl.mechanism": "SCRAM-SHA-512",
"database.history.producer.sasl.jaas.config": "${aws:<secret_name>:jaas-auth}"
}
}
Then we add a new table, rcs_ua.deliveries, to the
"table.include.list" parameter
and produce the following message to the signal topic:
<server_name>;{"type":"execute-snapshot","data": {"data-collections": ["rcs_ua.deliveries"], "type": "INCREMENTAL"}}
Regards