Duplicate messages using the JDBC connector


Sumit Arora

Jun 24, 2016, 3:42:16 PM
to Confluent Platform
Hello,

While testing Kafka Connect's JDBC connector in distributed mode, I came across a problem where the connector fetches duplicate records. It only happens intermittently, but when it does, the JDBC connector gets into a loop and keeps publishing duplicate records to the Kafka topic until the source table is updated again; at that point it exits the loop and pulls the new set of records. I looked at Connect's offset storage topic and found that the connector keeps committing the same offset while this is happening:

{"timestamp":1466785518817}
{"timestamp":1466786076200}
{"timestamp":1466786149983}
{"timestamp":1466786149983}
{"timestamp":1466786149983}
{"timestamp":1466786351400}
{"timestamp":1466787264710}
{"timestamp":1466787592337}
{"timestamp":1466787674277}
{"timestamp":1466787879080}
{"timestamp":1466787885187}
{"timestamp":1466788041833}
{"timestamp":1466788041833}
{"timestamp":1466788041833}
{"timestamp":1466788206620}
{"timestamp":1466790646970}
{"timestamp":1466790727190}
{"timestamp":1466791155057}
{"timestamp":1466791400510}
{"timestamp":1466791772800}
{"timestamp":1466791781513}
{"timestamp":1466791781513}
{"timestamp":1466791781513}
{"timestamp":1466791995267}
{"timestamp":1466795559553}
{"timestamp":1466795594923}
{"timestamp":1466795594923}
{"timestamp":1466795594923}
{"timestamp":1466795778403}
{"timestamp":1466795778403}
{"timestamp":1466795891320}
{"timestamp":1466796279937}
{"timestamp":1466796291090}
{"timestamp":1466796461280}
{"timestamp":1466796499693}
{"timestamp":1466796499693}
{"timestamp":1466796499693}
{"timestamp":1466796499693}
{"timestamp":1466796499693}
{"timestamp":1466796786680}
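(In case it helps anyone reproduce this: I got the list above by dumping Connect's offset storage topic. The topic name comes from offset.storage.topic in the distributed worker config, and the broker address here is just my local setup, so adjust both for your environment.)

```shell
# Dump Connect's offset storage topic. Topic name and broker address are
# from my setup; exact flags vary by Kafka version (older releases need
# --new-consumer or --zookeeper instead of --bootstrap-server).
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic connect-offsets --from-beginning
```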

I am testing with 2 workers running on a single node.

Any ideas?

Thanks,
Sumit


Shikhar Bhushan

Jun 24, 2016, 3:52:27 PM
to Confluent Platform

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/fa4881d9-cc98-4432-9bc5-669bee10e32e%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Sumit Arora

unread,
Jun 24, 2016, 4:34:48 PM6/24/16
to Confluent Platform
Thanks for your quick response, Shikhar!

The issue you pointed out is certainly similar to the problem I am facing, as I am also using "timestamp" mode. However, the column I am using is of type datetime in MS SQL Server, which only stores values up to millisecond precision. Since offsets are also stored with millisecond precision, we should not run into the issue you pointed out in the JIRA, right? Am I correct, or am I missing something?
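For what it's worth, here is my understanding of that JIRA issue as a sketch (Python, my own illustration rather than connector code; I'm assuming the connector stores the offset as epoch milliseconds and fetches rows strictly greater than the last offset):

```python
def to_millis(ts_micros: int) -> int:
    """Truncate a microsecond timestamp to milliseconds, mimicking an
    offset stored at millisecond precision."""
    return ts_micros // 1000

# Column with sub-millisecond precision: truncation loses the .4 ms,
# so on the next poll "ts > offset" matches the same row again.
row_ts_micros = 1_466_791_781_513_400          # ...513.4 ms
offset_ms = to_millis(row_ts_micros)           # 1466791781513
print(row_ts_micros > offset_ms * 1000)        # True -> duplicate fetched

# Column with exactly millisecond precision (like SQL Server datetime):
# nothing is lost in truncation, so the row does not requalify.
row_ts_ms = 1_466_791_781_513
print(row_ts_ms * 1000 > to_millis(row_ts_ms * 1000) * 1000)  # False
```

So as long as the offset round-trips exactly, the strict greater-than comparison should keep already-seen rows out, which is why I would not expect the JIRA issue to bite with a datetime column.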

Shikhar Bhushan

Jun 24, 2016, 4:40:00 PM
to Confluent Platform
Ah, right. That issue is only relevant for greater-than-millisecond precision.


Sumit Arora

Jun 28, 2016, 10:51:07 AM
to Confluent Platform
Any other ideas on what might be causing this?

Sumit Arora

Jun 28, 2016, 3:59:11 PM
to Confluent Platform
Further testing reveals that this is a problem in standalone mode as well.

Shikhar Bhushan

Jun 28, 2016, 4:04:42 PM
to Confluent Platform
Hi Sumit,

Would you be able to share more details, like the database you are testing against, and debug-level logs from the connector?

Thanks,

Shikhar


Sumit Arora

Jun 28, 2016, 7:02:46 PM
to Confluent Platform
Hi Shikhar,

I am testing this against SQL Server using the Microsoft JDBC driver. I am using a custom query with a timestamp column (so I can rely on the Kafka Connect framework for tracking offsets). The timestamp column is of type datetime in SQL Server.

The debug logs show that the connector is using the correct offset in the query, so the query should not return any records (I confirmed this by executing the query directly on SQL Server with the same offset value). However, subsequent log entries show that the connector returns 16 records every time it polls, and that is what causes the duplicates on Kafka:

[2016-06-28 22:34:23,626] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:211)
[2016-06-28 22:34:23,626] DEBUG executing query select CURRENT_TIMESTAMP; to get current time from database (io.confluent.connect.jdbc.JdbcSourceTask:224)
[2016-06-28 22:34:23,628] DEBUG Executing prepared statement with timestamp value = 1467153055053 (2016-06-28 22:30:55.053)  end time 2016-06-28 22:34:20.937 (io.confluent.connect.jdbc.TimestampIncrementingTableQuerier:171)
[2016-06-28 22:34:23,633] DEBUG Closing this query for TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:223)
[2016-06-28 22:34:23,633] DEBUG Returning 16 records for TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:236)
[2016-06-28 22:34:23,633] DEBUG About to send 16 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:159)
[2016-06-28 22:34:23,634] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2016-06-28 22:34:28,634] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:211)
[2016-06-28 22:34:28,634] DEBUG executing query select CURRENT_TIMESTAMP; to get current time from database (io.confluent.connect.jdbc.JdbcSourceTask:224)
[2016-06-28 22:34:28,636] DEBUG Executing prepared statement with timestamp value = 1467153055053 (2016-06-28 22:30:55.053)  end time 2016-06-28 22:34:25.943 (io.confluent.connect.jdbc.TimestampIncrementingTableQuerier:171)
[2016-06-28 22:34:28,641] DEBUG Closing this query for TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:223)
[2016-06-28 22:34:28,641] DEBUG Returning 16 records for TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:236)
[2016-06-28 22:34:28,641] DEBUG About to send 16 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:159)
[2016-06-28 22:34:28,648] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2016-06-28 22:34:33,641] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:211)
[2016-06-28 22:34:33,641] DEBUG executing query select CURRENT_TIMESTAMP; to get current time from database (io.confluent.connect.jdbc.JdbcSourceTask:224)
[2016-06-28 22:34:33,644] DEBUG Executing prepared statement with timestamp value = 1467153055053 (2016-06-28 22:30:55.053)  end time 2016-06-28 22:34:30.960 (io.confluent.connect.jdbc.TimestampIncrementingTableQuerier:171)
[2016-06-28 22:34:33,649] DEBUG Closing this query for TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:223)
[2016-06-28 22:34:33,649] DEBUG Returning 16 records for TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:236)
[2016-06-28 22:34:33,649] DEBUG About to send 16 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:159)
[2016-06-28 22:34:33,657] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2016-06-28 22:34:38,649] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:211)
[2016-06-28 22:34:38,649] DEBUG executing query select CURRENT_TIMESTAMP; to get current time from database (io.confluent.connect.jdbc.JdbcSourceTask:224)
[2016-06-28 22:34:38,652] DEBUG Executing prepared statement with timestamp value = 1467153055053 (2016-06-28 22:30:55.053)  end time 2016-06-28 22:34:35.960 (io.confluent.connect.jdbc.TimestampIncrementingTableQuerier:171)
[2016-06-28 22:34:38,656] DEBUG Closing this query for TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:223)
[2016-06-28 22:34:38,656] DEBUG Returning 16 records for TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:236)
[2016-06-28 22:34:38,656] DEBUG About to send 16 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:159)
[2016-06-28 22:34:38,657] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2016-06-28 22:34:40,058] DEBUG Scavenging sessions at 1467153280058 (org.eclipse.jetty.server.session:347)
[2016-06-28 22:34:43,657] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:211)
[2016-06-28 22:34:43,657] DEBUG executing query select CURRENT_TIMESTAMP; to get current time from database (io.confluent.connect.jdbc.JdbcSourceTask:224)
[2016-06-28 22:34:43,659] DEBUG Executing prepared statement with timestamp value = 1467153055053 (2016-06-28 22:30:55.053)  end time 2016-06-28 22:34:40.963 (io.confluent.connect.jdbc.TimestampIncrementingTableQuerier:171)
[2016-06-28 22:34:43,663] DEBUG Closing this query for TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:223)
[2016-06-28 22:34:43,663] DEBUG Returning 16 records for TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:236)
[2016-06-28 22:34:43,663] DEBUG About to send 16 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:159)
[2016-06-28 22:34:43,664] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2016-06-28 22:34:48,663] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:211)
[2016-06-28 22:34:48,664] DEBUG executing query select CURRENT_TIMESTAMP; to get current time from database (io.confluent.connect.jdbc.JdbcSourceTask:224)
[2016-06-28 22:34:48,666] DEBUG Executing prepared statement with timestamp value = 1467153055053 (2016-06-28 22:30:55.053)  end time 2016-06-28 22:34:45.980 (io.confluent.connect.jdbc.TimestampIncrementingTableQuerier:171)
[2016-06-28 22:34:48,670] DEBUG Closing this query for TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:223)
[2016-06-28 22:34:48,670] DEBUG Returning 16 records for TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:236)
[2016-06-28 22:34:48,670] DEBUG About to send 16 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:159)
[2016-06-28 22:34:48,672] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2016-06-28 22:34:53,670] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:211)
[2016-06-28 22:34:53,670] DEBUG executing query select CURRENT_TIMESTAMP; to get current time from database (io.confluent.connect.jdbc.JdbcSourceTask:224)
[2016-06-28 22:34:53,672] DEBUG Executing prepared statement with timestamp value = 1467153055053 (2016-06-28 22:30:55.053)  end time 2016-06-28 22:34:50.980 (io.confluent.connect.jdbc.TimestampIncrementingTableQuerier:171)
[2016-06-28 22:34:53,677] DEBUG Closing this query for TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:223)
[2016-06-28 22:34:53,678] DEBUG Returning 16 records for TimestampIncrementingTableQuerier{name='null', query='select cdc.lsn_time_mapping.tran_begin_time,cdc.dbo_users_CT.__$operation as operation,cdc.dbo_users_CT.id,cdc.dbo_users_CT.name,cdc.dbo_users_CT.email,cdc.dbo_users_CT.department,cdc.dbo_users_CT.modified from cdc.lsn_time_mapping join cdc.dbo_users_CT on cdc.lsn_time_mapping.start_lsn=cdc.dbo_users_CT.__$start_lsn and cdc.dbo_users_CT.__$operation != 3', topicPrefix='users1', timestampColumn='tran_begin_time', incrementingColumn='null'} (io.confluent.connect.jdbc.JdbcSourceTask:236)
[2016-06-28 22:34:53,678] DEBUG About to send 16 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:159)
[2016-06-28 22:34:53,679] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)


The problem seems to resolve itself after a new update to the source table.

Thanks for your help!

Shikhar Bhushan

Jul 1, 2016, 2:35:13 PM
to Confluent Platform
Hi Sumit,

I won't be able to dig too deeply, but perhaps the behavior you are seeing has to do with "Rounding of datetime Fractional Second Precision" (https://msdn.microsoft.com/en-us/library/ms187819.aspx)?

Best,

Shikhar


Sumit Arora

Jul 6, 2016, 4:33:06 PM
to Confluent Platform
Hi Shikhar,

I have finally been able to determine the root cause of this problem. It is not exactly the problem you pointed out, but close :)

Basically, Microsoft's JDBC driver converts datetime columns to datetime2 before executing the query on SQL Server. datetime2 is more precise than datetime, and this conversion works fine on SQL Server 2014 because the driver just pads the value with zeroes. However, we were testing on SQL Server 2016, where we found that in some cases the conversion actually decreases the datetime value. Here is an example:


DECLARE @datetime datetime = '2016-06-29 21:51:42.497';
DECLARE @datetime2 datetime2 = @datetime;

SELECT @datetime2 AS '@datetime2', @datetime AS '@datetime';

On SQL Server 2014:

The query will not fetch any results (and thus no duplicates), because even after conversion tran_begin_time would still be greater than 2016-06-29 21:51:42.497:

@datetime2                    @datetime
2016-06-29 21:51:42.4970000   2016-06-29 21:51:42.497

On SQL Server 2016:

The query will fetch results (and thus duplicates), because the converted datetime2 value of tran_begin_time (2016-06-29 21:51:42.4966667) is less than the datetime value (2016-06-29 21:51:42.497):

@datetime2                    @datetime
2016-06-29 21:51:42.4966667   2016-06-29 21:51:42.497
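The .4966667 value is consistent with how datetime stores the time portion: in 1/300-second ticks, so fractional seconds land on .000/.003/.007 boundaries. A quick Python sketch of that rounding (my own illustration, not driver or server code):

```python
from decimal import Decimal, ROUND_HALF_UP

def datetime_tick(frac_seconds: Decimal) -> int:
    """Round a fractional second to the nearest 1/300-second tick,
    which is the granularity of SQL Server's datetime type."""
    return int((frac_seconds * 300).quantize(Decimal("1"), rounding=ROUND_HALF_UP))

tick = datetime_tick(Decimal("0.497"))   # 149 ticks
exact = Decimal(tick) / 300              # the true stored value, 0.49666...

# Displayed at datetime's 3-digit precision the value reads .497 ...
print(exact.quantize(Decimal("0.001"), rounding=ROUND_HALF_UP))      # 0.497
# ... but at datetime2(7) precision it is .4966667, which is < .497
print(exact.quantize(Decimal("0.0000001"), rounding=ROUND_HALF_UP))  # 0.4966667
```

So a value that round-trips as ".497" on 2014 (where the driver zero-pads) really sits at .4966667 once the exact tick is honored, which matches what we see on 2016.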



Our source server is going to be SQL Server 2014 for a while, so this is no longer a blocker for us. However, I am going to open a ticket with Microsoft about this.


Another option is to use the jTDS driver, which doesn't convert datetime columns to datetime2 and therefore works fine. However, it doesn't officially support SQL Server 2014 or 2016.


Thanks for your help!
