Filter kafka topics using Debezium sink connectors to push data to oracle

Sangita Mukherjee

Feb 24, 2026, 2:59:07 AM
to cran...@gmail.com, debe...@googlegroups.com
Hi Chris,

We are currently using Debezium version 2.2 and have a requirement to extract only the data that flowed into a Kafka topic during the last 2 days, even though the topic holds data from the last 7-14 days.

In the older Debezium version (1.9) we were using the approach below; however, after upgrading to 2.2 the same syntax no longer filters data based on epoch time.

        "transforms.FilterEarlierThan.type": "io.confluent.connect.transforms.Filter$Value",
        "transforms.FilterEarlierThan.filter.type": "include",
        "transforms.FilterEarlierThan.missing.or.null.behavior": "exclude",
        "transforms.FilterEarlierThan.filter.condition": "$[?(@.source.ts_ms > 1727656088775)]",

Can you please guide us on how to achieve the same in Debezium 2.2?


--
Thanks!
Sangita Mukherjee

Chris Cranford

Feb 24, 2026, 3:42:57 AM
to debe...@googlegroups.com
Hi, the same procedure should work with 2.2. 
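
(For reference: Debezium 2.x also ships its own scripting-based Filter SMT, io.debezium.transforms.Filter, which can express the same include-if-newer condition as a Groovy expression without depending on Confluent's transform. A minimal sketch, assuming the debezium-scripting module and a Groovy engine are on the Connect worker classpath; the transform name and epoch value are carried over from the config above:)

        "transforms": "FilterEarlierThan",
        "transforms.FilterEarlierThan.type": "io.debezium.transforms.Filter",
        "transforms.FilterEarlierThan.language": "jsr223.groovy",
        "transforms.FilterEarlierThan.condition": "value.source.ts_ms > 1727656088775",
        "transforms.FilterEarlierThan.null.handling.mode": "drop",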

Sangita Mukherjee

Mar 4, 2026, 2:16:11 AM
to debe...@googlegroups.com
Hi Chris, 
We tried doing the same; however, the connector is detecting 0 records (consumer lag 0 from the beginning) with the filter below. Do you have any further suggestions?

 apmt.mvii-nlrot.cdh.uat2.oraclesink-rhel-test-v2: |
  {
     "name":"apmt.mvii-nlrot.cdh.uat2.oraclesink-rhel-test-v2",
     "config":{
       "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
       "consumer.override.group.id":"apmt.mvii-nlrot.cdh.uat2.oraclesink-rhel-test-v2",
       "tasks.max":"3",
       "topics":"apmt.mvii-nlrot.n4.uat2.topic.confidential.dedicated.cdh-datastream.vsl_crane_statistics",
       "connection.url":"xxxx",
       "connection.user":"xxxx",
       "connection.password":"${secrets:kafka-connect-connectors-passwords:oracle-uat-password}",
       "transforms":"unwrap,striptopic,InsertField,FilterEarlierThan",
       "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
       "transforms.unwrap.drop.tombstones":"false",
       "transforms.striptopic.type":"org.apache.kafka.connect.transforms.RegexRouter",
       "transforms.striptopic.regex":"apmt.mvii-nlrot.n4.uat2.topic.confidential.dedicated.cdh-datastream.(.*)",
       "transforms.striptopic.replacement":"$1_test",
       "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
       "transforms.InsertField.timestamp.field": "messagets",

        "transforms.FilterEarlierThan.type": "io.confluent.connect.transforms.Filter$Value",
        "transforms.FilterEarlierThan.filter.type": "include",
        "transforms.FilterEarlierThan.missing.or.null.behavior": "exclude",
        "transforms.FilterEarlierThan.filter.condition": "$[?(@.source.ts_ms > 1772430786000)]",
       "auto.create":"true",
       "auto.evolve":"true",
       "insert.mode":"upsert",
       "delete.enabled":"true",
       "pk.mode":"record_key",
       "max.retries":"3",
       "quote.sql.identifiers":"never",
       "batch.size":"10000",
       "consumer.override.max.poll.records": "10000",
       "consumer.override.max.poll.interval.ms":"600000",
       "consumer.override.fetch.max.wait.ms":"10000",
       "consumer.override.auto.offset.reset":"earliest"
     }
  }
Our topics have a retention of 14 days, and new data flows in every day.



--
Thanks!
Sangita Mukherjee

Chris Cranford

Mar 4, 2026, 9:26:50 AM
to debezium
Hi -

I believe the issue is likely the transformation order. Your `FilterEarlierThan` relies on the `source.ts_ms` field, but that field no longer exists by the time the filter runs, because the `unwrap` transform has already removed it. Have you tried rearranging the transformations so that each one sees the event payload in the shape it expects?

Thanks,
-cc

Sangita Mukherjee

Mar 6, 2026, 12:08:46 PM
to debe...@googlegroups.com
Hi Chris,

Thanks a lot for your help. Yes, that was the issue. Putting FilterEarlierThan before the other transforms solved it.
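
i.e. the transform chain now reads:

        "transforms":"FilterEarlierThan,unwrap,striptopic,InsertField",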

Regards,
Sangita


Sangita Mukherjee

Mar 23, 2026, 3:08:58 AM
to debe...@googlegroups.com
Hi Chris,

Greetings!

In the same context as before: we are currently using the FilterEarlierThan method to ingest only the latest data into the Oracle DB, but we are seeing a difference when the connector has a single topic versus multiple topics.
With a single topic, the consumer metrics indeed show the consumer lag dropping drastically at first and then slowing down once the connector starts pushing the latest data. With multiple topics in the same connector, however, the lag graph shows no such behaviour and instead gives the same performance throughout. This makes us doubt whether FilterEarlierThan is working in the multiple-topic case.

  apmt.aqaba-joaqj.cdh.prod.oraclesink-rd2026-aks1-g1-v1: >
    {
      "name":"apmt.aqaba-joaqj.cdh.prod.oraclesink-rd2026-aks1-g1-v1",
      "config":{
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "consumer.override.group.id":"apmt.aqaba-joaqj.cdh.prod.oraclesink-rd2026-aks1-g1-v1",
        "tasks.max":"15",
        "topics":"apmt.aqaba-joaqj.n4.prod.topic.confidential.dedicated.cdh-datastream1.srv_event_field_changes,apmt.aqaba-joaqj.n4.prod.topic.confidential.dedicated.cdh-datastream1.srv_event,apmt.aqaba-joaqj.n4.prod.topic.confidential.dedicated.cdh-datastream1.inv_unit,apmt.aqaba-joaqj.n4.prod.topic.confidential.dedicated.cdh-datastream1.inv_move_event,apmt.aqaba-joaqj.n4.prod.topic.confidential.dedicated.cdh-datastream1.inv_wi",
        "connection.url":"jdbc:oracle:thin:@(DESCRIPTION = (ENABLE=BROKEN) (ADDRESS = (PROTOCOL = TCP)(HOST = xxx)(PORT = 11534)) (CONNECT_DATA = (SERVER = DEDICATED) (SERVICE_NAME = xxx)))",
        "connection.user":"aqb_stg",
        "connection.password":"${secrets:kafka-connect-connectors-passwords:oracle-prod-password}",
        "transforms":"unwrap,striptopic,InsertField",

        "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones":"false",
        "transforms.striptopic.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.striptopic.regex":"apmt.aqaba-joaqj.n4.prod.topic.confidential.dedicated.cdh-datastream1.(.*)",
        "transforms.striptopic.replacement":"$1",

        "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertField.timestamp.field": "messagets",
        "transforms.FilterEarlierThan.type": "io.confluent.connect.transforms.Filter$Value",
        "transforms.FilterEarlierThan.filter.type": "include",
        "transforms.FilterEarlierThan.missing.or.null.behavior": "exclude",
        "transforms.FilterEarlierThan.filter.condition": "$[?(@.source.ts_ms > 1773703433567)]",
        "auto.create":"false",

        "auto.evolve":"true",
        "insert.mode":"upsert",
        "delete.enabled":"true",
        "pk.mode":"record_key",
        "max.retries":"3",
        "quote.sql.identifiers":"never",
        "batch.size":"5000",
        "consumer.override.max.poll.records": "5000",
--
Thanks!
Sangita Mukherjee

Sangita Mukherjee

Mar 23, 2026, 4:01:19 AM
to debe...@googlegroups.com
Sorry, the connector configuration currently in use is this:

  apmt.aqaba-joaqj.cdh.prod.oraclesink-rd2026-aks1-g1-v1: >
    {
      "name":"apmt.aqaba-joaqj.cdh.prod.oraclesink-rd2026-aks1-g1-v1",
      "config":{
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "consumer.override.group.id":"apmt.aqaba-joaqj.cdh.prod.oraclesink-rd2026-aks1-g1-v1",
        "tasks.max":"15",
        "topics":"apmt.aqaba-joaqj.n4.prod.topic.confidential.dedicated.cdh-datastream1.srv_event_field_changes,apmt.aqaba-joaqj.n4.prod.topic.confidential.dedicated.cdh-datastream1.srv_event,apmt.aqaba-joaqj.n4.prod.topic.confidential.dedicated.cdh-datastream1.inv_unit,apmt.aqaba-joaqj.n4.prod.topic.confidential.dedicated.cdh-datastream1.inv_move_event,apmt.aqaba-joaqj.n4.prod.topic.confidential.dedicated.cdh-datastream1.inv_wi",
        "connection.url":"jdbc:oracle:thin:@(DESCRIPTION = (ENABLE=BROKEN) (ADDRESS = (PROTOCOL = TCP)(HOST = xxxx)(PORT = 11534)) (CONNECT_DATA = (SERVER = DEDICATED) (SERVICE_NAME = ppaqbstgr)))",

        "connection.user":"aqb_stg",
        "connection.password":"${secrets:kafka-connect-connectors-passwords:oracle-prod-password}",
        "transforms":"FilterEarlierThan,unwrap,striptopic,InsertField",

        "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones":"false",
        "transforms.striptopic.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.striptopic.regex":"apmt.aqaba-joaqj.n4.prod.topic.confidential.dedicated.cdh-datastream1.(.*)",
        "transforms.striptopic.replacement":"$1",
        "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertField.timestamp.field": "messagets",
        "transforms.FilterEarlierThan.type": "io.confluent.connect.transforms.Filter$Value",
        "transforms.FilterEarlierThan.filter.type": "include",
        "transforms.FilterEarlierThan.missing.or.null.behavior": "exclude",
        "transforms.FilterEarlierThan.filter.condition": "$[?(@.source.ts_ms > 1773703433567)]",
        "auto.create":"false",
        "auto.evolve":"true",
        "insert.mode":"upsert",
        "delete.enabled":"true",
        "pk.mode":"record_key",
        "max.retries":"3",
        "quote.sql.identifiers":"never",
        "batch.size":"5000",
        "consumer.override.max.poll.records": "5000",
        "consumer.override.max.poll.interval.ms":"600000",
        "consumer.override.fetch.max.wait.ms":"10000",
        "consumer.override.auto.offset.reset":"earliest"
        }
    }

Thanks!
Sangita Mukherjee
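
(One way to verify whether the filter behaves differently per topic, rather than inferring it from the lag graph alone, is to describe the consumer group directly; a sketch, with the broker address as a placeholder:)

        kafka-consumer-groups.sh --bootstrap-server <broker:9092> --describe \
          --group apmt.aqaba-joaqj.cdh.prod.oraclesink-rd2026-aks1-g1-v1

(The output lists CURRENT-OFFSET, LOG-END-OFFSET, and LAG per topic-partition, making it visible whether all five topics advance at comparable rates.)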

Chris Cranford

Mar 23, 2026, 6:46:59 AM
to debe...@googlegroups.com
Hi -

I am afraid you are not using the Debezium sink connector but rather Confluent's JDBC sink. Perhaps it would be best to ask in their community about their connector's behavior.

Thanks,
-cc
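
(Since the subject line mentions Debezium sink connectors: Debezium has shipped its own JDBC sink connector since 2.2. It consumes Debezium change-event envelopes natively, so no ExtractNewRecordState transform is needed. A rough sketch of the equivalent settings, with property names as documented for the Debezium JDBC sink and connection details as placeholders, untested here:)

        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:oracle:thin:@//host:1521/service",
        "connection.username": "xxxx",
        "connection.password": "xxxx",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "primary.key.mode": "record_key",
        "schema.evolution": "basic",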