time.precision.mode not generating timestamps in milliseconds with Postgres


Akshay Bande

May 26, 2024, 8:13:38 PM
to debezium

Issue with Microsecond Precision in Timestamp Conversion Using Kafka Sink Connector

I am using the following Debezium PostgreSQL connector configuration to capture changes from my PostgreSQL database and publish them to Kafka. My PostgreSQL table has an updated_at column of type timestamptz.

When I create or update a record in the table, the Debezium connector publishes a change event to Kafka. The updated_at field in these messages is a string with microsecond precision, for example 2022-01-21T07:45:42.232000Z.


Here is my connector configuration:


{
  "name": "postgres_source",
  "database.user": "postgres",
  "drop.tombstones": "true",
  "topic.prefix": "test_",
  "schema.include.list": "public",
  "database.password": "password",
  "plugin.name": "pgoutput",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "database.dbname": "test_db",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "key.converter.schemas.enable": "false",
  "database.server.name": "postgres",
  "value.converter.schemas.enable": "false",
  "time.precision.mode": "connect",
  "database.hostname": "hostname",
  "table.include.list": "(.*)test(.*)",
  "database.port": "5432",
  "slot.name": "debezium_v2",
  "poll.interval.ms": "1000",
  "transforms.unwrap.add.fields": "op,table,lsn,source.ts_ms,db",
  "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
  "transforms.unwrap.drop.tombstones": "true",
  "transforms": "unwrap,Reroute",
  "transforms.Reroute.topic.regex": "(.*)test(.*)",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.Reroute.topic.replacement": "postgres_source_topic",
  "transforms.unwrap.delete.handling.mode": "rewrite"
}

I have set time.precision.mode=connect, which I expected to produce timestamp columns with millisecond precision, but updated_at still arrives with microseconds.

I am using a Kafka sink connector with the TimestampConverter transformation to convert the updated_at string into a timestamp suitable for Iceberg tables. However, the conversion fails because of the microsecond precision in the updated_at field. TimestampConverter parses strings with Java's SimpleDateFormat class, which only supports precision down to milliseconds.
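To make the precision problem concrete, here is a minimal sketch of the conversion I am after, written with java.time instead of SimpleDateFormat. The class and method names (MicrosTimestamp, toMillis, toMillisString) are hypothetical and not part of Debezium or Kafka Connect; this only illustrates the truncation a custom transform or downstream step might apply, assuming the value is always an ISO-8601 UTC string like the one above.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Hypothetical helper for illustration; not a Debezium or Kafka Connect class.
class MicrosTimestamp {

    // Millisecond-precision ISO-8601 pattern, rendered in UTC.
    private static final DateTimeFormatter MILLIS_UTC =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC);

    // Parses an ISO-8601 instant (fractions up to nanoseconds are accepted)
    // and drops everything below the millisecond.
    static Instant toMillis(String isoTimestamp) {
        return Instant.parse(isoTimestamp).truncatedTo(ChronoUnit.MILLIS);
    }

    // Same conversion, returned as a string with exactly three fractional digits.
    static String toMillisString(String isoTimestamp) {
        return MILLIS_UTC.format(toMillis(isoTimestamp));
    }

    public static void main(String[] args) {
        String fromDebezium = "2022-01-21T07:45:42.232000Z";
        System.out.println(toMillisString(fromDebezium));
        System.out.println(toMillis(fromDebezium).toEpochMilli());
    }
}
```

For the sample value above, toMillisString returns 2022-01-21T07:45:42.232Z, which a pattern such as yyyy-MM-dd'T'HH:mm:ss.SSS'Z' can then parse. Note that this truncates rather than rounds, so 42.232999 also becomes 42.232.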

Question: How can I convert timestamps with microsecond precision in the updated_at field when using Kafka sink connectors? Is there a recommended approach or configuration that makes this conversion reliable, given that time.precision.mode=connect does not appear to reduce updated_at to millisecond precision?

