Connector.json
{
"name": "inventory-svc-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "db",
"database.port": "3306",
"database.user": "db-user",
"database.password": "P@swr11d",
"database.allowPublicKeyRetrieval" : true,
"database.server.id": "223001",
"database.server.name": "inventory_log",
"database.include.list": "inventory-svc",
"table.include.list" : "inventory-svc.events",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.inventory_log",
"include.schema.changes": "true",
"tombstones.on.delete": "false",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.payload.id": "aggregate_id",
"transforms.outbox.table.field.event.timestamp" : "occurred_on",
"transforms.outbox.route.topic.replacement": "${routedByValue}.events",
"transforms.outbox.table.fields.additional.placement": "type:header:type",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}
Events Table
-- Outbox table captured by the inventory-svc connector
-- (its table.include.list is "inventory-svc.events").
CREATE TABLE IF NOT EXISTS `events` (
    -- Primary key of the event row.
    -- NOTE(review): 36 chars suggests a UUID string — confirm with the writer.
    id             VARCHAR(36) NOT NULL,
    -- Configured in the connector as the event key and payload id field.
    aggregate_id   VARCHAR(20) NOT NULL,
    -- Configured in the connector as the route-by field; the target topic
    -- is built from this value.
    aggregate_type VARCHAR(20) NOT NULL,
    -- Event type; the connector places it in a "type" header.
    type           VARCHAR(30) NOT NULL,
    -- Event body, stored as text.
    payload        TEXT        NOT NULL,
    -- Configured in the connector as the event timestamp field.
    occurred_on    DATETIME    NOT NULL,
    -- NOTE(review): meaning of version is not visible here — confirm.
    version        INT         NOT NULL,
    PRIMARY KEY (id)
);
Thanks & Regards,
Kuldeep
