Filter Kafka topics using Debezium sink connectors to push data to Oracle

Sangita Mukherjee

Feb 24, 2026, 2:59:07 AM
to cran...@gmail.com, debe...@googlegroups.com
Hi Chris,

We are currently using Debezium 2.2 and need to extract only the data that arrived on a Kafka topic during the last 2 days, even though the topic holds the last 7-14 days of data.

With the older Debezium version (1.9) we used the approach below, but after upgrading to 2.2 the same syntax no longer filters data based on epoch time.

        "transforms.FilterEarlierThan.type": "io.confluent.connect.transforms.Filter$Value",
        "transforms.FilterEarlierThan.filter.type": "include",
        "transforms.FilterEarlierThan.missing.or.null.behavior": "exclude",
        "transforms.FilterEarlierThan.filter.condition": "$[?(@.source.ts_ms > 1727656088775)]",

Could you please advise how we can achieve the same in Debezium 2.2?


--
Thanks!
Sangita Mukherjee

Chris Cranford

Feb 24, 2026, 3:42:57 AM
to debe...@googlegroups.com
Hi, the same procedure should work with 2.2. 

Sangita Mukherjee

Mar 4, 2026, 2:16:11 AM
to debe...@googlegroups.com
Hi Chris, 
We tried the same approach, but with the filter below the connector does not pick up any data (consumer lag is 0 from the beginning). Do you have any further suggestions?

 apmt.mvii-nlrot.cdh.uat2.oraclesink-rhel-test-v2: |
  {
     "name":"apmt.mvii-nlrot.cdh.uat2.oraclesink-rhel-test-v2",
     "config":{
       "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
       "consumer.override.group.id":"apmt.mvii-nlrot.cdh.uat2.oraclesink-rhel-test-v2",
       "tasks.max":"3",
       "topics":"apmt.mvii-nlrot.n4.uat2.topic.confidential.dedicated.cdh-datastream.vsl_crane_statistics",
       "connection.url":"xxxx",
       "connection.user":"xxxx",
       "connection.password":"${secrets:kafka-connect-connectors-passwords:oracle-uat-password}",
       "transforms":"unwrap,striptopic,InsertField,FilterEarlierThan",
       "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
       "transforms.unwrap.drop.tombstones":"false",
       "transforms.striptopic.type":"org.apache.kafka.connect.transforms.RegexRouter",
       "transforms.striptopic.regex":"apmt.mvii-nlrot.n4.uat2.topic.confidential.dedicated.cdh-datastream.(.*)",
       "transforms.striptopic.replacement":"$1_test",
       "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
       "transforms.InsertField.timestamp.field": "messagets",
       "transforms.FilterEarlierThan.type": "io.confluent.connect.transforms.Filter$Value",
       "transforms.FilterEarlierThan.filter.type": "include",
       "transforms.FilterEarlierThan.missing.or.null.behavior": "exclude",
       "transforms.FilterEarlierThan.filter.condition": "$[?(@.source.ts_ms > 1772430786000)]",
       "auto.create":"true",
       "auto.evolve":"true",
       "insert.mode":"upsert",
       "delete.enabled":"true",
       "pk.mode":"record_key",
       "max.retries":"3",
       "quote.sql.identifiers":"never",
       "batch.size":"10000",
       "consumer.override.max.poll.records": "10000",
       "consumer.override.max.poll.interval.ms":"600000",
       "consumer.override.fetch.max.wait.ms":"10000",
       "consumer.override.auto.offset.reset":"earliest"
     }
  }
Our topics have a retention of 14 days, and new data flows in every day.

--
Thanks!
Sangita Mukherjee

Chris Cranford

Mar 4, 2026, 9:26:50 AM
to debezium
Hi -

I believe the issue is likely due to the transformation order. Your `FilterEarlierThan` relies on the `source.ts_ms` field, but by the time the filter runs that field no longer exists, because the `unwrap` transform earlier in the chain has already removed it. Have you tried rearranging the transformations so that the filter sees the event payload in the expected shape?
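
As a rough illustration of why the order matters, a change event value before `unwrap` looks something like the following. This is a trimmed sketch: the columns under `after` are made up, and the `source` block normally carries more fields than shown; only the envelope field names come from Debezium's event format.

```json
{
  "before": null,
  "after": {
    "id": 42,
    "crane_name": "example"
  },
  "source": {
    "ts_ms": 1772430790000
  },
  "op": "c",
  "ts_ms": 1772430790123
}
```

The condition `$[?(@.source.ts_ms > ...)]` can only match while the `source` block is still present. With default settings, `ExtractNewRecordState` keeps just the contents of `after`, so a filter placed later in the chain finds no `source.ts_ms`, and with `missing.or.null.behavior` set to `exclude` every record is dropped.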

Thanks,
-cc

Sangita Mukherjee

Mar 6, 2026, 12:08:46 PM
to debe...@googlegroups.com
Hi Chris,

Thanks a lot for your help. Yes, that was the issue. Putting FilterEarlierThan before the other transforms solved it.
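
For anyone finding this thread later, the change amounts to roughly the following. This is a sketch based on the config posted earlier in the thread, showing only the keys relevant to the fix; the one difference is the order of the `transforms` list, and every other setting is assumed to stay as it was.

```json
{
  "transforms": "FilterEarlierThan,unwrap,striptopic,InsertField",
  "transforms.FilterEarlierThan.type": "io.confluent.connect.transforms.Filter$Value",
  "transforms.FilterEarlierThan.filter.type": "include",
  "transforms.FilterEarlierThan.missing.or.null.behavior": "exclude",
  "transforms.FilterEarlierThan.filter.condition": "$[?(@.source.ts_ms > 1772430786000)]",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false"
}
```

Transforms run in the order they are listed, so the filter now sees the full Debezium envelope, including `source.ts_ms`, before `unwrap` flattens the record.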

Regards,
Sangita

Thanks!
Sangita Mukherjee
