Avro Support For Event Payload Field

Kuldeep Yadav

Aug 8, 2021, 8:03:12 AM
to debezium
Hi,

I was playing around with the Debezium MySQL connector and it worked for me on the first few tries :), so hats off to the team for a wonderful tool.

But I could not find anywhere how to generate an Avro-serialized payload field from the MySQL events table (outbox transformation), so that it could then be consumed using `KafkaAvroDeserializer`.

With a little bit of googling, I found this Stack Overflow question - https://stackoverflow.com/questions/58262839/debezium-outbox-pattern-is-schema-is-fixed-with-smt-outbox-table-if-we-use-deb - which has a similar requirement, but the comment and answer there did not lead me anywhere.

Basically, what I want is for the payload field from the outbox events table to be serialized with Avro before reaching the Kafka topic, so that the consumer can deserialize it into a corresponding POJO. Right now it ends up as a plain String.
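To make the goal concrete, here is a minimal sketch of the consumer-side settings I have in mind. The deserializer class names are the standard Kafka/Confluent FQCNs; the broker and registry addresses, group id, and class name are placeholders matching my setup:

```java
import java.util.Properties;

// Sketch of consumer properties for reading Avro-serialized events.
// Broker/registry addresses and the group id are placeholders.
public class AvroConsumerConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");        // placeholder broker
        props.put("group.id", "inventory-consumer");         // hypothetical group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://schema-registry:8081");
        // With specific.avro.reader=true the deserializer returns generated
        // SpecificRecord POJOs instead of GenericRecord instances.
        props.put("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("value.deserializer"));
    }
}
```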

Connector.json

{
  "name": "inventory-svc-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "db",
    "database.port": "3306",
    "database.user": "db-user",
    "database.password": "P@swr11d",
    "database.allowPublicKeyRetrieval" : true,
    "database.server.id": "223001",
    "database.server.name": "inventory_log",
    "database.include.list": "inventory-svc",
    "table.include.list" : "inventory-svc.events",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory_log",
    "include.schema.changes": "true",
    "tombstones.on.delete": "false",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.payload.id": "aggregate_id",
    "transforms.outbox.table.field.event.timestamp" : "occurred_on",
    "transforms.outbox.route.topic.replacement": "${routedByValue}.events",
    "transforms.outbox.table.fields.additional.placement": "type:header:type",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}

Events Table

create table if not exists `events` (
    id varchar(36) primary key,
    aggregate_id varchar(20) not null,
    aggregate_type varchar(20) not null,
    type varchar(30) not null,
    payload text not null,
    occurred_on datetime not null,
    version int not null
);
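For illustration, a row in this table would be inserted like the following (all values here are made up):

```sql
-- Hypothetical example row; every value is illustrative only.
insert into `events` (id, aggregate_id, aggregate_type, type, payload, occurred_on, version)
values (
    '7b0fbd1e-0000-4000-8000-000000000001',
    'item-42',
    'inventory',
    'ItemReserved',
    '{"itemId": "item-42", "quantity": 3}',
    '2021-08-08 08:00:00',
    1
);
```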

Thanks & Regards,

Kuldeep


Kuldeep Yadav

Aug 8, 2021, 8:35:30 AM
to debezium
Hi,

Sorry, the connector JSON in the message above is wrong; below is the actual JSON:
{
  "name": "inventory-svc-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "db",
    "database.port": "3306",
    "database.user": "db-user",
    "database.password": "P@swr11d",
    "database.allowPublicKeyRetrieval" : true,
    "database.server.id": "223001",
    "database.server.name": "inventory_log",
    "database.include.list": "inventory-svc",
    "table.include.list" : "inventory-svc.events",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory_log",
    "include.schema.changes": "true",
    "tombstones.on.delete": "false",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.payload.id": "aggregate_id",
    "transforms.outbox.table.field.event.timestamp" : "occurred_on",
    "transforms.outbox.route.topic.replacement": "${routedByValue}.events",
    "transforms.outbox.table.fields.additional.placement": "type:header:type",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
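One more observation, in case it helps: since the payload column is TEXT, my understanding is that even with the AvroConverter it is still emitted as a plain Avro string. From what I can tell from the docs (please correct me if I have this wrong), the event router can expand a JSON payload into a structured field via an additional option, something like:

```
"transforms.outbox.table.expand.json.payload": "true"
```

With that set, the JSON string in payload should be converted into a typed structure that the AvroConverter can then serialize.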

Thanks & Regards,
Kuldeep

Gunnar Morling

Aug 12, 2021, 3:10:43 AM
to debezium
Hi,

Please see here for details on using Avro as the outbox event format:


Hth,

-Gunnar