Generate additional data in my avro schema


Rafael Galina

Mar 11, 2021, 5:23:54 PM
to debezium

Hello,

I'm using Debezium 1.4 and need to add some information to my Avro schema for compliance reasons.
My schema (I'm using Schema Registry) is registered as below, and I need to add a "doc" key to each field with some information about the data.

My current schema:

{
  "type": "record",
  "name": "Value",
  "namespace": "dbserver.user",
  "fields": [
    {
      "name": "USER_ID",
      "type": "long"
    },
    {
      "name": "USER_NAME",
      "type": "string"
    },
    {
      "name": "USER_INSERT_DATE",
      "type": {
        "type": "string",
        "connect.version": 1,
        "connect.default": "1970-01-01T00:00:00Z",
        "connect.name": "io.debezium.time.ZonedTimestamp"
      },
      "default": "1970-01-01T00:00:00Z"
    }
  ],
  "connect.name": "dbserver.user.Value"
}

What I want:

{
  "type": "record",
  "name": "Value",
  "namespace": "dbserver.user",
  "fields": [
    {
      "name": "USER_ID",
      "type": "long",
      "doc": "Id from user table"
    },
    {
      "name": "USER_NAME",
      "type": "string",
      "doc": "[SENSITIVE] User name"
    },
    {
      "name": "USER_INSERT_DATE",
      "type": {
        "type": "string",
        "connect.version": 1,
        "connect.default": "1970-01-01T00:00:00Z",
        "connect.name": "io.debezium.time.ZonedTimestamp"
      },
      "default": "1970-01-01T00:00:00Z",
      "doc": "Insert date"
    }
  ],
  "connect.name": "dbserver.user.Value"
}

Is there any way to do that?
I've been looking into SMTs and custom converters, but nothing seems to solve it.


jiri.p...@gmail.com

Mar 11, 2021, 11:43:55 PM
to debezium
Hi,

I'd try an SMT. You would build a new schema for the incoming message and use the `org.apache.kafka.connect.data.SchemaBuilder.doc(String)` method to add documentation to it. It also means that for the USER_NAME field you cannot use the standard org.apache.kafka.connect.data.Schema.STRING_SCHEMA schema; you need to build a custom one with the doc metadata added.

J.
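Jiri's suggestion can be sketched with the Kafka Connect data API (this assumes `org.apache.kafka:connect-api` is on the classpath). The class name `DocumentedSchemaExample` is hypothetical; the field names, types and doc texts come from the question. In a real SMT this schema would be the new value schema, paired with a `Struct` holding the record's values. Whether the doc then lands on the Avro field depends on the converter.

```java
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class DocumentedSchemaExample {

    // Hypothetical helper: builds a value schema for the user table in which every
    // field schema carries a doc string (names, types and docs taken from the question).
    static Schema buildUserValueSchema() {
        return SchemaBuilder.struct()
                .name("dbserver.user.Value")
                .field("USER_ID", SchemaBuilder.int64().doc("Id from user table").build())
                // Schema.STRING_SCHEMA is a shared constant without a doc, so build a fresh string schema
                .field("USER_NAME", SchemaBuilder.string().doc("[SENSITIVE] User name").build())
                .field("USER_INSERT_DATE", SchemaBuilder.string()
                        .name("io.debezium.time.ZonedTimestamp")
                        .version(1)
                        .defaultValue("1970-01-01T00:00:00Z")
                        .doc("Insert date")
                        .build())
                .build();
    }

    public static void main(String[] args) {
        // Print each field together with the doc attached to its schema
        for (Field field : buildUserValueSchema().fields()) {
            System.out.println(field.name() + " -> " + field.schema().doc());
        }
    }
}
```

Note that the doc is set on each field's schema, not on the field itself; the Connect `Field` class has no doc of its own.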

Rafael Galina

Mar 18, 2021, 12:41:52 PM
to debezium
Hi Jiri,

I'm trying to do as you suggested by writing a custom SMT, but I'm stuck on a problem.
I'm starting to think it might have something to do with "io.debezium.transforms.ExtractNewRecordState".


My schema was registered as below (excerpt for the order_id column):

{
  "type": "record",
  "name": "Envelope",
  "namespace": "topic.order",
  "fields": [
    {
      "name": "order_id",
      "type": [
        "null",
        {
          "type": "long",
          "connect.doc": "[PK] Table Id",
          "connect.version": 1,
          "connect.name": "order_id"
        }
      ],
      "default": null
    },
I need the doc string to sit at the same level as name and type, not inside the type array, like this:
{
  "type": "record",
  "name": "Envelope",
  "namespace": "topic.order",
  "fields": [
    {
      "name": "order_id",
      "type": [
        "null",
        {
          "type": "long",
          "connect.doc": "[PK] Table Id",
          "connect.version": 1,
          "connect.name": "order_id"
        }
      ],
      "default": null,
      "doc": "[PK] Table Id"
    },


Here is some of my code:


BaseTransformation.java
    public R apply(R record) {
        initData(record.topic());

        // Ignore tombstones: returning null drops the record
        if (Objects.isNull(record.value())) {
            LOGGER.error("Tombstone message ignored. Message key: \"{}\"", record.key());
            return null;
        }

        // Swap in the documented value schema and copy the values into a matching Struct
        Schema schema = this.retrieveSchema();
        Struct toStruct = this.newStruct(record, schema);

        return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                record.keySchema(),
                record.key(), // keep the original key; passing null would drop it while keeping its schema
                schema,
                toStruct,
                record.timestamp());
    }
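`BaseTransformation` calls `this.newStruct(record, schema)`, whose body isn't shown. One way it could work, assuming the value reaching the SMT is the flat `Struct` produced by ExtractNewRecordState and that its field names and types match the documented schema, is to copy each value by name. `StructCopy` and `copyInto` are hypothetical names; inside the SMT it would be called as `copyInto((Struct) record.value(), schema)`. Requires `connect-api` on the classpath.

```java
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class StructCopy {

    // Hypothetical helper: copies values from the incoming Struct into a new Struct built on
    // the documented target schema, matching fields by name. Target fields missing from the
    // source stay null, which only passes validate() because they are optional.
    // put() validates each value and throws DataException if the types do not match.
    static Struct copyInto(Struct source, Schema target) {
        Struct result = new Struct(target);
        for (Field field : target.fields()) {
            if (source.schema().field(field.name()) != null) {
                result.put(field.name(), source.get(field.name()));
            }
        }
        result.validate();
        return result;
    }

    public static void main(String[] args) {
        Schema sourceSchema = SchemaBuilder.struct()
                .field("order_id", Schema.OPTIONAL_INT64_SCHEMA)
                .field("__op", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        Schema targetSchema = SchemaBuilder.struct()
                .field("order_id", SchemaBuilder.int64().optional().doc("[PK] Table id").build())
                .field("__op", SchemaBuilder.string().optional()
                        .doc("(c)create, (u)update, (d)delete, (r)read - snapshot").build())
                .build();

        Struct source = new Struct(sourceSchema).put("order_id", 42L).put("__op", "c");
        System.out.println(copyInto(source, targetSchema));
    }
}
```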

DataUtils.java
    public static void addField(SchemaBuilder builder, String name, Schema.Type type, String doc) {
        builder.field(name, new SchemaBuilder(type).name(name).version(1).optional().doc(doc).build());
    }



OrderTableData.java
    @Override
    public Schema buildSchema() {
        SchemaBuilder builder = SchemaBuilder.struct();
        builder.name(SCHEMA_NAME);
        builder.version(SCHEMA_VERSION);
        builder.doc(SCHEMA_DOC);

        addField(builder, "order_id", Schema.Type.INT64, "[PK] Table id");
        addField(builder, "order_date", Schema.Type.STRING, "Order date");
        addField(builder, "order_description", Schema.Type.INT64, "Order description");
        addField(builder, "__op", Schema.Type.STRING, "(c)create, (u)update, (d)delete, (r)read - snapshot");
        addField(builder, "__deleted", Schema.Type.STRING, "When true, indicates that it was a delete on database");

        return builder.build();
    }
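Comparing the `addField` helper with the registered excerpt above, every builder call appears to reappear inside the field's type: `optional()` as the `["null", …]` union, `name(name)` as `connect.name`, `version(1)` as `connect.version` and `doc(doc)` as `connect.doc`. In the Connect data API a `Field` exposes only `name()`, `index()` and `schema()`, so the doc can only live on the field's schema; whether a converter also copies it to the Avro field-level `doc` is the converter's decision. A minimal sketch that inspects where `addField` puts the doc (`FieldDocLocation` and the record name `topic.order.Envelope` are assumptions):

```java
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class FieldDocLocation {

    // Mirrors addField from the thread: name, version, optional and doc all go on the field's schema.
    static Schema orderSchema() {
        SchemaBuilder builder = SchemaBuilder.struct().name("topic.order.Envelope"); // record name assumed
        builder.field("order_id",
                new SchemaBuilder(Schema.Type.INT64).name("order_id").version(1).optional()
                        .doc("[PK] Table Id").build());
        return builder.build();
    }

    public static void main(String[] args) {
        Field field = orderSchema().field("order_id");
        // Field itself has no doc; everything set by addField is reachable only through schema()
        Schema fieldSchema = field.schema();
        System.out.println("field: " + field.name() + ", index " + field.index());
        System.out.println("schema name: " + fieldSchema.name() + ", version: " + fieldSchema.version());
        System.out.println("optional: " + fieldSchema.isOptional() + ", doc: " + fieldSchema.doc());
    }
}
```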

Could you give me some tips?

jiri.p...@gmail.com

Mar 19, 2021, 6:53:01 AM
to debezium
Hi,

I'm afraid this is the current behaviour of AvroConverter: it takes the doc and adds it to the schema as a parameter (the connect.doc entry inside the type).

I'd recommend getting in touch with the Confluent community to ask whether there is a config switch that copies that schema parameter into the Avro doc field itself.

J.

Rafael Galina

Mar 19, 2021, 12:23:30 PM
to debe...@googlegroups.com
Jiri, you were right!

I downgraded my Confluent Platform (from the current 6.1 to 5.5) and it worked!
I opened an issue with the Confluent community about it.

Thanks so much for your help!
