Hello,
We have set up Debezium on GCP, in combination with a GCP Cloud SQL PostgreSQL DB and Kafka servers that are also on GCP. The DB has the following flags configured:
cloudsql.logical_decoding ON
cloudsql.enable_pglogical ON
Debezium manages to read all the events from the DB just fine, and also to read and write the Kafka support topics (configs, offsets, status).
But there are two issues I'm facing:
Issue #1. Not possible to read event data from a replica.
When I configure a replica, it stays in permanent recovery mode, which does not work with Debezium. How can I overcome this problem? I do not want to put load on the master DBs. The alternative is to copy the master DB to a "copy" DB at regular intervals and read from that instead, but then the events would no longer be in real time.
Issue #2. Not possible to support non-Avro schemas(?)
I'm not sure whether I have configured everything correctly, and I don't have permission to check our Kafka logs to see what is happening on that side. However, regardless of whether I configure Debezium to use the string converter (see below) or JSON, I always see the following references to Avro schemas in the log files; I'm not sure whether this is the root cause:
The Kafka Connect schema name 'products-db-events-dev.Value' is not a valid Avro schema name,
The Kafka Connect schema name 'products-db-events-dev.Envelope' is not a valid Avro schema name,
The Kafka Connect schema name 'products-db-events-dev.Key' is not a valid Avro schema name
I understand that products-db-events-dev is converted to products_db_events_dev, which is just fine. Below is an example of the configs I use to start Debezium and to create the connector (I know there is some overlap between them, though it shouldn't have any negative impact). Since I have not configured Debezium to use Avro, only the topic handling the events, i.e. products-db-events-dev, is created on the Kafka side (and not e.g. products_db_events_dev.Value).
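To illustrate the conversion I expect: Avro requires each dot-separated part of a schema name to match [A-Za-z_][A-Za-z0-9_]*, so the hyphens get replaced by underscores. A minimal sketch of that rule (my own illustration, not Debezium's actual code; the helper name is made up):

```python
import re

def sanitize_avro_fullname(name: str) -> str:
    """Map each dot-separated part of a name onto a valid Avro name
    ([A-Za-z_][A-Za-z0-9_]*) by replacing invalid characters with '_'.
    Hypothetical helper, just illustrating the adjustment I expect."""
    def fix(part: str) -> str:
        fixed = re.sub(r"[^A-Za-z0-9_]", "_", part)
        if fixed and fixed[0].isdigit():
            fixed = "_" + fixed  # Avro names must not start with a digit
        return fixed
    return ".".join(fix(p) for p in name.split("."))

print(sanitize_avro_fullname("products-db-events-dev.Value"))
# products_db_events_dev.Value
```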
STARTUP GKE yaml config file
------------------------------------------------
apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: null
  labels:
    app: debezium-kafka-connect-xxxx
  name: debezium-kafka-connect-xxxx
spec:
  replicas: 1
  selector:
    matchLabels:
      app: debezium-kafka-connect-xxxx
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: debezium-kafka-connect-xxxx
    spec:
      containers:
      - image: .....debezium@xxxxxxxssss
        name: ddddddd5
        ports:
        - name: debeziumrest
          containerPort: 8083
        env:
        - name: LOG_LEVEL
          value: "INFO"
        - name: GROUP_ID
          value: "products-consumer-dev"
        - name: HOST_NAME
          value: 0.0.0.0
        - name: CONFIG_STORAGE_TOPIC
          value: "products-connect-configs-dev"
        - name: OFFSET_STORAGE_TOPIC
          value: "products-connect-offsets-dev"
        - name: STATUS_STORAGE_TOPIC
          value: "products-connect-status-dev"
        - name: BOOTSTRAP_SERVERS
          value: "bootstrap.testdddddd:9095"
        - name: CONNECT_SECURITY_PROTOCOL
          value: "SASL_SSL"
        - name: CONNECT_TOPIC
          value: "products-db-events-dev"
        - name: CONNECT_SASL_MECHANISM
          value: "xxxxxxx"
        - name: CONNECT_SASL_JAAS_CONFIG
          value: "org.apache.kafka.common.security.scram.ScramLoginModule required username=yyyyyy password=xxxx;"
        - name: CONNECT_CLIENT_CONFIG_OVERRIDE_POLICY_ALL
          value: "All"
        - name: CONNECT_CONNECTOR_CLASS
          value: "io.debezium.connector.postgresql.PostgresConnector"
        - name: CONNECT_ERRORS_LOG_INCLUDE_MESSAGES
          value: "true"
        - name: CONNECT_DATABASE_USER
          value: "bbbbbb"
        - name: CONNECT_DATABASE_DBNAME
          value: "aaaaaa"
        - name: CONNECT_TRANSFORM_REROUTE_TYPE
          value: "io.debezium.transforms.ByLogicalTableRouter"
        - name: CONNECT_SANITIZE_FIELD_NAMES
          value: "false"
        - name: CONNECT_SLOT_NAME
          value: "products"
        - name: CONNECT_TASKS_MAX
          value: "1"
        - name: CONNECT_TRANSFORMS
          value: "Reroute"
        - name: CONNECT_DATABASE_SERVER_NAME
          value: "prkkkkk"
        - name: CONNECT_TRANSFORMS_REROUTE_TOPIC_REGEX
          value: "(.*)"
        - name: CONNECT_PLUGIN_NAME
          value: "wal2json"
        - name: CONNECT_DATABASE_PORT
          value: "5432"
        - name: CONNECT_TOMBSTONES_ON_DELETE
          value: "false"
        - name: CONNECT_DATABASE_SSLMODE
          value: "disable"
        - name: CONNECT_TRANSFORMS_REROUTE_TOPIC_REPLACEMENT
          value: "products-db-events-local"
        - name: CONNECT_TABLE_INCLUDE_LIST
          value: "public.distance_matrix_element"
        - name: CONNECT_DATABASE_HOSTNAME
          value: "3xxx.yyy.xxx"
        - name: CONNECT_DATABASE_PASSWORD
          value: "xxxx"
        - name: CONNECT_VALUE_CONVERTER
          value: "org.apache.kafka.connect.storage.StringConverter"
        - name: CONNECT_ERRORS_LOG_ENABLE
          value: "true"
        - name: CONNECT_KEY_CONVERTER
          value: "org.apache.kafka.connect.storage.StringConverter"
        - name: CONNECT_SNAPSHOT_MODE
          value: "initial"
        - name: KEY_CONVERTER_SCHEMAS_ENABLE
          value: "false"
        - name: VALUE_CONVERTER_SCHEMAS_ENABLE
          value: "false"
        resources: {}
      - name: cloudsql-proxy
        image: gcr.io/cloudsql-docker/gce-proxy:1.16
        command: ["/cloud_sql_proxy", "--dir=/cloudsql",
                  "-instances=entur-1287:europe-west1:proxxxx=tcp:5432",
                  "-credential_file=/sabbbbb/wwwwwww/proxxxx.json"]
        volumeMounts:
        - name: xxxx
          mountPath: xxxx
          readOnly: true
        - name: ssl-certs
          mountPath: /etc/ssl/certs
        - name: cloudsql
          mountPath: /cloudsql
        resources:
          limits:
            cpu: 100m
            memory: 20Mi
          requests:
            cpu: 50m
            memory: 10Mi
      # [END proxy_container]
      # [START volumes]
      volumes:
      - name: xxxx
        secret:
          secretName: xxxx
      - name: ssl-certs
        hostPath:
          path: /etc/ssl/certs
      - name: cloudsql
        emptyDir: {}
      - name: oidc-auth-volume
        configMap:
          name: oidc-auth-config
      - name: adc-token
        secret:
          secretName: xxxx
      # [END volumes]
status: {}
CONNECTOR config
------------------------------------------------
{
  "name": "debezium-connector-20",
  "config": {
    "security.protocol": "Abbbbb",
    "sasl.mechanism": "eeeeeeeee",
    "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=yyyy password=xxxx;",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "errors.log.include.messages": "true",
    "database.user": "aaaa",
    "database.dbname": "xxxxx",
    "sanitize.field.names": "false",
    "slot.name": "products",
    "tasks.max": "1",
    "transforms": "Reroute",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "(.*)",
    "transforms.Reroute.topic.replacement": "products-db-events-dev",
    "table.include.list": "public.distance_matrix_element",
    "database.server.name": "wwwwwww",
    "plugin.name": "wal2json",
    "database.port": "5432",
    "tombstones.on.delete": "false",
    "database.sslmode": "disable",
    "database.hostname": "127.0.0.1",
    "database.password": "aaaaaa",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "errors.log.enable": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "snapshot.mode": "initial",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}
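For completeness: when I say I also tried "JSON", I mean swapping the converter entries in the connector config above for the JSON converter, roughly like this fragment (just a sketch of that variant, with the rest of the config unchanged):

```json
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
```

The Avro schema name messages show up in the logs with that variant as well.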
And here follows an example of the data Debezium manages to read from the master DB:
"....ConnectRecord{topic='products.public.distance_matrix_element',
kafkaPartition=null,
key=Struct{id=12388},
keySchema=Schema{products.public.distance_matrix_element.Key:STRUCT},
value=Struct{
after=Struct{id=12388,
name_of_class=DistanceMatrixElement,
start_scheduled_stop_point_netex_id=NSR:StopPlace:218,
start_scheduled_stop_point_version=ENT:Version:V1,
end_scheduled_stop_point_netex_id=NSR:StopPlace:288,
end_scheduled_stop_point_version=ENT:Version:V1,
distance=Struct{scale=0,value=[B@xxxxxx},
group_id=1},
source=Struct{version=1.7.1.Final,connector=postgresql,name=products,
ts_ms=1639651494063,snapshot=true,db=products,
sequence=[null,"5181237129616"],schema=public,
table=distance_matrix_element,txId=70769556,ln=5181237129616},
op=r,ts_ms=1639651494063},
valueSchema=Schema{products.public.distance_matrix_element.Envelope:STRUCT},
timestamp=null,
headers=ConnectHeaders(headers=}....."
Hoping for some feedback and help :-)
Best regards,
Kevin