How to do the transformations from source DB and store the result in the destination DB


Jamuna Kannan

Jun 24, 2024, 11:40:10 AM
to debezium
Hi,

I have a source DB and a destination (sink) DB connected using Kafka Connect, Debezium, and ZooKeeper.
I am trying to transform a particular column from the source DB and write the result to a new column in the destination DB.

To do this, I tried creating KSQL streams to manipulate the data in the Kafka topic, but I am unable to sync the result to the destination database.


Step 1:
Since the Kafka topic ('Persons') carries both a schema and a payload, I create a raw stream as below:
CREATE STREAM RawPersons (
  schema STRUCT<
    type VARCHAR,
    fields ARRAY<STRUCT<
      type VARCHAR,
      optional BOOLEAN,
      field VARCHAR
    >>,
    optional BOOLEAN,
    name VARCHAR
  >,
  payload STRUCT<
    ID INT,
    LastName VARCHAR,
    FirstName VARCHAR,
    Age INT
  >
) WITH (
  KAFKA_TOPIC='Persons',
  VALUE_FORMAT='JSON'
);

KAFKA_TOPIC = Persons
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"LastName"},{"type":"string","optional":true,"field":"FirstName"},{"type":"int32","optional":true,"field":"Age"}],"optional":false,"name":"azure.SourceDebe.dbo.Persons.Value"},"payload":{"ID":1,"LastName":"Ram","FirstName":"Raj","Age":30}}
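
A quick way to check that RawPersons actually parses the schema/payload envelope is to query the nested fields directly. This is only a sketch: it assumes a ksqlDB version that supports EMIT CHANGES and that the Persons topic already holds messages. If the STRUCT declaration does not match the message, the selected fields would be expected to come back as NULL.

```sql
-- Read the topic from the beginning for this session
-- (assumption: the messages were produced before the query starts).
SET 'auto.offset.reset' = 'earliest';

-- Pull two nested fields out of the envelope and stop after one row.
SELECT
  payload->ID  AS ID,
  payload->Age AS Age
FROM RawPersons
EMIT CHANGES
LIMIT 1;
```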


--
Step 2:
Here I create another column, Result, with a simple calculation (Result = Age + 1):
CREATE STREAM PersonsJson AS
SELECT
  payload->ID AS ID,
  payload->LastName AS LastName,
  payload->FirstName AS FirstName,
  payload->Age AS Age,
  payload->Age + 1 AS Result
FROM
  RawPersons;
 
KAFKA_TOPIC = PersonsJson
{"ID":1,"LASTNAME":"Ram","FIRSTNAME":"Raj","AGE":90, "RESULT":91}
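
One possible cause worth ruling out: unlike the Persons message, the PersonsJson record carries no schema, and sink connectors that map fields to table columns (the JDBC sink, for example) generally need one. A minimal sketch of writing the derived stream in a schema-carrying format follows. It assumes Schema Registry is configured for the KSQL server and that the sink connector is set up with the matching Avro converter; neither is confirmed by the setup described here. The stream and topic name PersonsAvro is hypothetical.

```sql
-- Same projection as PersonsJson, but serialized as Avro so the schema
-- is registered in Schema Registry (assumption: Schema Registry is available).
-- PersonsAvro is a hypothetical name chosen for illustration.
CREATE STREAM PersonsAvro
  WITH (KAFKA_TOPIC='PersonsAvro', VALUE_FORMAT='AVRO') AS
SELECT
  payload->ID        AS ID,
  payload->LastName  AS LastName,
  payload->FirstName AS FirstName,
  payload->Age       AS Age,
  payload->Age + 1   AS Result
FROM RawPersons
EMIT CHANGES;
```

The sink connector would then need to read from PersonsAvro rather than PersonsJson. The column names arrive upper-cased (ID, LASTNAME, ...), as in the PersonsJson output.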


I am not able to sync this record to the destination database. Can anyone please help with this?
Any help is appreciated. Thanks in advance.