Problem with a custom SMT using AvroConverter

Rafael Galina

Mar 19, 2021, 11:13:10 AM
to Confluent Platform
Hello,

I'm writing a custom SMT, but I'm stuck on a problem.


My schema was registered as shown below (example for the order_id column):

{
  "type": "record",
  "name": "Envelope",
  "namespace": "topic.order",
  "fields": [
    {
      "name": "order_id",
      "type": [
        "null",
        {
          "type": "long",
          "connect.doc": "[PK] Table Id",
          "connect.version": 1,
          "connect.name": "order_id"
        }
      ],
      "default": null
    },
I need the "doc" string to sit at the same level as "name" and "type", not inside the type array, like this:
{
  "type": "record",
  "name": "Envelope",
  "namespace": "topic.order",
  "fields": [
    {
      "name": "order_id",
      "type": [
        "null",
        {
          "type": "long",
          "connect.doc": "[PK] Table Id",
          "connect.version": 1,
          "connect.name": "order_id"
        }
      ],
      "default": null,
      "doc": "[PK] Table Id"
    },
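
To make the difference between the two shapes concrete, here is a minimal sketch that copies "connect.doc" from the non-null union branch up to a field-level "doc". It uses only plain Java collections as a stand-in for the parsed schema JSON; it does not use the Avro or Kafka Connect APIs and says nothing about how AvroConverter behaves internally. The names `DocHoister`, `hoistDoc`, and `sampleField` are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: models one Avro field as nested Map/List values.
public class DocHoister {

    /**
     * Returns a shallow copy of the field with a top-level "doc" taken from
     * the first union branch that carries "connect.doc". The input map is
     * not modified; a field without such a branch is returned unchanged.
     */
    public static Map<String, Object> hoistDoc(Map<String, Object> field) {
        Map<String, Object> result = new LinkedHashMap<>(field);
        Object type = field.get("type");
        if (type instanceof List) {
            for (Object branch : (List<?>) type) {
                if (branch instanceof Map) {
                    Object doc = ((Map<?, ?>) branch).get("connect.doc");
                    if (doc != null) {
                        result.put("doc", doc);
                        break;
                    }
                }
            }
        }
        return result;
    }

    /** Builds the order_id field in the shape that was registered. */
    public static Map<String, Object> sampleField() {
        Map<String, Object> longType = new LinkedHashMap<>();
        longType.put("type", "long");
        longType.put("connect.doc", "[PK] Table Id");
        longType.put("connect.version", 1);
        longType.put("connect.name", "order_id");

        List<Object> union = new ArrayList<>();
        union.add("null");
        union.add(longType);

        Map<String, Object> field = new LinkedHashMap<>();
        field.put("name", "order_id");
        field.put("type", union);
        field.put("default", null);
        return field;
    }

    public static void main(String[] args) {
        // Prints the field map with the extra field-level "doc" entry.
        System.out.println(hoistDoc(sampleField()));
    }
}
```

The union branch keeps its "connect.doc" entry; the sketch only adds the field-level copy, which matches the desired schema shown above.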


Here is some of my code:


BaseTransformation.java
    public R apply(R record) {
        initData(record.topic());

        // Ignoring tombstones
        if (Objects.isNull(record.value())) {
            LOGGER.error("Tombstone message ignored. Message key: \"{}\"", record.key());
            return null;
        }

        Schema schema = this.retrieveSchema();
        Struct toStruct = this.newStruct(record, schema);

        return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                record.keySchema(),
                null,
                schema,
                toStruct,
                record.timestamp());
    }

DataUtils.java
    public static void addField(SchemaBuilder builder, String name, Schema.Type type, String doc) {
        builder.field(name, new SchemaBuilder(type).name(name).version(1).optional().doc(doc).build());
    }



OrderTableData.java
    @Override
    public Schema buildSchema() {
        SchemaBuilder builder = SchemaBuilder.struct();
        builder.name(SCHEMA_NAME);
        builder.version(SCHEMA_VERSION);
        builder.doc(SCHEMA_DOC);

        addField(builder, "order_id", Schema.Type.INT64,"[PK] Table id");
        addField(builder, "order_date", Schema.Type.STRING, "Order date");
        addField(builder, "order_description", Schema.Type.INT64,"Order description");
        addField(builder, "__op", Schema.Type.STRING,"(c)create, (u)update, (d)delete, (r)read - snapshot");
        addField(builder, "__deleted", Schema.Type.STRING,"When true, indicates that it was a delete on database");

        return builder.build();
    }

Could you please give me some help?

Rafael Galina

Mar 19, 2021, 12:40:26 PM
to Confluent Platform
I realized that this happens in the current version (6.1). I downgraded to 5.5 and it worked the way I wanted.
Is it possible to make it work that way in future versions?