Hi,
we are testing the Debezium connector for PostgreSQL.
Everything seems to work fine except for the Kafka message format, which is not JSON as we expected.
Here is our test procedure:
Following the instructions described at
we install the Debezium connector (v0.8.1.Final)
for PostgreSQL (v9.6.10)
as a connector of the Confluent Platform (v5.0.0)
on CentOS Linux 7 (kernel: Linux 3.10.0-862.11.6.el7.x86_64).
For logical decoding we use the wal2json plugin. We compile and install the plugin by executing the respective
commands found in the related Debezium Docker image file.
======================== PostgreSQL ========================
For the test we use a database named 'test'
with a table named 'test_table',
created with the following DDL SQL commands:
* SQL commands
CREATE DATABASE test;
-- in psql, connect to the new database before creating the table
\c test
CREATE TABLE test_table (
  id char(10) NOT NULL,
  code char(10),
  PRIMARY KEY (id)
);
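Both columns are declared as char(10), so PostgreSQL stores every value blank-padded to 10 characters. That is why the change events further down carry trailing spaces after 'id1', 'code1' and 'code2'. The following is a minimal Python sketch of the documented char(n) rule; the helper name pad_char is hypothetical and only illustrates the behaviour:

```python
def pad_char(value: str, n: int = 10) -> str:
    """Mimic how PostgreSQL stores a value in a char(n) column (hypothetical helper)."""
    if len(value) > n:
        # Over-long input is an error unless the excess characters are all spaces,
        # in which case PostgreSQL truncates the string to n characters.
        if value[n:].strip(" "):
            raise ValueError(f"value too long for type character({n})")
        return value[:n]
    # Shorter values are padded with spaces up to n characters.
    return value.ljust(n)


print(len(pad_char("id1")))    # 10
print(len(pad_char("code1")))  # 10
```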
We verify that wal2json is working properly by reading the 'test_table'
changes with the 'pg_recvlogical' PostgreSQL utility.
* 'pg_recvlogical' commands
$ pg_recvlogical -d test --slot test_slot --create-slot -P wal2json
$ pg_recvlogical -d test --slot test_slot --start -o pretty-print=1 -f -
Indeed, for the following 'insert' and 'update' SQL commands, wal2json
outputs the table changes:
* SQL commands
test=# INSERT INTO test_table (id, code) VALUES('id1', 'code1');
INSERT 0 1
test=# update test_table set code='code2' where id='id1';
UPDATE 1
* 'wal2json' output
bash-4.2$ pg_recvlogical -d test --slot test_slot --start -o pretty-print=1 -f -
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "test_table",
"columnnames": ["id", "code"],
"columntypes": ["character(10)", "character(10)"],
"columnvalues": ["id1 ", "code1 "]
}
]
}
{
"change": [
{
"kind": "update",
"schema": "public",
"table": "test_table",
"columnnames": ["id", "code"],
"columntypes": ["character(10)", "character(10)"],
"columnvalues": ["id1 ", "code2 "],
"oldkeys": {
"keynames": ["id"],
"keytypes": ["character(10)"],
"keyvalues": ["id1 "]
}
}
]
}
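With pretty-print=1, pg_recvlogical writes one JSON document per transaction, concatenated on stdout. As a sketch (assuming the output shape shown above, with "change", "kind", "schema", "table", "columnnames" and "columnvalues" keys), the stream can be split with the standard library's json.JSONDecoder.raw_decode and flattened into rows; iter_documents and changes are hypothetical helper names:

```python
import json
from typing import Iterator, Tuple


def iter_documents(text: str) -> Iterator[dict]:
    """Yield each top-level JSON object from concatenated pg_recvlogical output."""
    decoder = json.JSONDecoder()
    pos = 0
    while True:
        # raw_decode does not skip leading whitespace, so skip it manually.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            return
        doc, pos = decoder.raw_decode(text, pos)
        yield doc


def changes(text: str) -> Iterator[Tuple[str, str, dict]]:
    """Flatten wal2json transactions into (kind, schema.table, row) tuples."""
    for doc in iter_documents(text):
        for change in doc.get("change", []):
            # Deletes carry only "oldkeys", hence the empty-list defaults.
            row = dict(zip(change.get("columnnames", []),
                           change.get("columnvalues", [])))
            yield change["kind"], f'{change["schema"]}.{change["table"]}', row


sample = (
    '{"change": [{"kind": "insert", "schema": "public", "table": "test_table",'
    ' "columnnames": ["id", "code"],'
    ' "columntypes": ["character(10)", "character(10)"],'
    ' "columnvalues": ["id1", "code1"]}]}\n'
    '{"change": []}\n'
)
for kind, table, row in changes(sample):
    print(kind, table, row)  # insert public.test_table {'id': 'id1', 'code': 'code1'}
```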
==================== Confluent connector ===================
We start the Confluent Platform and deploy the Debezium connector for the 'test'
database with the following configuration:
* Debezium connector configuration file
{
"name": "db-test-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"plugin.name": "wal2json",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "password",
"database.dbname": "test",
"database.server.name": "DB_TEST_SERVER"
}
}
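The post does not say how this configuration file was submitted. One standard route is the Kafka Connect REST API, which accepts exactly this {"name": ..., "config": {...}} document via POST /connectors. The sketch below only builds the request with the standard library; the worker URL http://localhost:8083 is an assumption taken from the worker_id in the status output, and build_create_request is a hypothetical helper:

```python
import json
import urllib.request

# Assumption: Connect worker address, taken from "worker_id": "127.0.0.1:8083".
CONNECT_URL = "http://localhost:8083"

connector = {
    "name": "db-test-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "plugin.name": "wal2json",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname": "test",
        "database.server.name": "DB_TEST_SERVER",
    },
}


def build_create_request(base_url: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a Kafka Connect POST /connectors request."""
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


req = build_create_request(CONNECT_URL, connector)
# urllib.request.urlopen(req) would submit it to the running worker.
```

Building the request separately from sending it keeps the sketch usable without a live worker.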
We verify that the Debezium connector has started with the following Confluent CLI command:
* Output of 'db-test-connector' connector status
$ ./bin/confluent status db-test-connector
{
"name": "db-test-connector",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "127.0.0.1:8083"
}
],
"type": "source"
}
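The status document above appears to have the same shape as the Kafka Connect REST response for GET /connectors/<name>/status. Assuming that shape, a scripted version of this check could look like the following; all_running is a hypothetical helper:

```python
import json


def all_running(status: dict) -> bool:
    """True if the connector and every one of its tasks report state RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # A connector with no tasks is not producing anything, so treat it as not running.
    return bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)


status_json = """{
  "name": "db-test-connector",
  "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
  "tasks": [{"state": "RUNNING", "id": 0, "worker_id": "127.0.0.1:8083"}],
  "type": "source"
}"""
print(all_running(json.loads(status_json)))  # True
```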
We verify that the Debezium connector takes the initial snapshot
and writes the database change event messages to the respective Kafka topic
(DB_TEST_SERVER.public.test_table) with the following command:
* Log segments dump for 'DB_TEST_SERVER.public.test_table' topic
$ ./bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /tmp/confluent.pMYr9Isc/kafka/data/DB_TEST_SERVER.public.test_table-0/00000000000000000000.log
Dumping /tmp/confluent.pMYr9Isc/kafka/data/DB_TEST_SERVER.public.test_table-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 CreateTime: 1535713413019 isvalid: true keysize: 16 valuesize: 90 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: id1 payload: id1 code1 0.8.1.FinalDB_TEST_SERVER��Ƥ����q����r挸��Y
offset: 1 position: 176 CreateTime: 1535713849765 isvalid: true keysize: 16 valuesize: 102 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: id1 payload: id1 id1 code2 0.8.1.FinalDB_TEST_SERVER��������*��q����u�����Y
(alternatively, the 'kafka-console-consumer' utility shipped with the Confluent Platform can be used)
Unfortunately, the message payload is NOT in JSON format.
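The dumped values do not start with '{', so they are some binary encoding. One hypothesis worth testing (an assumption, not confirmed by the dump) is that they were written by Confluent's Avro converter. That converter frames each message with a zero magic byte followed by a 4-byte big-endian Schema Registry id. Given the raw value bytes from any consumer, the hypothetical helper describe_payload below distinguishes the two cases:

```python
import json
import struct


def describe_payload(raw: bytes) -> str:
    """Classify a raw Kafka message value as JSON, Confluent-framed Avro, or unknown."""
    try:
        json.loads(raw.decode("utf-8"))
        return "json"
    except ValueError:  # covers both UnicodeDecodeError and JSONDecodeError
        pass
    # Confluent wire format: magic byte 0x00, 4-byte big-endian schema id, Avro data.
    if len(raw) >= 5 and raw[0] == 0:
        (schema_id,) = struct.unpack(">I", raw[1:5])
        return f"confluent-avro (schema id {schema_id})"
    return "unknown"


print(describe_payload(b'{"id": "id1"}'))                 # json
print(describe_payload(b"\x00\x00\x00\x00\x01\x06id1"))  # confluent-avro (schema id 1)
```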
Please note that the key/value converter properties in the Confluent Connect
configuration file (<confluent-5.0.0 path>/etc/kafka/connect-standalone.properties)
are set to use the JSON converter:
* Confluent Connect configuration
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
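These settings only take effect if the running worker actually reads connect-standalone.properties. We assume, but have not verified, that `confluent start` in 5.0.0 launches a distributed worker configured from etc/schema-registry/connect-avro-distributed.properties, which may use the Avro converter. Independently of which file the worker reads, Kafka Connect accepts `key.converter` and `value.converter` (and converter-prefixed options such as `value.converter.schemas.enable`) inside an individual connector's configuration, where they override the worker defaults. A sketch that adds such overrides to the connector definition, using the hypothetical helper with_json_converters:

```python
import copy
import json

JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"


def with_json_converters(connector: dict, schemas_enable: bool = True) -> dict:
    """Return a copy of a connector definition with per-connector JSON converters."""
    updated = copy.deepcopy(connector)  # leave the caller's dict untouched
    cfg = updated["config"]
    flag = "true" if schemas_enable else "false"
    for side in ("key", "value"):
        cfg[f"{side}.converter"] = JSON_CONVERTER
        # With schemas.enable=false the JsonConverter omits the
        # {"schema": ..., "payload": ...} envelope and writes only the payload.
        cfg[f"{side}.converter.schemas.enable"] = flag
    return updated


base = {
    "name": "db-test-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "wal2json",
        "database.server.name": "DB_TEST_SERVER",
    },
}
print(json.dumps(with_json_converters(base)["config"], indent=2))
```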
How can we get the messages in JSON format?
We would appreciate any help.
Thanks in advance!