/**
* Transforms a Pub/Sub message by renaming the 'ts_ns' field and
* adding a hardcoded '_CHANGE_TYPE' field.
*
* @param {(Object<string, (string | Object<string, string>)>} message - The incoming
* Pub/Sub message. The 'data' field is a plain UTF-8 string.
*
* @param {Object<string, any>} metadata - Pub/Sub message metadata.
*
* @return {(Object<string, (string | Object<string, string>)>|null)} - The transformed
* message object, or null to filter the message.
*/
function renameTimestampAndAddChangeType(message, metadata) {
try {
// 1. Parse the JSON string from the message data.
const payload = JSON.parse(message.data);
// 2. Rename 'ts_ns' to 'CHANGE_SEQUENCE_NUMBER'.
// We check if the property exists before trying to rename it.
if (payload.hasOwnProperty('ts_ns')) {
payload.CHANGE_SEQUENCE_NUMBER = payload.ts_ns;
delete payload.ts_ns;
}
// 3. Add the hardcoded '_CHANGE_TYPE' field with the value 'UPSERT'.
payload._CHANGE_TYPE = 'UPSERT';
// 4. Update the 'data' property of the original message object
// with the stringified version of the modified payload.
message.data = JSON.stringify(payload);
// 5. Return the modified message object.
return message;
} catch (error) {
// If an error occurs (e.g., malformed JSON), log it and drop the message.
console.error('Error transforming message:', error);
return null;
}
}