Debezium data type conversion and schema consumption


Shichao An

Jan 7, 2022, 6:55:16 PM
to debezium
Hi Debezium friends,

We are currently using the Vitess connector, and we have our own converter (one that implements org.apache.kafka.connect.storage.Converter, not io.debezium.spi.converter.CustomConverter) that converts Kafka Connect data into our own serialized protobuf event before producing to Kafka.

The Vitess connector currently converts all underlying Vitess types into basic Kafka Connect types, e.g. INT16, INT32, INT64, STRING, FLOAT64. The connector does its best to preserve values, but schema type information is lost as a result. For example, a UINT64 type in Vitess is currently converted to a String in the Kafka Connect type system. Consumers can't derive the original Vitess types from these basic Kafka Connect types alone, unless they also have access to the Vitess table schema to reconstruct each column type.

I just realized CustomConverter can actually convert each column based on the original database type and register a new Kafka Connect schema type, like "io.debezium.postgresql.type.Isbn" in the example. In this case, we would need to implement a converter for each column.typeName() known in the source database and register a new schema for it. For example, for UINT64, we would register a new schema "io.debezium.vitess.type.Uint64". This way we can ship the concrete DB schema with the events, so consumers only need minimal effort to deserialize each value (e.g. convert a String value that was originally a UINT64 Vitess type to an actual int64 value in a programming language).

I wonder whether using CustomConverter like this is the recommended/expected approach to achieve this. One caveat is that it makes each converted type storage-specific, so consumers need to understand how to interpret such types, e.g. io.debezium.vitess.type.Uint64 (they can see this is a Vitess type). Another concern is that the CustomConverter example uses RelationalColumn; I'm not sure whether it also supports MongoDB.
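To illustrate the consumer side of this idea, here is a minimal Python sketch (my own code, not a Debezium API) that maps the semantic schema names such a converter could register back to native values. The "io.debezium.vitess.type.Uint64" name is the one proposed above; the string payload is what the connector emits today for UINT64 columns.

```python
# Hypothetical consumer-side sketch: map semantic schema names a
# CustomConverter could register (e.g. "io.debezium.vitess.type.Uint64",
# a name proposed in this thread) back to native values.

CONVERTERS = {
    # UINT64 arrives as a decimal string; parse it into a Python int
    # (Python ints are arbitrary precision, so no overflow concerns).
    "io.debezium.vitess.type.Uint64": int,
}

def deserialize(schema_name, value):
    """Convert a field value using its registered semantic schema name."""
    converter = CONVERTERS.get(schema_name)
    return converter(value) if converter else value

print(deserialize("io.debezium.vitess.type.Uint64", "18446744073709551615"))
# → 18446744073709551615
```

Unknown schema names fall through unchanged, so consumers can adopt conversions one type at a time.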

Jeff Chao

Jan 7, 2022, 7:21:35 PM
to debezium
Adding some more context (I work with Shichao) -- We're taking the Debezium schema and types and carrying them over to our Protobuf as a proto Struct. Given that these payloads are a JSON representation, we're going to lose some precision for some types such as int64s. That said, eventually we'll have our payloads moved onto Protobufs (instead of JSON via proto Struct), but for now this is where we're at.

jiri.p...@gmail.com

Jan 10, 2022, 4:39:12 AM
to debezium
Hi,

One question: there is https://debezium.io/documentation/reference/1.8/connectors/mysql.html#mysql-property-column-propagate-source-type for relational databases. Maybe providing this functionality in the Vitess connector could be the solution for your problem too?
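For reference, the linked property (and its sibling `datatype.propagate.source.type`, which matches by type name rather than column name) takes a comma-separated list of regular expressions. A minimal sketch of what the analogous Vitess connector config might look like (connector name and class are illustrative, not verified against the Vitess connector docs):

```json
{
  "name": "vitess-connector",
  "config": {
    "connector.class": "io.debezium.connector.vitess.VitessConnector",
    "datatype.propagate.source.type": ".*"
  }
}
```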

J.

Shichao An

Jan 10, 2022, 4:58:22 PM
to debezium
Hi Jiri,

Thanks for the pointers. I just tried the "datatype.propagate.source.type" config in the Vitess connector and it does work. Here is an example of the output:

      {
        "type": "struct",
        "fields": [
          {
            "type": "int64",
            "optional": false,
            "parameters": {
              "__debezium.source.column.type": "INT64"
            },
            "field": "id"
          },
          {
            "type": "string",
            "optional": true,
            "parameters": {
              "__debezium.source.column.type": "VARCHAR"
            },
            "field": "varchar_col"
          },
          {
            "type": "string",
            "optional": true,
            "parameters": {
              "__debezium.source.column.type": "DECIMAL"
            },
            "field": "decimal_col"
          },
          {
            "type": "string",
            "optional": true,
            "parameters": {
              "__debezium.source.column.type": "DECIMAL"
            },
            "field": "decimal_col2"
          }
        ],
        "optional": true,
        "name": "connect_vitess_test_sharded_keyspace.test_sharded_keyspace.t1.Value",
        "field": "after"
      },

Yes, it provides exactly what I'm looking for, which leaves just one last concern: do we have something similar for the MongoDB connector?
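In case it helps others, here is a small consumer-side sketch (my own code, not a Debezium API) showing how the propagated "__debezium.source.column.type" parameter from the schema above can be used to restore native types from the string-encoded values:

```python
# Walk the field schemas and use the propagated source column type
# to restore native types on the consumer side.
import json
from decimal import Decimal

# Trimmed-down version of the schema shown above.
schema = json.loads("""
{
  "fields": [
    {"type": "int64", "parameters": {"__debezium.source.column.type": "INT64"}, "field": "id"},
    {"type": "string", "parameters": {"__debezium.source.column.type": "DECIMAL"}, "field": "decimal_col"}
  ]
}
""")

# Map the propagated source column type to a Python-side conversion.
SOURCE_TYPE_CONVERTERS = {"DECIMAL": Decimal, "INT64": int}

def restore(row):
    out = {}
    for f in schema["fields"]:
        src_type = f.get("parameters", {}).get("__debezium.source.column.type")
        conv = SOURCE_TYPE_CONVERTERS.get(src_type, lambda v: v)
        out[f["field"]] = conv(row[f["field"]])
    return out

print(restore({"id": 1004, "decimal_col": "12.34"}))
# → {'id': 1004, 'decimal_col': Decimal('12.34')}
```

Using Decimal instead of float preserves the exact value of DECIMAL columns, which is the point of carrying the source type.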

Gunnar Morling

Jan 11, 2022, 12:07:35 PM
to debezium
> do we have something similar for MongoDB connector?

No, not really. Given MongoDB's nature as a schemaless store, we don't receive this kind of metadata from the oplog / change stream. There's an SMT (https://debezium.io/documentation/reference/stable/transformations/mongodb-event-flattening.html) which recreates a typed structure based on the stringified representation we get, but it may not fully reconstruct all the potential type information there is.

--Gunnar

Shichao An

Jan 11, 2022, 8:22:24 PM
to debe...@googlegroups.com
Hi Gunnar,

Thank you for the pointers; that's really helpful. We will soon investigate the MongoDB connector for our use cases.

I just read the SMT docs on state extraction, and I want to confirm whether MongoDB's ExtractNewDocumentState SMT supports source connectors. The docs only mention the sink connector (though I think it should support source connectors as well?), while the relational databases' ExtractNewRecordState SMT docs mention both source and sink connectors.
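For concreteness, here is a minimal sketch of wiring that SMT into a connector config; the transform class name is taken from the Debezium docs, and the alias "unwrap" is just a conventional choice:

```json
{
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState"
}
```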


jiri.p...@gmail.com

Jan 11, 2022, 11:52:51 PM
to debezium
Hi,

it is both source and sink.

J.

Shichao An

Jan 12, 2022, 2:19:13 PM
to debe...@googlegroups.com
I see. Thank you, Jiri.

Yang Wu

May 23, 2022, 1:04:38 PM
to debezium
> Given MongoDB's nature of being a schemaless store, we don't receive this kind of metadata from the oplog / change stream. 

Given this schemaless nature of MongoDB, do we see any value in passing through the raw-bytes representation of the documents returned by Mongo (e.g., the Change Event document: https://www.mongodb.com/docs/manual/reference/change-events/)?

I'm thinking of an extra configurable field in the Mongo event, `raw_document`. The imagined behaviors are:
- it is controlled by an `enable_raw_document` flag on the task
- if the flag is off (default), `raw_document` will be empty and the events are populated as-is
- if the flag is on, `raw_document` will include the byte representation of the Mongo document, and the existing `before`/`patch` fields will be left empty

Some reasons one might want to enable `raw_document`:
- Mongo's documents are key-value pairs already; the patch/after fields are basically a JSON rendering of the document. Raw-document mode allows the connector to skip serdes work if we go with the immutable RawBsonDocument, which provides a getByteBuffer interface.
- The consumer of Mongo's Debezium events needs to do a similar amount of work whether they receive the raw document or the existing Debezium format.

Granted, because of the immutability of RawBsonDocument, the flag also needs to play well with field.renames and field.exclude.list, i.e., if `enable_raw_document` is set, the documents should be immutable and not allow any modifications.

Curious how people feel about this idea. Thanks.
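To make the proposal concrete, here is a tiny Python simulation of the proposed (hypothetical) flag; json.dumps stands in for the BSON-to-JSON serdes the connector does today, and the raw path passes the document bytes through untouched:

```python
# Sketch of the proposed enable_raw_document behavior (hypothetical flag,
# not an existing Debezium option). json.dumps stands in for the
# BSON-to-JSON serdes; the raw path passes bytes through untouched.
import json

def build_event(doc, doc_bytes, enable_raw_document=False):
    if enable_raw_document:
        # Raw mode: skip serdes entirely; the string field stays empty.
        return {"after": None, "raw_document": doc_bytes}
    # Default mode: today's behavior, a JSON string of the document.
    return {"after": json.dumps(doc), "raw_document": None}

doc = {"_id": 1004, "first_name": "Anne"}
raw = b"..."  # stand-in for the bytes behind RawBsonDocument.getByteBuffer()

print(build_event(doc, raw))
print(build_event(doc, raw, enable_raw_document=True))
```

The performance argument is that the raw branch never touches the document contents at all.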

jiri.p...@gmail.com

May 24, 2022, 1:44:30 AM
to debezium
Hi,

MongoDB sends the "raw" document in string format. What would be the difference if you sent it as a binary value? IMHO they should be the same, shouldn't they?

J.

Yang Wu

May 24, 2022, 2:13:53 AM
to debezium
MongoDB's document is not really in a string format.
In order to get a string representation (usually JSON), the map has to be serialized into a JSON string - this process is pretty expensive and could be avoided if the connector had `enable_raw_document`. See toString from BSON as an example.

jiri.p...@gmail.com

May 24, 2022, 4:03:49 AM
to debezium
Fair enough. The output message should have an optional field `raw` of type BINARY. I can imagine a config option named data.format as an enum with two values - JSON/RAW, where JSON is the default. If RAW is used then the `raw` field will be filled with the raw binary data.

Sounds reasonable?

Yang Wu

May 24, 2022, 11:35:45 AM
to debezium
To clarify, this is the current payload format:

"payload":{
   "after":"{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"an...@noanswer.org\"}",
   "patch":null,
   "source":{
      "version":"2.0.0.Alpha1",
      "connector":"mongodb",
      "name":"fulfillment",
      "ts_ms":1558965508000,
      "snapshot":false,
      "db":"inventory",
      "rs":"rs0",
      "collection":"customers",
      "ord":31,
      "h":1546547425148721999
   },
   "op":"c",
   "ts_ms":1558965515240
}

IIUC, if we set data.format = RAW, we can have two options:

1. raw in the existing field
"payload":{
   "after":"xxxxxxxxxx",  <----- raw byte representation of the `after` field
   "patch":null,
   "source":{
      "version":"2.0.0.Alpha1",
      "connector":"mongodb",
      "name":"fulfillment",
      "ts_ms":1558965508000,
      "snapshot":false,
      "db":"inventory",
      "rs":"rs0",
      "collection":"customers",
      "ord":31,
      "h":1546547425148721999
   },
   "op":"c",
   "ts_ms":1558965515240
}

2. raw in an extra field
"payload":{
   "after":null,
   "patch":null,
   "source":{
      "version":"2.0.0.Alpha1",
      "connector":"mongodb",
      "name":"fulfillment",
      "ts_ms":1558965508000,
      "snapshot":false,
      "db":"inventory",
      "rs":"rs0",
      "collection":"customers",
      "ord":31,
      "h":1546547425148721999
   },
   "raw": "xxxxxxxxxxxxxxxxx",  <------ raw byte representation of the document returned by MongoDB
   "op":"c",
   "ts_ms":1558965515240
}

I think both can be useful:
  • #1 keeps the existing Debezium output format while avoiding some JSON-string serdes. However, because Debezium supports field modification (e.g., excluding fields), the document needs to remain modifiable -- i.e., we cannot take advantage of the ByteBuffer interface of RawBsonDocument. Converting a bson.Document to bytes is better than a JSON string but still quite expensive, since it involves a codec.
  • #2 is a step further, requiring the consumer to work with the bytes of Mongo's Change Event document directly. I can imagine some advanced Mongo users might want to interact with an immutable version of the document and just pass it through. With this setup we can take advantage of RawBsonDocument and provide a high-performance connector for raw use cases.

IMHO, data.format = RAW fits #1 better, whereas a new `enable_raw` (or some other name) flag fits #2 better. WDYT?

Jeff Chao

May 24, 2022, 8:04:48 PM
to debezium
What would the raw representation contain? I'm asking because:
  1. I'd like to get a better understanding of the additional size overhead this option would incur.
  2. The payload we're interested in is the before/after/patch fields. It seems to me we would need raw bytes for each of these? Or would all 3 of them be encoded in the `raw` field?

Yang Wu

May 24, 2022, 8:31:12 PM
to debezium
> What would the raw representation contain?
If we look at #1, then "raw" would be the byte representation of before/after/patch - although we are choosing between string (JSON serdes) and bytes (codec), it is not fully clear to me when people would prefer bytes.

If we look at #2 (an extra new `raw` field), then raw would be the byte representation of the document Mongo returns - which includes all 3 fields.
