[debezium] How to add date column in transforms.convertTimezone.exclude.list

Sooya Park

May 24, 2024, 2:36:28 AM
to debe...@googlegroups.com
Hello,

Dear Debezium members,

I am using the Debezium MySQL connector, version 2.4.1.

Some of my MySQL tables have columns of type datetime, and for those columns I convert the timezone to Asia/Seoul before the event is emitted.

The configuration is as follows.

"database.include.list": "game_db1",
"table.include.list": "game_db1.users, game_db1.inventory",

"transforms": "convertTimezone",
"transforms.convertTimezone.type": "io.debezium.transforms.TimezoneConverter",
"transforms.convertTimezone.converted.timezone": "Asia/Seoul",
"transforms.convertTimezone.exclude.list": "source:users:created_dt,users:update_dt,inventory:created_dt"

I want to exclude the date-type columns from the conversion, so I listed them in transforms.convertTimezone.exclude.list as shown above, but the warning messages below keep appearing.

[2024-05-24 06:14:40,676] WARN Skipping conversion for unsupported logical type: org.apache.kafka.connect.data.Date for field: created_dt (io.debezium.transforms.TimezoneConverter)
[2024-05-24 06:14:40,676] WARN Skipping conversion for unsupported logical type: org.apache.kafka.connect.data.Date for field: update_dt (io.debezium.transforms.TimezoneConverter)

Could it be that the config was set incorrectly?

I would like to ask what I did wrong and what I should do.

Thank you in advance.

Chris Cranford

May 25, 2024, 8:19:14 PM
to debe...@googlegroups.com
What is your full connector configuration?

Chris
--
You received this message because you are subscribed to the Google Groups "debezium" group.
To unsubscribe from this group and stop receiving emails from it, send an email to debezium+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/debezium/CAEL_JowWK5O7ByYhOr2wBN6EppueC-KO1y-PCSosrdVuHGFR1A%40mail.gmail.com.

Sooya Park

May 26, 2024, 8:56:29 PM
to debe...@googlegroups.com
Hello Chris

Here is the full configuration of my connector.


{
  "acks": "all",
  "bigint.unsigned.handling.mode": "long",
  "binary.handling.mode": "bytes",
  "binlog.buffer.size": "0",
  "connect.keep.alive": "true",
  "connect.keep.alive.interval.ms": "60000",
  "connect.timeout.ms": "30000",
  "database.history.kafka.bootstrap.servers": "b-1.xxxx.kafka.ap-northeast-2.amazonaws.com:9092,b-2.xxxx.kafka.ap-northeast-2.amazonaws.com:9092,b-3.xxxx.kafka.ap-northeast-2.amazonaws.com:9092",
  "database.history.kafka.query.timeout.ms": "3000",
  "database.history.kafka.recovery.attempts": "100",
  "database.history.kafka.recovery.poll.interval.ms": "100",
  "database.history.kafka.topic": "game_db1_db_history",
  "database.hostname": "xxxx..rds.amazonaws.com",
  "database.include.list": "game_db1",
  "database.jdbc.driver": "com.mysql.cj.jdbc.Driver",
  "database.password": "xxxxx",
  "database.port": "3306",
  "database.server.id": "103",
  "database.server.id.offset": "10000",
  "database.user": "xxxxx",
  "decimal.handling.mode": "double",
  "enable.time.adjuster": "true",
  "event.deserialization.failure.handling.mode": "fail",
  "event.processing.failure.handling.mode": "fail",
  "gtid.source.filter.dml.events": "true",
  "heartbeat.action.query": "select 1 from dual;",
  "heartbeat.interval.ms": "60000",
  "heartbeat.topics.prefix": "heartbeat",
  "include.query": "false",
  "include.schema.changes": "true",
  "include.schema.comments": "false",
  "inconsistent.schema.handling.mode": "fail",
  "incremental.snapshot.chunk.size": "1024",
  "max.batch.size": "2048",
  "max.queue.size": "8192",
  "max.queue.size.in.bytes": "0",
  "min.row.count.to.stream.results": "1000",
  "offset.flush.interval.ms": "20000",
  "offset.flush.timeout.ms": "120000",
  "poll.interval.ms": "500",
  "provide.transaction.metadata": "false",
  "query.fetch.size": "0",
  "retriable.restart.connector.wait.ms": "10000",
  "sanitize.field.names": "false",
  "schema.history.internal.kafka.bootstrap.servers": "b-1.xxxx.kafka.ap-northeast-2.amazonaws.com:9092,b-2.xxxx.kafka.ap-northeast-2.amazonaws.com:9092,b-3.xxxx.kafka.ap-northeast-2.amazonaws.com:9092",
  "schema.history.internal.kafka.topic": "game_db1_schema_history",
  "schema.name.adjustment.mode": "avro",
  "signal.data.collection": "game_db1.debezium_signals",
  "signal.enabled.channels": "source",
  "snapshot.delay.ms": "0",
  "snapshot.lock.timeout.ms": "10000",
  "snapshot.locking.mode": "none",
  "snapshot.max.threads": "1",
  "snapshot.mode": "never",
  "snapshot.new.tables": "off",
  "source.struct.version": "v2",
  "table.ignore.builtin": "true",
  "table.include.list": "game_db1.users, game_db1.inventory",
  "tasks.max": "1",
  "time.precision.mode": "connect",
  "tombstones.on.delete": "true",
  "topic.prefix": "game_db1",
  "transaction.topic": "${database.server.name}.transaction",
  "transforms": "convertTimezone",
  "transforms.convertTimezone.converted.timezone": "Asia/Seoul",
  "transforms.convertTimezone.exclude.list": "source:users:created_dt,users:update_dt,inventory:created_dt,inventory:updated_dt",
  "transforms.convertTimezone.type": "io.debezium.transforms.TimezoneConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "connector.class": "io.debezium.connector.mysql.MySqlConnector"
}

Thank you in advance.

Chris Cranford

May 29, 2024, 9:34:26 AM
to debe...@googlegroups.com
Hi,

Does it work if you use

    source:users:created_dt,source:users:update_dt,source:inventory:created_dt,source:inventory:updated_dt

Basically, make sure each element in the comma-separated list is prefixed with "source:".
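
Applied to the transform section of the configuration posted earlier in this thread, that suggestion would look roughly like the fragment below. It is only a sketch of the proposed change: every property name and value is taken from the posted configuration, only the exclude list entries are rewritten, and the thread does not confirm whether this makes the warning go away.

```json
{
  "transforms": "convertTimezone",
  "transforms.convertTimezone.type": "io.debezium.transforms.TimezoneConverter",
  "transforms.convertTimezone.converted.timezone": "Asia/Seoul",
  "transforms.convertTimezone.exclude.list": "source:users:created_dt,source:users:update_dt,source:inventory:created_dt,source:inventory:updated_dt"
}
```

The remaining connector properties would stay as they are.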

Chris