Debezium Signal topic for AWS MySQL read replica


Jorge Aguirre

Jun 9, 2022, 9:04:22 AM
to debezium
Hi all,

As per this blog, we should be able to create incremental snapshots for new tables added to the "table.include.list" parameter on a MySQL read replica (ours runs in AWS) by producing messages to a signal topic. We tested it with several configurations, but without any positive result.

Our Debezium version is 1.8
gtid_mode and enforce_gtid_consistency are set to ON
The signal topic has 1 partition and cleanup.policy=delete

We can see in the CloudWatch logs that the connector is subscribed to the signal topic, but it always says "no incremental snapshot in progress", and when we produce to the signal topic (key = database.server.name), as stated in the blog, no snapshot is started.

Have any of you tested this scenario?

Kind regards,

Chris Cranford

Jun 9, 2022, 12:51:37 PM
to debe...@googlegroups.com, Jorge Aguirre
Hi Jorge -

Can you share your connector configuration as well as the contents of the incremental snapshot payload you insert into the signal table?

Thanks,
Chris

Jorge Aguirre

Jun 10, 2022, 6:21:29 AM
to debezium
Hi Chris,

Just to clarify, we are not using the signal table but the signal topic approach.

Here is our connector configuration; I hope I've redacted anything sensitive.

  {
    "name": "MySQL",  
    "config":
    {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "topic.creation.default.partitions": "3",
        "topic.creation.default.replication.factor": "3",
        "topic.creation.default.cleanup.policy": "delete",
        "topic.creation.default.retention.ms": "94670856000",
        "tasks.max": "1",
        "database.history.kafka.topic": "<server_name>_SCHEMA_CHANGES",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "snapshot.fetch.size": "10000",
        "delete.handling.mode": "rewrite",
        "decimal.handling.mode": "double",
        "database.history.skip.unparseable.ddl": "true",
        "database.history.store.only.captured.tables.ddl": "true",
        "errors.retry.timeout": "300000",
        "database.hostname": "${aws:<secret_name>:database-host-name}",
        "database.port": "${aws:<secret_name>:database-port}",
        "database.user": "${aws:<secret_name>:database-user}",
        "database.password": "${aws:<secret_name>:database-password}",
        "database.server.id": "<server_id>",
        "database.server.name": "<server_name>",
        "database.include.list": "rcs_ua",
        "database.history.kafka.bootstrap.servers": "${aws:<secret_name>:kafka-brokers}",
        "value.converter.schema.registry.url": "http://schema-registry.kafka:8081",
        "key.converter.schema.registry.url": "http://schema-registry.kafka:8081",
       
        "table.include.list": "rcs_ua.material,rcs_ua.orders",

        "snapshot.mode": "when_needed",
        "signal.kafka.topic": "dataplatform-roots-dbz-signal",
        "signal.kafka.bootstrap.servers": "${aws:<secret_name>:kafka-brokers}",
        "read.only": "true",

        "transforms": "unwrap,RemoveString",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.delete.handling.mode" : "rewrite",
        "transforms.unwrap.add.fields": "op,ts_ms,source.ts_ms",
        "transforms.RemoveString.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.RemoveString.regex": "(.*)_ua(.*)",
        "transforms.RemoveString.replacement": "$1$2",

        "database.history.consumer.security.protocol": "SASL_SSL",
        "database.history.consumer.sasl.mechanism": "SCRAM-SHA-512",
        "database.history.consumer.sasl.jaas.config": "${aws:<secret_name>:jaas-auth}",

        "database.history.producer.security.protocol": "SASL_SSL",
        "database.history.producer.sasl.mechanism": "SCRAM-SHA-512",
        "database.history.producer.sasl.jaas.config": "${aws:<secret_name>:jaas-auth}"
    }
  }

Then we add a new table, rcs_ua.deliveries, to the "table.include.list" parameter
and produce the following message (key;value) to the signal topic:
<server_name>;{"type":"execute-snapshot","data": {"data-collections": ["rcs_ua.deliveries"], "type": "INCREMENTAL"}}
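(For reference, a message like that can be sent with the Kafka console producer; this is only a sketch, and the broker address and key-separator settings are assumptions:

    kafka-console-producer.sh \
      --bootstrap-server <broker>:9092 \
      --topic dataplatform-roots-dbz-signal \
      --property parse.key=true \
      --property "key.separator=;"
    # then type the key;value message above on one line

The parse.key and key.separator properties let the key and the JSON payload be entered together on a single line.)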

Regards,

Jorge Aguirre

Jun 14, 2022, 5:36:51 AM
to debezium
Hi Chris,

Any idea about the possible error?

Regards,

Chris Cranford

Jun 14, 2022, 9:23:02 AM
to debe...@googlegroups.com, Jorge Aguirre
Hi Jorge -

So I would first check the connector offsets, specifically looking at "_signal_offset", and see whether it exists and has a value.  If it does, that tells us the offset in the signal topic we expect to read from.  If you had deleted the signal topic but didn't reset this value in the offsets, there could be an inconsistency.  If you are comfortable manipulating the offsets, you could try resetting the signal topic state: stop the connector, delete and re-create the signal topic, and remove this entry from the offsets.  This should set the offset to 0, and you could then try re-sending a signal.
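As a rough sketch of inspecting those offsets (the topic name depends on your Connect worker's offset.storage.topic setting; the broker address is a placeholder):

    kafka-console-consumer.sh \
      --bootstrap-server <broker>:9092 \
      --topic connect-offsets \
      --from-beginning \
      --property print.key=true
    # look for the entry keyed by your connector name and check
    # whether its value contains the signal-offset field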

If that doesn't work, I would enable TRACE logging for the connector and restart.  With TRACE enabled, send a signal to the topic.  What we want to look for are log entries like the following:

    [INFO] "Signal key 'xxxxx' doesn't match the connector's name 'yyyyyy'"
    [TRACE] "Processing signal: xxxxx"
    [WARN] "Unknown signal type xxxxxx"
    [INFO] "Requested 'xxxxxxx' snapshot of data collections: 'yyyyyyy'"

I'm guessing, since you've looked at the logs, that you likely haven't seen the first or last two entries, so hopefully we can see whether the signal thread actually receives any data.  I don't see anything wrong with the data you've sent to the topic, so hopefully the TRACE output can shed some light on whether we get the entry and what its contents happen to be.
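For the TRACE logging, something like this in the Connect worker's log4j.properties usually does it; the exact logger names here are an assumption and may vary by version:

    log4j.logger.io.debezium.connector.mysql=TRACE
    log4j.logger.io.debezium.pipeline.signal=TRACE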

Thanks,
Chris

Chris Cranford

Jun 14, 2022, 9:24:09 AM
to debe...@googlegroups.com, Jorge Aguirre
Hi Jorge -

One small correction: the offset field is called "incremental_snapshot_signal_offset"; I missed the prefix.  Sorry for the confusion.

Chris

Haswin Vidanage

Mar 17, 2023, 10:23:25 AM
to debezium
Hi Jorge,

Did you manage to find a solution to this issue?  I'm facing the same issue with Debezium version 1.9.

Regards,
Haswin.