Hi,
I have a source DB and a destination (sink) DB connected using Kafka Connect, Debezium, and ZooKeeper.
I am trying to apply a transformation to a particular column in the source DB and write the result to a new column in the destination DB.
To do this, I created KSQL streams to manipulate the data in the Kafka topic, but I am unable to sync the result to the destination database.
Step 1:
Since the Kafka topic ('Persons') carries both a schema and a payload, I created a raw stream as below:
CREATE STREAM RawPersons (
    schema STRUCT<
        type VARCHAR,
        fields ARRAY<STRUCT<
            type VARCHAR,
            optional BOOLEAN,
            field VARCHAR
        >>,
        optional BOOLEAN,
        name VARCHAR
    >,
    payload STRUCT<
        ID INT,
        LastName VARCHAR,
        FirstName VARCHAR,
        Age INT
    >
) WITH (
    KAFKA_TOPIC='Persons',
    VALUE_FORMAT='JSON'
);
Sample message from KAFKA_TOPIC = Persons:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"LastName"},{"type":"string","optional":true,"field":"FirstName"},{"type":"int32","optional":true,"field":"Age"}],"optional":false,"name":"azure.SourceDebe.dbo.Persons.Value"},"payload":{"ID":1,"LastName":"Ram","FirstName":"Raj","Age":30}}
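To make the layout of that message concrete, here is a small Python sketch that parses the sample above and lists what the "schema" part declares next to what the "payload" part carries. It is only an illustration and not part of my pipeline; the names `raw`, `message`, and `describe_fields` are made up for this example.

```python
import json

# Sample message copied from the 'Persons' topic shown above.
raw = (
    '{"schema":{"type":"struct","fields":['
    '{"type":"int32","optional":false,"field":"ID"},'
    '{"type":"string","optional":false,"field":"LastName"},'
    '{"type":"string","optional":true,"field":"FirstName"},'
    '{"type":"int32","optional":true,"field":"Age"}],'
    '"optional":false,"name":"azure.SourceDebe.dbo.Persons.Value"},'
    '"payload":{"ID":1,"LastName":"Ram","FirstName":"Raj","Age":30}}'
)

message = json.loads(raw)


def describe_fields(msg):
    """Return (field name, type, optional) tuples declared in the envelope's schema."""
    return [(f["field"], f["type"], f["optional"]) for f in msg["schema"]["fields"]]


fields = describe_fields(message)
for name, type_name, optional in fields:
    print(name, type_name, "optional" if optional else "required")

# The actual row values live under "payload".
print(message["payload"])
```

So every message on 'Persons' is an envelope with two top-level keys, "schema" and "payload", which is why the raw stream declares both as STRUCT columns.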
--
Step 2:
Here I create another column, Result, using a simple calculation (Result = Age + 1):
CREATE STREAM PersonsJson AS
    SELECT
        payload->ID AS ID,
        payload->LastName AS LastName,
        payload->FirstName AS FirstName,
        payload->Age AS Age,
        payload->Age + 1 AS Result
    FROM
        RawPersons;
Sample message from KAFKA_TOPIC = PersonsJson:
{"ID":1,"LASTNAME":"Ram","FIRSTNAME":"Raj","AGE":90, "RESULT":91}
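In plain terms, this is what I understand the Step 2 query to do to each message, written as a rough Python sketch. The function name `flatten_person` and the input record are hypothetical, and I am assuming that a null Age produces a null Result and that the unquoted column names come out upper-cased, as they do in the sample above.

```python
import json


def flatten_person(envelope):
    """Mimic the Step 2 SELECT: pull fields out of 'payload' and add RESULT = Age + 1."""
    payload = envelope["payload"]
    age = payload.get("Age")  # Age is optional in the source schema
    return {
        "ID": payload["ID"],
        "LASTNAME": payload["LastName"],
        "FIRSTNAME": payload.get("FirstName"),
        "AGE": age,
        # Assumption: a missing/null Age yields a null Result.
        "RESULT": None if age is None else age + 1,
    }


# Hypothetical input using the payload of the sample 'Persons' message.
envelope = {"payload": {"ID": 1, "LastName": "Ram", "FirstName": "Raj", "Age": 30}}

flat = flatten_person(envelope)
print(json.dumps(flat))

# The flattened record has no "schema"/"payload" envelope any more.
print("schema" in flat, "payload" in flat)
```

The record written to 'PersonsJson' is therefore a flat JSON object, without the "schema" and "payload" wrapper that the messages on 'Persons' have.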
I am not able to sync this record to the destination database. Can anyone please help with this?
Any help is appreciated. Thanks in advance.