Re: How to integrate MySQL to BigQuery CDC tables with Pub/Sub and Debezium


Sehwee Kim

Sep 16, 2025, 4:46:12 AM
to debe...@googlegroups.com, Cloud Pub/Sub Users, cloud-pu...@google.com
+ resending to @debe...@googlegroups.com



On Tue, Sep 16, 2025 at 4:16 PM Sehwee Kim <seh...@google.com> wrote:
Hi team,
I am looking for reference material on integrating MySQL with BigQuery CDC tables via Debezium and Pub/Sub, using BigQuery subscriptions.

I found the following related resources, but they do not contain detailed information on integrating Debezium to Pub/Sub with BigQuery CDC tables.
For example, I need to know how to transform the `__op` field from Debezium into `_CHANGE_TYPE` for BigQuery CDC tables when using use_topic_schema.

Any guidance or additional resources you can provide would be greatly appreciated.

Regards,
Sehwee

                                                                                              

Sehwee Kim (김세휘)
Customer Engineer, Data Analytics, Google Cloud
seh...@google.com 
+82-2-531-9961

Nathan Smit

Sep 16, 2025, 6:03:59 AM
to debezium
Hey there, are you planning to use Debezium + Kafka or Debezium Server or are you still researching?

We currently ingest data into BigQuery with Debezium Server, using the MySQL source and the Pub/Sub sink. We don't use the BigQuery CDC tables, unfortunately. The way we do this is to flatten the messages with ExtractNewRecordState (which you seem to be doing as well), send them to Pub/Sub with a BigQuery subscription that uses the BigQuery table schema, and then implement our own procedures to do the merge.
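
For reference, a minimal Debezium Server application.properties for this kind of setup could look roughly like the below. This is only a sketch: the project ID, hostnames, credentials, topic prefix and file paths are placeholders, so check the Debezium Server docs for the exact property names in your version.

debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=my-gcp-project
# MySQL source (placeholder connection details)
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.database.hostname=mysql.internal
debezium.source.database.port=3306
debezium.source.database.user=debezium
debezium.source.database.password=secret
debezium.source.database.server.id=184054
debezium.source.database.include.list=inventory
debezium.source.topic.prefix=myapp
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.source.schema.history.internal.file.filename=data/schema_history.dat
# Flatten the Debezium change-event envelope so the payload matches the BigQuery table schema
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
# Plain JSON output for the Pub/Sub topic / BigQuery subscription
debezium.format.key=json
debezium.format.value=json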

For the BQ CDC tables you'll need to map the __op field to either UPSERT (c, r, u) or DELETE (d). Unfortunately, as you've probably seen, there isn't a simple way to do this as far as I'm aware (although I'm sure the DBZ maintainers can weigh in). I believe the only way to do this would be to write your own custom SMT. If you haven't done this before, there is a tutorial here: https://www.confluent.io/blog/kafka-connect-single-message-transformation-tutorial-with-examples/

I wonder, though, if another option would be to use the newly added Pub/Sub SMTs. I'm not sure what functionality has been added there, but perhaps this could work for the transform you need: https://cloud.google.com/blog/products/data-analytics/pub-sub-single-message-transforms

Just aliasing the column names (without needing to inspect and transform them) is pretty easy if you use ExtractNewRecordState.  For example, you could do:

transforms.unwrap.add.fields=op:CHANGE_TYPE,source.ts_ns:_CHANGE_SEQUENCE_NUMBER

And then set add.fields.prefix to be just a single underscore. You'd then have the op named as the correct BigQuery pseudo-column, and you can use a really granular timestamp, like the source nanoseconds, for the change sequence number. Then the only remaining challenge is mapping the more detailed op values to the _CHANGE_TYPE values.
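
Spelled out as a sketch (note that if the prefix is a single underscore, the target names in add.fields should not carry their own leading underscore, or you end up with a double one; with Debezium Server these keys would additionally be prefixed with debezium.):

transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
# rename op and source.ts_ns as they are copied into the flattened record
transforms.unwrap.add.fields=op:CHANGE_TYPE,source.ts_ns:CHANGE_SEQUENCE_NUMBER
# default prefix is "__"; a single "_" yields _CHANGE_TYPE and _CHANGE_SEQUENCE_NUMBER
transforms.unwrap.add.fields.prefix=_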

Also, just FYI, the Google Doc you linked is not publicly accessible.

Sehwee Kim

Sep 17, 2025, 8:08:43 PM
to debe...@googlegroups.com, Mehran Nazir, Junho Jang
Hi Nathan,
Thank you for your email and valuable insights.

I plan to use Debezium Server directly to Pub/Sub, similar to your approach. Could you share what products or features you utilize for the merging stage?

Regarding your suggestion to alias the column name, I encountered an issue where `CHANGE_TYPE` was automatically prefixed with a double underscore. I will try setting `add.fields.prefix`, which I overlooked previously.

Thank you again for sharing your experience. I will keep you updated on the testing progress.

FYI, I have updated the reference links to publicly accessible versions:

Regards,
Sehwee


                                                                                                  

Sehwee Kim (김세휘)
Customer Engineer, Data Analytics, Google Cloud
seh...@google.com
+82-2-531-9961



Nathan Smit

Oct 13, 2025, 6:36:02 PM
to debezium
Hey there,

Sorry for the very late response on this. The way I currently handle things is: Debezium Server --> Pub/Sub topic --> BigQuery subscription --> BigQuery table.

So all the raw events are stored to, let's say, dataset.all_events. Then every day you could run a scheduled job (using e.g. Cloud Composer) to merge that data into dataset.deduped_events. Then if someone wants to see the most up-to-date version of the data, you can do a union between all_events and deduped_events (where you filter to only recent values in all_events).

I was actually just messing around with SMTs in Pub/Sub (which reminded me about your post), as I have a slightly different use case, and the new Pub/Sub SMTs seem to work pretty well for this too. So I added the below transform to my BigQuery subscription, and then you can pass a payload like the one below and still have BigQuery CDC process it:

{
  "id": 123,
  "first_name": "Jane",
  "last_name": "doe",
  "email": "notanemail",
  "is_active": true,
  "ts_ns": 1760336603123456789
}


/**
 * Transforms a Pub/Sub message by renaming the 'ts_ns' field and
 * adding a hardcoded '_CHANGE_TYPE' field.
 *
 * @param {Object<string, (string | Object<string, string>)>} message - The incoming
 * Pub/Sub message. The 'data' field is a plain UTF-8 string.
 *
 * @param {Object<string, any>} metadata - Pub/Sub message metadata.
 *
 * @return {(Object<string, (string | Object<string, string>)>|null)} - The transformed
 * message object, or null to filter the message.
 */
function renameTimestampAndAddChangeType(message, metadata) {
  try {
    // 1. Parse the JSON string from the message data.
    const payload = JSON.parse(message.data);

    // 2. Rename 'ts_ns' to 'CHANGE_SEQUENCE_NUMBER'.
    // We check if the property exists before trying to rename it.
    if (payload.hasOwnProperty('ts_ns')) {
      payload.CHANGE_SEQUENCE_NUMBER = payload.ts_ns;
      delete payload.ts_ns;
    }

    // 3. Add the hardcoded '_CHANGE_TYPE' field with the value 'UPSERT'.
    payload._CHANGE_TYPE = 'UPSERT';

    // 4. Update the 'data' property of the original message object
    // with the stringified version of the modified payload.
    message.data = JSON.stringify(payload);

    // 5. Return the modified message object.
    return message;

  } catch (error) {
    // If an error occurs (e.g., malformed JSON), log it and drop the message.
    console.error('Error transforming message:', error);
    return null;
  }
}
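
And if you keep Debezium's op field in the payload instead of hardcoding UPSERT, a variation along these lines should cover the mapping question from earlier in the thread. This is an untested sketch that assumes the flattened payload still carries the operation in a field named "__op" (c/r/u for upserts, d for deletes); adjust the field name to whatever your ExtractNewRecordState config actually emits.

function mapOpToChangeType(message, metadata) {
  try {
    const payload = JSON.parse(message.data);

    // Map the Debezium operation code onto the BigQuery CDC pseudo-column:
    // c (create), r (snapshot read), u (update) -> UPSERT; d (delete) -> DELETE.
    payload._CHANGE_TYPE = (payload.__op === 'd') ? 'DELETE' : 'UPSERT';
    delete payload.__op;

    message.data = JSON.stringify(payload);
    return message;
  } catch (error) {
    // Malformed JSON: drop the message rather than forwarding something BigQuery can't parse.
    return null;
  }
}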