Issues with GCP Cloud SQL/replica, Debezium and non-Avro schemas

Kevin Kliland

Dec 29, 2021, 4:14:26 AM
to debezium
Hello,

We have set up Debezium on GCP in combination with a GCP Cloud SQL Postgres database and Kafka brokers, also on GCP. The database has the following flags configured:

cloudsql.logical_decoding ON
cloudsql.enable_pglogical ON

Debezium manages to read all the events from the DB just fine, and also to read and write to the Kafka support topics (configs, offsets, status).

But there are two issues I face:

Issue #1. Not possible to read event data from a replica.
When configuring a replica, the replica is in permanent recovery mode, which does not work well with Debezium. How can this problem be overcome? We do not want to put load on the master DBs. The alternative is to copy the master DB to a "copy" DB at regular intervals and read from that instead, but then the events would no longer be in real time.

Issue #2. Not possible to support non-Avro schemas(?)
I'm not sure if I have configured everything correctly, and I don't have permission to check our Kafka logs to see what is happening on that side. However, regardless of whether I configure Debezium to use the String converter (see below) or JSON, I always see the following references to Avro schemas in the log files; I'm not sure whether this is the root cause or not:

The Kafka Connect schema name 'products-db-events-dev.Value' is not a valid Avro schema name
The Kafka Connect schema name 'products-db-events-dev.Envelope' is not a valid Avro schema name
The Kafka Connect schema name 'products-db-events-dev.Key' is not a valid Avro schema name

I understand that products-db-events-dev is converted to products_db_events_dev, which is just fine. Below is an example of the configs I use for starting Debezium and for the connector (I know there is some overlap between them, though it shouldn't have any negative impact). Since I currently haven't configured Debezium to use Avro, only the topic for handling the events, i.e. products-db-events-dev, is created on the Kafka side (and not e.g. products_db_events_dev.Value).

STARTUP GKE YAML config file
------------------------------------------------
apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: null
  labels:
    app: debezium-kafka-connect-xxxx
  name: debezium-kafka-connect-xxxx
spec:
  replicas: 1
  selector:
    matchLabels:
      app: debezium-kafka-connect-xxxx
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: debezium-kafka-connect-xxxx
    spec:
      containers:
      - image: .....debezium@xxxxxxxssss
        name: ddddddd5
        ports:
        - name: debeziumrest
          containerPort: 8083
        env:
        - name: LOG_LEVEL
          value: "INFO"
        - name: GROUP_ID
          value: "products-consumer-dev"
        - name: HOST_NAME
          value: 0.0.0.0
        - name: CONFIG_STORAGE_TOPIC
          value: "products-connect-configs-dev"
        - name: OFFSET_STORAGE_TOPIC
          value: "products-connect-offsets-dev"
        - name: STATUS_STORAGE_TOPIC
          value: "products-connect-status-dev"
        - name: BOOTSTRAP_SERVERS
          value: "bootstrap.testdddddd:9095"
        - name: CONNECT_SECURITY_PROTOCOL
          value: "SASL_SSL"
        - name: CONNECT_TOPIC
          value: "products-db-events-dev"
        - name: CONNECT_SASL_MECHANISM
          value: "xxxxxxx"
        - name: CONNECT_SASL_JAAS_CONFIG
          value: "org.apache.kafka.common.security.scram.ScramLoginModule required username=yyyyyy password=xxxx;"
        - name: CONNECT_CLIENT_CONFIG_OVERRIDE_POLICY_ALL
          value: "All"
        - name: CONNECT_CONNECTOR_CLASS
          value: "io.debezium.connector.postgresql.PostgresConnector"
        - name: CONNECT_ERRORS_LOG_INCLUDE_MESSAGES
          value: "true"
        - name: CONNECT_DATABASE_USER
          value: "bbbbbb"
        - name: CONNECT_DATABASE_DBNAME
          value: "aaaaaa"
        - name: CONNECT_TRANSFORM_REROUTE_TYPE
          value: "io.debezium.transforms.ByLogicalTableRouter"
        - name: CONNECT_SANITIZE_FIELD_NAMES
          value: "false"
        - name: CONNECT_SLOT_NAME
          value: "products"
        - name: CONNECT_TASKS_MAX
          value: "1"
        - name: CONNECT_TRANSFORMS
          value: "Reroute"
        - name: CONNECT_DATABASE_SERVER_NAME
          value: "prkkkkk"
        - name: CONNECT_TRANSFORMS_REROUTE_TOPIC_REGEX
          value: "(.*)"
        - name: CONNECT_PLUGIN_NAME
          value: "wal2json"
        - name: CONNECT_DATABASE_PORT
          value: "5432"
        - name: CONNECT_TOMBSTONES_ON_DELETE
          value: "false"
        - name: CONNECT_DATABASE_SSLMODE
          value: "disable"
        - name: CONNECT_TRANSFORMS_REROUTE_TOPIC_REPLACEMENT
          value: "products-db-events-local"
        - name: CONNECT_TABLE_INCLUDE_LIST
          value: "public.distance_matrix_element"
        - name: CONNECT_DATABASE_HOSTNAME
          value: "3xxx.yyy.xxx"
        - name: CONNECT_DATABASE_PASSWORD
          value: "xxxx"
        - name: CONNECT_VALUE_CONVERTER
          value: "org.apache.kafka.connect.storage.StringConverter"
        - name: CONNECT_ERRORS_LOG_ENABLE
          value: "true"
        - name: CONNECT_KEY_CONVERTER
          value: "org.apache.kafka.connect.storage.StringConverter"
        - name: CONNECT_SNAPSHOT_MODE
          value: "initial"
        - name: KEY_CONVERTER_SCHEMAS_ENABLE
          value: "false"
        - name: VALUE_CONVERTER_SCHEMAS_ENABLE
          value: "false"
        resources: {}
      - name: cloudsql-proxy
        image: gcr.io/cloudsql-docker/gce-proxy:1.16
        command: ["/cloud_sql_proxy", "--dir=/cloudsql",
                  "-instances=entur-1287:europe-west1:proxxxx=tcp:5432",
                  "-credential_file=/sabbbbb/wwwwwww/proxxxx.json"]
        volumeMounts:
          - name: xxxx
            mountPath: xxxx
            readOnly: true
          - name: ssl-certs
            mountPath: /etc/ssl/certs
          - name: cloudsql
            mountPath: /cloudsql
        resources:
          limits:
            cpu: 100m
            memory: 20Mi
          requests:
            cpu: 50m
            memory: 10Mi
          # [END proxy_container]
      # [START volumes]
      volumes:
      - name: xxxx
        secret:
          secretName: xxxx
      - name: ssl-certs
        hostPath:
          path: /etc/ssl/certs
      - name: cloudsql
        emptyDir: {}
      - name: oidc-auth-volume
        configMap:
          name: oidc-auth-config
      - name: adc-token
        secret:
          secretName: xxxx
      # [END volumes]
status: {}

CONNECTOR
--------------------
{
    "name": "debezium-connector-20",
    "config": {
        "security.protocol": "Abbbbb",
        "sasl.mechanism": "eeeeeeeee",
        "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=yyyy password=xxxx;",
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "errors.log.include.messages": "true",
        "database.user": "aaaa",
        "database.dbname": "xxxxx",
        "sanitize.field.names": "false",
        "slot.name": "products",
        "tasks.max": "1",
        "transforms": "Reroute",
        "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
        "transforms.Reroute.topic.regex": "(.*)",
        "transforms.Reroute.topic.replacement": "products-db-events-dev",
        "table.include.list": "public.distance_matrix_element",
        "database.server.name": "wwwwwww",
        "plugin.name": "wal2json",
        "database.port": "5432",
        "tombstones.on.delete": "false",
        "database.sslmode": "disable",
        "database.hostname": "127.0.0.1",
        "database.password": "aaaaaa",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "errors.log.enable": "true",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "snapshot.mode": "initial",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false"
    }
}

And here follows an example of data Debezium manages to read from the master DB:

"....ConnectRecord{topic='products.public.distance_matrix_element',
              kafkaPartition=null,
              key=Struct{id=12388},
              keySchema=Schema{products.public.distance_matrix_element.Key:STRUCT},
              value=Struct{
                          after=Struct{id=12388,
                                       name_of_class=DistanceMatrixElement,
                                       start_scheduled_stop_point_netex_id=NSR:StopPlace:218,
                                       start_scheduled_stop_point_version=ENT:Version:V1,
                                       end_scheduled_stop_point_netex_id=NSR:StopPlace:288,
                                       end_scheduled_stop_point_version=ENT:Version:V1,
                                       distance=Struct{scale=0,value=[B@xxxxxx},
                                       group_id=1},
                          source=Struct{version=1.7.1.Final,connector=postgresql,name=products,ts_ms=1639651494063,snapshot=true,db=products,sequence=[null,"5181237129616"],schema=public,table=distance_matrix_element,txId=70769556,ln=5181237129616},
                          op=r,
                          ts_ms=1639651494063},
              valueSchema=Schema{products.public.distance_matrix_element.Envelope:STRUCT},
              timestamp=null,
              headers=ConnectHeaders(headers=}....."

Hoping for some feedback and help :-)

Best regards,

Kevin

jiri.p...@gmail.com

Jan 4, 2022, 1:13:03 AM
to debezium
Hi,

First question - PostgreSQL does not support logical replication from a standby (replica), so it is technically impossible for Debezium to capture changes from a replica.
Second question - the normalization of schema names is performed regardless of whether the Avro converter is used or not; this is intentional. In your example everything is correct - the StringConverter is applied. But I think you should use the JsonConverter instead.
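
As a minimal sketch (assuming the rest of the connector config stays exactly as posted above), switching to the JsonConverter would only change these entries; the schemas.enable flags control whether the Connect schema is embedded in every message:

        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false"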

J.
