Debezium Kafka message not in JSON format as expected

John Psoroulas

Aug 31, 2018, 9:52:45 AM
to debezium
Hi,

we are testing the Debezium connector for PostgreSQL.
Everything seems to work fine, except that the Kafka messages are not in JSON format as expected.
Here is our test procedure:

Following the instructions described at 
we install the Debezium connector (v0.8.1.Final) 
for PostgreSQL (v9.6.10) 
as a connector of Confluent Platform (v5.0.0) 
on a CentOS Linux 7 (kernel: Linux 3.10.0-862.11.6.el7.x86_64).

We use the wal2json (https://github.com/eulerto/wal2json) output plugin for 
logical decoding. We compile and install the plugin by executing the respective 
commands found in the related Debezium Docker image file 


======================== PostgreSQL ========================
For the test we use a database named 'test'
with a table named 'test_table', 
created with the following SQL DDL commands:

* SQL commands
CREATE DATABASE test;

CREATE TABLE test_table
(
    id   char(10) NOT NULL,
    code char(10),
    PRIMARY KEY (id)
);

We verify that wal2json is working properly by obtaining the 'test_table' 
changes using the 'pg_recvlogical' PostgreSQL utility.

* 'pg_recvlogical' commands
$ pg_recvlogical -d test --slot test_slot --create-slot -P wal2json
$ pg_recvlogical -d test --slot test_slot --start -o pretty-print=1 -f -

Indeed, on the following 'insert' and 'update' SQL commands, the 'wal2json'
outputs the table changes.

* SQL commands
test=# INSERT INTO test_table (id, code) VALUES('id1', 'code1');
INSERT 0 1
test=# update test_table set code='code2' where id='id1';
UPDATE 1

* 'wal2json' output
bash-4.2$ pg_recvlogical -d test --slot test_slot --start -o pretty-print=1 -f -
{
    "change": [
        {
            "kind": "insert",
            "schema": "public",
            "table": "test_table",
            "columnnames": ["id", "code"],
            "columntypes": ["character(10)", "character(10)"],
            "columnvalues": ["id1       ", "code1     "]
        }
    ]
}
{
    "change": [
        {
            "kind": "update",
            "schema": "public",
            "table": "test_table",
            "columnnames": ["id", "code"],
            "columntypes": ["character(10)", "character(10)"],
            "columnvalues": ["id1       ", "code2     "],
            "oldkeys": {
                "keynames": ["id"],
                "keytypes": ["character(10)"],
                "keyvalues": ["id1       "]
            }
        }
    ]
}
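
To make the structure of the wal2json output easier to read, here is a minimal Python sketch that pairs each entry of "columnnames" with its "columnvalues" entry. The function name rows_from_wal2json and the one-line sample string are illustrative assumptions, not part of wal2json or Debezium; the sample is the 'insert' message shown above.

```python
import json


def rows_from_wal2json(message):
    """Convert one wal2json message (the layout shown above) into row summaries."""
    rows = []
    for change in json.loads(message)["change"]:
        # Pair each column name with its value. Fall back to empty lists
        # for changes that carry no column arrays.
        names = change.get("columnnames", [])
        values = change.get("columnvalues", [])
        rows.append({
            "kind": change["kind"],
            "table": change["schema"] + "." + change["table"],
            "row": dict(zip(names, values)),
        })
    return rows


# The 'insert' message from the pg_recvlogical output, condensed to one line.
sample = (
    '{"change": [{"kind": "insert", "schema": "public", "table": "test_table", '
    '"columnnames": ["id", "code"], '
    '"columntypes": ["character(10)", "character(10)"], '
    '"columnvalues": ["id1       ", "code1     "]}]}'
)

for summary in rows_from_wal2json(sample):
    print(summary)
```

Note that the char(10) values keep their trailing padding, which is why 'id1' appears as "id1       " in the output.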



==================== Confluent connector ===================
We start Confluent Platform and deploy the Debezium connector for the 'test'
database with the following configuration:

* Debezium connector configuration file
{
  "name": "db-test-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "wal2json",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "password",
    "database.dbname": "test",
    "database.server.name": "DB_TEST_SERVER"
  }
}


We verify that the Debezium connector is started via the following confluent CLI command:

* Output of 'db-test-connector' connector status
$ ./bin/confluent status db-test-connector
{
  "name": "db-test-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "127.0.0.1:8083"
    }
  ],
  "type": "source"
}


We verify that the Debezium connector performs the initial snapshot 
and writes the database change event messages to the respective Kafka topic 
(DB_TEST_SERVER.public.test_table) with the following commands: 

* Log segments dump for 'DB_TEST_SERVER.public.test_table' topic
$ ./bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /tmp/confluent.pMYr9Isc/kafka/data/DB_TEST_SERVER.public.test_table-0/00000000000000000000.log
Dumping /tmp/confluent.pMYr9Isc/kafka/data/DB_TEST_SERVER.public.test_table-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 CreateTime: 1535713413019 isvalid: true keysize: 16 valuesize: 90 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: id1        payload: id1       code1     0.8.1.FinalDB_TEST_SERVER��Ƥ����q����r挸��Y
offset: 1 position: 176 CreateTime: 1535713849765 isvalid: true keysize: 16 valuesize: 102 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: id1        payload: id1       id1       code2     0.8.1.FinalDB_TEST_SERVER��������*��q����u�����Y

(alternatively, the 'kafka-console-consumer' confluent consumer utility can be used)


Unfortunately, the message payload IS NOT in JSON format.
Please note that the key/value converter properties in the Confluent Connect 
configuration file (<confluent-5.0.0 path>/etc/kafka/connect-standalone.properties) 
are set to use the JSON converter.

* Confluent Connect configuration
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter



How can we get the messages in JSON format?


We would appreciate any help.

Thanks in advance!



Jiri Pechanec

Aug 31, 2018, 10:03:02 AM
to debezium
Hi,

the message is definitely in Avro format, so you are on the right track with the attempt to configure the converter. Does Confluent Platform use Connect distributed or Connect standalone? If you look into the Connect log, do you see the aforementioned properties set?

J.

John Psoroulas

Aug 31, 2018, 4:47:57 PM
to debezium
Hi Jiri,

thanks for your prompt answer.
The platform uses Connect distributed, which is also configured 
to use the JsonConverter in the file <confluent-5.0.0 path>/etc/kafka/connect-distributed.properties.

Unfortunately, after examining the logs as you proposed, the converter used for 'db-test-connector'
is still the AvroConverter.

...
[2018-08-31 22:52:41,086] INFO Set up the key converter class io.confluent.connect.avro.AvroConverter for task db-test-connector-0 using the worker config
...

Fortunately, I finally managed to apply the JsonConverter to the Debezium connector via 
its configuration file:

* Debezium connector configuration file
{
  "name": "db-test-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "wal2json",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "password",
    "database.dbname": "test",
    "database.server.name": "DB_TEST_SERVER",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}
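
The four added properties are converter settings placed in the connector's own configuration, so they apply to this connector regardless of the worker defaults. As a minimal sketch of that step, the Python below adds the same four keys to an existing connector definition. The helper name with_json_converters and the shortened base dictionary are illustrative assumptions; only the property names and values come from the configuration above.

```python
import json

JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"


def with_json_converters(connector, schemas_enable=False):
    """Return a copy of a connector definition with JSON converter overrides added."""
    config = dict(connector["config"])  # copy, so the input is left untouched
    for side in ("key", "value"):
        config[side + ".converter"] = JSON_CONVERTER
        # Connect property values are strings, hence "true"/"false".
        config[side + ".converter.schemas.enable"] = str(schemas_enable).lower()
    return {"name": connector["name"], "config": config}


# Shortened version of the original connector definition (assumption: the
# database.* properties are omitted here only to keep the example brief).
base = {
    "name": "db-test-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "wal2json",
        "database.server.name": "DB_TEST_SERVER",
    },
}

print(json.dumps(with_json_converters(base), indent=2))
```

With schemas.enable set to "false", the JsonConverter writes only the event payload, without the embedded schema section.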

The messages are formatted in JSON as expected. Here are the payloads for the aforementioned 
'insert' and 'update' SQL statements

* Payload for 'insert' statement
{
  "before": null,
  "after": {
    "id": "id1       ",
    "code": "code1     "
  },
  "source": {
    "version": "0.8.1.Final",
    "name": "DB_TEST_SERVER",
    "ts_usec": 1535746798653370000,
    "txId": 359855,
    "lsn": 550089644,
    "snapshot": false,
    "last_snapshot_record": null
  },
  "op": "c",
  "ts_ms": 1535746798689
}


* Payload for 'update' statement
{
  "before": {
    "id": "id1       ",
    "code": null
  },
  "after": {
    "id": "id1       ",
    "code": "code2     "
  },
  "source": {
    "version": "0.8.1.Final",
    "name": "DB_TEST_SERVER",
    "ts_usec": 1535746808878357000,
    "txId": 359856,
    "lsn": 550090038,
    "snapshot": false,
    "last_snapshot_record": null
  },
  "op": "u",
  "ts_ms": 1535746808898
}
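
For anyone consuming these messages, here is a minimal Python sketch of reading such a payload, assuming schemas are disabled so the message value is the event itself, as in the payloads above. The function name summarize and the OPS lookup table are illustrative assumptions; the field names come from the 'update' payload. The sketch also strips the char(10) padding from string values.

```python
import json

# Readable names for the "op" codes seen above ("c" and "u"); "d" and "r"
# are included on the assumption that deletes and snapshot reads use them.
OPS = {"c": "create", "u": "update", "d": "delete", "r": "read"}


def _strip_padding(row):
    """Remove trailing char(n) padding from string values; keep None as is."""
    if row is None:
        return None
    return {k: v.rstrip() if isinstance(v, str) else v for k, v in row.items()}


def summarize(event_json):
    """Reduce one change event to its operation, row images and transaction id."""
    event = json.loads(event_json)
    return {
        "op": OPS.get(event["op"], event["op"]),
        "before": _strip_padding(event["before"]),
        "after": _strip_padding(event["after"]),
        "txId": event["source"]["txId"],
    }


# The 'update' payload from above.
update_event = """
{
  "before": {"id": "id1       ", "code": null},
  "after": {"id": "id1       ", "code": "code2     "},
  "source": {"version": "0.8.1.Final", "name": "DB_TEST_SERVER",
             "ts_usec": 1535746808878357000, "txId": 359856,
             "lsn": 550090038, "snapshot": false,
             "last_snapshot_record": null},
  "op": "u",
  "ts_ms": 1535746808898
}
"""

print(summarize(update_event))
```

In the 'update' payload the "before" image carries only the key column, with "code" as null. This is presumably because the table uses the default replica identity, under which PostgreSQL logs only the primary key of the old row.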


thanks again!

John