Debezium UnwrapFromEnvelope transformation "delete.handling.mode=rewrite" property is not working


Ramu N

Aug 29, 2018, 9:34:44 AM
to debezium
Hi,

I want to use the Debezium UnwrapFromEnvelope transformation with the Debezium MySQL source connector, and I would like to receive the delete event as a record (i.e. a record with the "d" operation that contains only the old row data).
I have tried the "delete.handling.mode" property with both "none" and "rewrite", but the delete event still arrives as null.

My configuration:
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
transforms.unwrap.drop.tombstones=false
#transforms.unwrap.drop.deletes=false
transforms.unwrap.delete.handling.mode=rewrite

Logs:
[2018-08-29 16:25:10,977] DEBUG Received query command: Event{header=EventHeaderV4{timestamp=1535540110000, eventType=QUERY, serverId=1, headerLength=19, dataLength=55, nextPosition=11092, flags=8}, data=QueryEventData{threadId=133, executionTime=0, errorCode=0, database='config', sql='BEGIN'}} (io.debezium.connector.mysql.BinlogReader:606)
[2018-08-29 16:25:10,977] DEBUG Received update table metadata event: Event{header=EventHeaderV4{timestamp=1535540110000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=50, nextPosition=11161, flags=0}, data=TableMapEventData{tableId=116, database='config', table='tbl_pipeline', columnTypes=15, -4, -2, 1, 3, 1, 3, 1, columnMetadata=32, 3, 63233, 0, 0, 0, 0, 0, columnNullability={1, 2, 3, 4, 5, 6, 7}}} (io.debezium.connector.mysql.BinlogReader:673)
[2018-08-29 16:25:10,977] DEBUG Recorded 2 delete record(s) for event: Event{header=EventHeaderV4{timestamp=1535540110000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=57, nextPosition=11237, flags=0}, data=DeleteRowsEventData{tableId=116, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
    [[B@b45fbd, [B@4d3306, 1, 1, 100, 5, 3000, 1]
]}} (io.debezium.connector.mysql.BinlogReader:870)
[2018-08-29 16:25:11,001] TRACE Delete message Struct{pipeline_id=pipeline2} requested to be dropped (io.debezium.transforms.UnwrapFromEnvelope:98)

I am using the Debezium MySQL connector plugin, version 0.8.1.
Can anyone help me resolve this issue?

Thanks,
Ramu N

Jiri Pechanec

Aug 31, 2018, 12:21:47 AM
to debezium
Hi,

https://issues.jboss.org/browse/DBZ-857 has been implemented, but it has not been released yet.
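
If you want to try it before the release, snapshot jars can be built from source. A minimal sketch, assuming a standard Maven toolchain and skipping the test suites:

   git clone https://github.com/debezium/debezium.git
   cd debezium
   mvn clean install -DskipTests -DskipITs

The 0.9.0-SNAPSHOT artifacts then end up under each module's target/ directory.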

J.

Adrian

Sep 3, 2018, 11:42:59 PM
to debezium
Hi,

Is there a way to use this feature before the release?

I tried building the code from GitHub and used the following files to define the connector:
  • debezium-connector-postgres-0.9.0-SNAPSHOT.jar
  • debezium-core-0.9.0-SNAPSHOT.jar
  • protobuf-java-2.6.1.jar
  • postgresql-42.0.0.jar
Also with the following configuration:
   "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transformed.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.drop.tombstones": false

But I am still getting a null message value.

Thanks,
Adrian S

Gunnar Morling

Sep 4, 2018, 1:29:41 AM
to debezium
Hi,

Can you also try specifying

     tombstones.on.delete=false

?

By default the connector will emit the delete event (with the "d" flag) and a tombstone record. "rewrite" in the SMT leaves that tombstone untouched, so you should suppress it with that connector option.
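
Putting it together, a minimal sketch of the combined configuration in the properties style of the original post (all options shown are the connector/SMT options discussed in this thread):

   # connector level: emit only the delete event, no extra tombstone record
   tombstones.on.delete=false
   # SMT: unwrap the Debezium envelope; rewrite delete events instead of nulling them
   transforms=unwrap
   transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
   transforms.unwrap.drop.tombstones=false
   transforms.unwrap.delete.handling.mode=rewrite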

Hth,

--Gunnar

Adrian

Sep 4, 2018, 12:33:50 PM
to debezium
Hi,

It worked! Thanks!

Further to this, I'm just wondering: are the other fields supposed to be null when using the "rewrite" mode? I have attached an image for clarification.
The message at offset 0 is the create, while the message at offset 1 is the delete. I need some of the fields populated on the delete event in order to resolve it.

Am I missing something, or is this how it is supposed to work? Thanks for the help!

Best,
Adrian S

[Attachment: Screen Shot 2018-09-05 at 12.23.18 AM.png]

Gunnar Morling

Sep 5, 2018, 2:45:17 AM
to debezium
Hi,

This looks odd and it's not what I'd expect. Does the delete event contain all the fields if you don't apply the transformation at all, or is the original event already lacking them? If it is, your REPLICA IDENTITY setting may need to be altered; see the corresponding section in the Debezium PostgreSQL connector documentation.
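
If that turns out to be the case, a minimal sketch of the change (the table name is a placeholder); this makes Postgres record the full old row image for updates and deletes:

   ALTER TABLE my_table REPLICA IDENTITY FULL;
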
If the original event looks good though and this is caused by the SMT, then it's a bug which we need to fix.

Hth,

--Gunnar

mv...@griddynamics.com

Oct 5, 2018, 5:51:30 AM
to debezium
Folks,

still on this transformation, but I just want to forward the DELETE message with the UnwrapFromEnvelope transformation:

{
  ...
  "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
  ...
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081",
  "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
  ...
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.tombstones.on.delete": "false",
  "transforms.unwrap.delete.handling.mode": "none"
}

The tombstones.on.delete hint was a catch from this thread. The fact is that without any transformation the Avro message is absolutely fine and includes the DELETE as a 'd' operation, but after applying the unwrap with the parameters described here it no longer sends the DELETE messages, only INSERTs and UPDATEs.

Am I missing a concept here? I was expecting the DELETE message to be sent with a NULL value and the normal key representing a deletion, feeding a log-compacted topic in the next step.
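
As context, a log-compacted topic of the kind mentioned above is configured with cleanup.policy=compact; a sketch using the standard Kafka CLI, with placeholder topic name and ZooKeeper address:

   kafka-topics --zookeeper localhost:2181 --create --topic my-cdc-topic \
     --partitions 1 --replication-factor 1 --config cleanup.policy=compact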

Please let me know if this makes sense to you, and why the DELETEs are not posted!

thanks a lot,

Mário.




Jiri Pechanec

Oct 7, 2018, 11:58:27 PM
to debezium
Hi,

the delete messages disappear if you set both "transforms.unwrap.drop.tombstones" and "transforms.unwrap.tombstones.on.delete" to false. If you keep at least one of them set to true, then at least one message designating the delete will be created: the key is the primary id and the value is null. If your downstream pipeline is able to handle the null messages and interpret them correctly as deletes, then this is the way to go.

The problem is that, for example, the JDBC sink connector lacks support for deletes and ends up with an NPE. Hence the need to disable them.
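
To illustrate, a sketch of what a DELETE of the row with primary key id=1 then produces on the topic (key followed by value), matching the description above:

   {"id":1} null    <- the unwrapped delete event
   {"id":1} null    <- the tombstone (present only while drop.tombstones=false)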

J.

mv...@griddynamics.com

Oct 8, 2018, 7:57:45 AM
to debezium
Hey Jiri,

thanks a lot for the excellent support on this group. Unfortunately, I have to say it is not working the way we were expecting. When I use this configuration:

"key.converter":"org.apache.kafka.connect.storage.StringConverter",
     "key.converter.schema.registry.url":"http://localhost:8081",
     "value.converter":"io.confluent.connect.avro.AvroConverter",
     "value.converter.schema.registry.url":"http://localhost:8081",
     "internal.key.converter":"org.apache.kafka.connect.json.JsonConverter",        
"internal.value.converter":"org.apache.kafka.connect.json.JsonConverter",

WITHOUT any transformation, the DELETE messages are posted to the topic perfectly. But when I use the transformation:

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.tombstones.on.delete": "false",
"transforms.unwrap.delete.handling.mode": "none"

I have tried ALL combinations, including removing the deprecated tombstones.on.delete and using rewrite, none, and drop, with drop.tombstones set to both true and false, and it DOES NOT send the DELETE message, which works perfectly without the transformation.

Any help with this urgent issue would be much appreciated,


Mário.

Jiri Pechanec

Oct 8, 2018, 8:13:01 AM
to debezium
Hi,

just a question - what do you expect to receive as the delete message?

Also, could you please enable TRACE level logging for category `io.debezium.transforms.UnwrapFromEnvelope`? It might help with diagnostics.

J.

mv...@griddynamics.com

Oct 8, 2018, 8:15:45 AM
to debezium
Hi Jiri,

I was expecting something like what you described yourself:

   "If you keep at least one of them set to true, then at least one message designating the delete will be created."

I will enable TRACE ... is there a quick way to do it?

thanks,

Mário.

Jiri Pechanec

Oct 8, 2018, 8:42:04 AM
to debezium
The logging is described here - https://debezium.io/docs/configuration/logging/
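
Concretely, following those docs, the relevant line for the Kafka Connect log4j configuration (e.g. in connect-log4j.properties) would be:

   log4j.logger.io.debezium.transforms.UnwrapFromEnvelope=TRACE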

J.

mv...@griddynamics.com

Oct 8, 2018, 9:25:42 AM
to debezium
Hey Jiri,

I have turned TRACE on, but looking at the log I do not see any reason why the DELETE command is not being turned into a topic message. INSERT and UPDATE work properly and, as said before, when I remove the transformation the DELETEs are propagated properly.

Is there any other transformation that could be used to filter the CDC before/after fields?

thanks,

Mário.

Jiri Pechanec

Oct 8, 2018, 9:48:41 AM
to debezium
I think there must be something wrong in your config.
I've used this registration request:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184055",
        "database.server.name": "dbserver",
        "database.whitelist": "inventory",
        "table.whitelist": "inventory.customers",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "transforms":"unwrap",
        "transforms.unwrap.type":"io.debezium.transforms.UnwrapFromEnvelope",
        "transforms.unwrap.drop.tombstones":"false",
                  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "include.schema.changes": "false"
    }
}

I've created and removed a record in the customers table and got:
{"id":1} {"id":1,"first_name":"a","last_name":"b","email":"c"}
{"id":1} null


J.

mv...@griddynamics.com

Oct 8, 2018, 12:25:56 PM
to debezium
Hey Jiri,

you are using MySQL !!! ;) I am using MS SQL Server. They are different connectors, and I would really appreciate it if you could run the same test with the MS SQL Server connector, as the MySQL one really does work fine.

thanks once again for your support,

Mário.

mv...@griddynamics.com

Oct 9, 2018, 4:11:48 AM
to debezium
I really wish I could get some comment on this, Jiri. Anything, even "I have no experience with the MS SQL Server connector", would be enough.

Jiri Pechanec

Oct 9, 2018, 4:16:19 AM
to debezium
Hi,

I tried the same with the SQL Server example in the same location.

The registration request was:
{
    "name": "inventory-connector",
    "config": {
        "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max" : "1",
        "database.server.name" : "server1",
        "database.hostname" : "sqlserver",
        "database.port" : "1433",
        "database.user" : "sa",
        "database.password" : "Password!",
        "database.dbname" : "testDB",
        "database.history.kafka.bootstrap.servers" : "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "transforms":"unwrap",
        "transforms.unwrap.type":"io.debezium.transforms.UnwrapFromEnvelope",
        "transforms.unwrap.drop.tombstones":"false"
    }
}

And the result of an insert followed by a delete is:
"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"server1.testDB.dbo.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.testDB.dbo.customers.Value"},"payload":{"id":1005,"first_name":"a","last_name":"b","email":"c"}}

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"server1.testDB.dbo.customers.Key"},"payload":{"id":1005}} {"schema":null,"payload":null}


I had schemas enabled in this case, so it is more verbose, but you can see that the tombstone is really present and the payload is null.

J.

Jiri Pechanec

Oct 9, 2018, 4:17:09 AM
to debezium
Sorry for the late reply, I got to it only now...

J.

mv...@griddynamics.com

Oct 9, 2018, 4:20:56 AM
to debezium
Ohhh that's good news ... I am using AVRO converters ... I am going to do some tests following your recommendations here and get back to you asap.

thanks Jiri , this is being really helpful !

Mário.

mv...@griddynamics.com

Oct 9, 2018, 4:54:40 AM
to debezium
I found the problem, Jiri.

When I use:

      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schema.registry.url":"http://localhost:8081",
      "value.converter":"io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url":"http://localhost:8081",

combined with the unwrap transformation, it stops printing the DELETEs, though the payload format is just perfect for INSERTs and UPDATEs. It sounds like a BUG to me, but you can surely confirm it better than I can. Should I use a different transformation over the payload, without going through the Schema Registry? How can I get a result where the message is in Avro format and the DELETEs are present on the TOPIC?

Mário.

Jiri Pechanec

Oct 9, 2018, 9:41:03 AM
to debezium
Even in this case I cannot confirm a problem; please see https://github.com/jpechane/debezium-examples/blob/sqlserver-avro/tutorial/docker-compose-sqlserver-avro.yaml

If Avro is used for both key and value then I get
{"id":1005} {"id":1005,"first_name":"a","last_name":"b","email":"c"}
{"id":1005} null


If Avro is used for the value and String for the key, I get (key printing is disabled):
{"id":1005,"first_name":"a","last_name":"b","email":"c"}
null

So everything works as expected

J.

Mario Vera

Oct 9, 2018, 10:05:09 AM
to debe...@googlegroups.com
Jiri, it seems like we are coming to a good conclusion. In your YAML:

- DIS_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter       
- KEY_CONVERTER=org.apache.kafka.connect.storage.StringConverter       
- VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter       
- INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter       
- INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter       
- CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081       
- CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081

could you please convert this to the equivalent JSON connector descriptor? I do not see the CONNECT_* params used anywhere else, and the JSONs in your examples on GitHub do not mention the Schema Registry even though they are titled Avro, nor do they seem to have direct equivalents of the configuration parameters above.
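
For reference, a sketch of how those worker-level environment variables would map to connector-level JSON properties; the schema-registry hostname is the one from the YAML above and is a placeholder for your environment:

   "key.converter": "org.apache.kafka.connect.storage.StringConverter",
   "value.converter": "io.confluent.connect.avro.AvroConverter",
   "value.converter.schema.registry.url": "http://schema-registry:8081"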

thanks once again for your precious time.

Mário.


Mario Vera

Oct 9, 2018, 10:42:13 AM
to debe...@googlegroups.com
I replaced all underscores with periods and got an error about connect.key and connect.value, as they do not seem compatible with my version of the Debezium connector. I kept using just key.converter and value.converter for the Schema Registry URL, and everything returned to the same state as before, even though I set the dis.key.converter to Avro. In other words Jiri, it does not seem to be in sync with your results. I still do not see DELETEs unless I do NOT use the Avro Schema Registry together with the unwrap transformation.

I apologize for this, but could you please send me your connector JSON descriptor? There is no reason for it to run fine on one instance and not on another. The results you showed are exactly what I need.

thanks once again,

Mário.

mv...@griddynamics.com

Oct 9, 2018, 11:21:54 AM
to debezium
Folks,

rereading the whole thread, it seems to me that there should be a more solid recommendation from you guys on this issue. As Gunnar says, we should add:

tombstones.on.delete=false   (a deprecated option)

and then Jiri says that setting both of those options to false makes the DELETE message disappear. I am still not able to make it work with MS SQL Server using the Avro Schema Registry together with the unwrap transformation, and sending me to a Docker image implementation does not solve it. I would like a JSON descriptor with the complete configuration formally suggested by you. I am using an MS SQL Server instance on AWS and a Confluent Kafka installation (not a local environment, and no way to go for Docker on this for now), and if I do NOT use the Avro Schema Registry and instead go for the pure JSON format with the unwrap transformation, the DELETE messages do go through normally, as expected and as reported by Jiri.

please give me a hand on this as my deadline is approaching,

Mário. 

Jiri Pechanec

Oct 9, 2018, 12:52:15 PM
to debezium
Please use Docker Compose to start the test environment using the config file https://github.com/jpechane/debezium-examples/blob/sqlserver-avro/tutorial/docker-compose-sqlserver-avro.yaml

Then initialize the database using the command:
cat debezium-sqlserver-init/inventory.sql | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'

as described in https://github.com/debezium/debezium-examples/tree/master/tutorial#using-sql-server

and register the connector using registration request https://github.com/jpechane/debezium-examples/blob/sqlserver-avro/tutorial/register-sqlserver-avro-unwrap.json

J.

Mario Vera

Oct 9, 2018, 1:26:36 PM
to debe...@googlegroups.com
OK Jiri, I will use your image and test it out.

thanks once again for your help.

Mário.

mv...@griddynamics.com

Oct 10, 2018, 5:18:17 AM
to debezium
Jiri,

the problem is here: KSQL from Confluent does not print null fields, and that is why they were not showing up. After using the Kafka CLI with standard consumers I could indeed see the posts. There is nothing wrong with the Debezium connector so far. Using null as valid data is a mistaken policy, in my humble opinion, as this kind of problem can arise. I understand this was a design decision in the Kafka Streams API for log compaction, but just brainstorming here: null reads as ERROR.
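
For anyone hitting the same thing, a sketch of verifying the tombstones with the standard console consumer (the broker address is a placeholder; the topic name is the one from the earlier SQL Server example):

   kafka-console-consumer --bootstrap-server localhost:9092 \
     --topic server1.testDB.dbo.customers --from-beginning \
     --property print.key=true

Deleted rows then show up as a key followed by a null value.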

thanks once again for your support. We shall stick with Debezium and should be able to share some performance test results pretty soon,

Mário.

Jiri Pechanec

Oct 11, 2018, 12:29:58 AM
to debezium
Thanks for the follow-up! May I ask if you could turn this finding into a FAQ entry (https://debezium.io/docs/faq/) so it is saved for future reference? It would be great if you could prepare a docs PR in https://github.com/debezium/debezium.github.io

Have a nice day

J.

Mario Vera

Oct 11, 2018, 4:31:25 AM
to debe...@googlegroups.com
Yes Jiri, I can do it!

I will contribute based on what I really have experience with, which is the MS SQL Server connector, and mention Confluent KSQL in a note, since it, like some other clients, might not deal well with nulls.

let me know,

Mário.

P.S. The FAQ seems to be kind of high-level... not sure.



mv...@griddynamics.com

Oct 11, 2018, 9:41:00 AM
to debezium
DONE with the NOTE for KSQL.