Hi there,
I'm looking for ideas or workarounds for this specific problem, hoping that someone has run into it already.
We're sourcing a MySQL DB with the JDBC connector (shipped with Confluent 3.2.2). The connector runs in `timestamp+incrementing` mode. Only inserts are done in the DB; the rows are never updated. We noticed that some records were missing, even with the parameter `timestamp.delay.interval.ms` set to a high value.
Two issues were identified:
- The DB is configured to use the system timezone (CET), and that cannot be changed as we have no control over it.
- The ts field (typed as timestamp in the DB schema) is populated with a UTC value at insert time, but the DB interprets it as a timestamp in CET. So the effective stored value of the ts field is UTC-1, i.e. one hour behind the actual insert time.
Note: this shouldn't influence the sourcing, as the connector uses the last sourced value as its offset and starts sourcing from there in the next cycle.
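To illustrate the shift described in the second point, here is a minimal Python sketch. It assumes CET is a fixed +1 hour offset (winter time, no DST handling), and `stored_instant` is a hypothetical helper name used only for this illustration.

```python
from datetime import datetime, timedelta, timezone

UTC = timezone.utc
# Assumption: fixed winter offset for CET; daylight saving time is ignored here.
CET = timezone(timedelta(hours=1), "CET")


def stored_instant(inserted_at_utc: datetime) -> datetime:
    """Return the instant the DB effectively stores when a UTC wall-clock
    value is interpreted as if it were a CET wall-clock value."""
    # The application sends the UTC wall-clock digits without any zone info.
    wall_clock = inserted_at_utc.astimezone(UTC).replace(tzinfo=None)
    # The DB reads those same digits as CET, then we convert back to UTC.
    return wall_clock.replace(tzinfo=CET).astimezone(UTC)


inserted = datetime(2017, 12, 19, 12, 0, 17, tzinfo=UTC)
stored = stored_instant(inserted)

print(stored)             # 2017-12-19 11:00:17+00:00
print(inserted - stored)  # 1:00:00
```

Under these assumptions, every ts value ends up one hour earlier than the real insert instant.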
With the following connector config:
{
"name": "my-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name": "ts",
"incrementing.column.name": "id",
"transforms.insertKey.fields": "key",
"connection.password": "pwd",
"validate.non.null": "false",
"tasks.max": "1",
"transforms.insertKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"query": "SELECT fields FROM table",
"transforms": "insertKey",
"batch.max.rows": "100",
"timestamp.delay.interval.ms": "5000",
"mode": "timestamp+incrementing",
"topic.prefix": "my-topic",
"connection.user": "user",
"poll.interval.ms": "10000",
"connection.url": "jdbc:mysql://host/db?useUnicode=true&characterEncoding=UTF-8"
}
}
The connector's debug log prints the following (dates are supposedly in UTC):
[2017-12-19 12:00:17,589] DEBUG Executing prepared statement with start time value = 2017-12-19 12:00:01.000 end time = 2017-12-19 13:00:12.000 and incrementing value = 6499228341699480218 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:159)
The issue is that the end time is in CET instead of UTC, because the connector uses `CURRENT_TIMESTAMP` to retrieve this value and the DB is in CET.
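The numbers in the debug line are consistent with this explanation. Here is a minimal Python sketch of the arithmetic, assuming the log clock is UTC (12:00:17, milliseconds dropped), the DB clock is CET at a fixed +1 hour, and the end time is the DB's current time minus `timestamp.delay.interval.ms` (5000 ms in the config above). The function name `query_end_time` is hypothetical and not part of the connector.

```python
from datetime import datetime, timedelta, timezone

# Assumption: fixed winter offset for CET; daylight saving time is ignored here.
CET = timezone(timedelta(hours=1), "CET")


def query_end_time(now_utc: datetime, delay_ms: int) -> datetime:
    """Upper bound of the query window as a naive wall-clock value:
    the DB's current time (in CET) minus the configured delay."""
    db_now = now_utc.astimezone(CET).replace(tzinfo=None)
    return db_now - timedelta(milliseconds=delay_ms)


now_utc = datetime(2017, 12, 19, 12, 0, 17, tzinfo=timezone.utc)
end_time = query_end_time(now_utc, 5000)

# Rows inserted right now carry the UTC wall-clock digits in their ts field.
newest_ts = now_utc.replace(tzinfo=None)

print(end_time)              # 2017-12-19 13:00:12
print(end_time - newest_ts)  # 0:59:55
```

Under these assumptions the upper bound sits almost an hour ahead of the newest ts values rather than 5 seconds behind them, so the configured delay no longer holds back rows that were only just inserted.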
With this we no longer noticed any missing records, but this solution depends on daylight saving time, so the connector's configuration needs to be changed and redeployed twice a year.
The other solutions we could think of are heavier:
- Write our own Kafka connector to handle this time shift.
- Similarly, write our own Kafka producer.
- Patch the JDBC connector to use `UTC_TIMESTAMP` instead of `CURRENT_TIMESTAMP`
But these would be specific to this DB.
We're looking for a more long-term solution, if it exists ;-)
Thanks,
Jeremy