Kafka Connect JDBC produces duplicate messages in timestamp+incrementing mode


Jim Malone

Jun 10, 2016, 11:30:24 AM
to Confluent Platform
Hi,

We have a setup with 2 Kafka Connect instances on separate nodes in distributed mode, using queries with timestamp+incrementing logic. Below is an example connector config:

{
  "name": "topic1",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": 1,
    "connection.url": "jdbc:sqlserver://...",
    "mode": "timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "ts",
    "batch.max.rows": 1000,
    "topic.prefix": "topic1",
    "query": "WITH base AS (SELECT x, y, z FROM table WHERE col1 <> 3) SELECT * FROM base"
  }
}

What we are seeing is that for some connectors (not all of them, even though they're all set up the same way), Kafka is receiving duplicate messages.  It doesn't always start happening right away (sometimes it does), and once it starts, not every message is duplicated.  This isn't the end of the world since our downstream apps can handle the duplicates, but it's beginning to happen more often and is causing unnecessary processing delays.

What I did notice is that when Kafka Connect stores the offsets, those too are being duplicated within Kafka. So that seems to be the obvious reason why the messages are duplicated: Kafka Connect is telling itself to read from the same offset (timestamp + incrementing id) more than once.

Has anyone encountered this before?  Are we doing something wrong?  Any idea where the issue is originating?

My next step is to go through the Kafka Connect logs (which are quite verbose even in INFO mode), so I will update this thread with any findings there.  But if you have somewhere specific in those logs that I should check then please let me know.

One other thing, which may actually be a more troubling, albeit less frequent, issue: while troubleshooting the duplicates and sifting through the messages in Kafka, we noticed that some messages are simply missing, even though there is clearly a DB record for them that was skipped over.  I don't know if that has the same root cause as the duplicates (offset mismanagement), but it would definitely represent a bigger issue for us.

Thanks for your time,
Jim


Peng Liu

Jun 10, 2016, 2:41:53 PM
to Confluent Platform
I experienced the same thing. My understanding is that connectors have at-least-once semantics due to how offset commits work. That is, offsets are only committed every t amount of time. So if something goes wrong between two commits, the connector rewinds to the last committed offset, which could be up to t behind where it actually last read a record, leading to duplicates. That is my understanding anyway, so please correct me if I'm wrong.
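
To make the rewind described above concrete, here is a minimal sketch in Python. It is not Kafka Connect code: `run_source`, `commit_every`, and `crash_after` are hypothetical names, and the commit is counted in records rather than in time to keep the example small. It only shows that restarting from the last committed offset re-delivers whatever was read after that commit.

```python
def run_source(records, commit_every, crash_after):
    """Deliver records, committing the read position every `commit_every` records.

    Simulates a single crash after `crash_after` records have been delivered,
    followed by a restart from the last committed position.
    """
    delivered = []   # what ends up in the topic
    committed = 0    # last committed read position
    position = 0     # current read position
    crashed = False
    while position < len(records):
        delivered.append(records[position])
        position += 1
        if position % commit_every == 0:
            committed = position
        if not crashed and position == crash_after:
            crashed = True
            position = committed  # the restart rewinds to the last commit
    return delivered


out = run_source(list(range(10)), commit_every=4, crash_after=6)
print(out)
```

In this run, records 4 and 5 were delivered after the last commit and before the crash, so they appear twice in `out`. Nothing is lost, which is the at-least-once guarantee.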

In my case, I noticed duplication generally happened when there was some coordinator thrashing in the Connector logs (connecting to the Coordinator, marking it as dead, reconnecting, etc in rapid succession). This was followed by a backlog of record reads (the log indicated that up to over 100k records were uncommitted). I can't confirm this is the cause, but this is my suspicion.

Again, please correct me if I'm misinterpreting something in any way :)

Jim Malone

Jun 10, 2016, 4:06:47 PM
to Confluent Platform
Thanks for the response.  When you say that you "experienced" the same issue, does that mean you've resolved it?  If so how?  Because I couldn't infer that from your post.  What you're saying about the offset committal does make sense though, although I can't confirm that's correct.

I dug through the Connect logs some and here's what I came up with: the duplicate entries seem to coincide with a "Socket closed" exception being thrown by the JDBC connector.  Below is the sequence of log lines before each of these exceptions.  The logs after them seem unrelated.  I need to dig into why we are getting the "Socket closed" exceptions in the first place, but it's still discouraging that they are causing duplicate messages to be created.

[2016-06-07 04:03:54,630] INFO Stopping task topic1-0 (org.apache.kafka.connect.runtime.Worker:305)
[2016-06-07 04:03:54,630] INFO Starting graceful shutdown of thread WorkerSourceTask-topic1-0 (org.apache.kafka.connect.util.ShutdownableThread:119)
[2016-06-07 04:03:54,631] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='null', query='WITH base AS (SELECT x, y, z FROM table WHERE col1 <> 3) SELECT * FROM base', topicPrefix='topic1', timestampColumn='ts', incrementingColumn='id'}: com.microsoft.sqlserver.jdbc.SQLServerException: Socket closed (io.confluent.connect.jdbc.JdbcSourceTask:238)

Some other errors/interesting messages I found in the logs are below, however I don't believe they are necessarily tied to this issue due to their relative distance from the times we are seeing duplicates.  I'm just pasting them here in case they can help shed light on anything else for someone:

[2016-06-06 23:34:57,037] INFO Attempt to heart beat failed since the group is rebalancing, try to re-join group. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:623)

[2016-06-10 12:34:21,424] INFO Attempt to heart beat failed since member id is not valid, reset it and try to re-join group. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:631)

[2016-06-07 00:43:10,265] INFO Finished stopping tasks in preparation for rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder:902)
[2016-06-07 00:43:10,266] INFO Attempt to join group prod-1 failed due to unknown member id, resetting and retrying. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:354)

[2016-06-07 00:43:36,861] INFO Marking the coordinator 2147483647 dead. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:529)


Anyone else encountering this issue?  I'd appreciate any input you can send my way.

Thanks

Peng Liu

Jun 10, 2016, 5:18:47 PM
to Confluent Platform
I didn't resolve it since (again, this is my understanding, and I could be wrong) the semantics are at-least-once but not exactly-once, so there will always be a chance of duplication (until Confluent introduces exactly-once semantics). The way I "resolved" it was more by making sure the entries were idempotent. For example, since I was reading rows from a database, reading the same row twice wouldn't matter, because it would overwrite the previous row with the exact same data (due to keying on each row's unique primary key). If each entry were being used to trigger an event, it would be a much harder problem to solve, sadly :(
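
As an illustration of the idempotent approach described above, here is a small Python sketch. It assumes each message carries its row's primary key in a field named `id`; the function name `apply_idempotently` and the in-memory dict standing in for the downstream table are both hypothetical.

```python
def apply_idempotently(messages, key_field="id"):
    """Upsert each message into a dict keyed on the row's primary key."""
    table = {}
    for msg in messages:
        # A redelivered row simply overwrites itself with identical data.
        table[msg[key_field]] = msg
    return table


messages = [
    {"id": 1, "value": "a"},
    {"id": 2, "value": "b"},
    {"id": 2, "value": "b"},  # duplicate delivery of the same row
    {"id": 3, "value": "c"},
]
state = apply_idempotently(messages)
print(len(state))
```

The duplicate of row 2 leaves `state` unchanged, so the consumer ends up with three rows regardless of how many times a row is redelivered. As noted above, this only works when applying a message twice has the same effect as applying it once.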

Ewen Cheslack-Postava

Jun 11, 2016, 5:09:28 PM
to Confluent Platform
If you're using timestamp+incrementing, you should only see duplicates if there is a fault, i.e. something crashes, does not finish shutting down in time during a rebalance, or throws an exception that allows partial data to get through.

I don't think that particular error should cause a problem -- we catch it, log that error, cancel the query, and don't return any data. Timeouts can definitely be a problem as well -- if the task doesn't stop quickly enough, rebalances may time out and need to kick the worker out of the group, in which case some data it has written may not have had its offsets committed and will be reprocessed. Log messages about heartbeats failing aren't necessarily a problem -- they are at INFO level because that is the normal way that the broker indicates another group member has triggered a rebalance. But are those logs in order? There are log messages from 2016-06-06, followed by 2016-06-10, and then followed by 2016-06-07?

Also, I notice that you're using a query with timestamp+incrementing mode. Can you check if there's a log line (it'd be at DEBUG level) that contains the text "prepared SQL query"? It might help to just sanity check that we're getting the query we expect and that it'll do the right comparisons on the timestamp and ID columns. Additionally, do you have schemas for the tables you're selecting data from?

(I also noticed you don't have the timestamp.delay.interval.ms configuration, you may want to add that to account for transaction execution time which could lead to missing some row updates).
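
To illustrate why a delay can prevent missed rows, here is a simplified Python model. It is not the connector's code: it tracks only a timestamp offset (no incrementing ID), uses plain integers for time, and the names `poll`, `run`, and `committed_at` are hypothetical. A row gets its timestamp when it is written but only becomes visible once its transaction commits.

```python
def poll(rows, last_ts, now, delay):
    """Return the names a poll at time `now` would pick up, plus the new offset."""
    upper = now - delay  # rows newer than this are left for a later poll
    batch = sorted(
        (r for r in rows
         if r["committed_at"] <= now and last_ts < r["ts"] < upper),
        key=lambda r: r["ts"],
    )
    new_last = batch[-1]["ts"] if batch else last_ts
    return [r["name"] for r in batch], new_last


def run(rows, delay, poll_times):
    """Run a sequence of polls and collect everything that was picked up."""
    seen, last_ts = [], 0
    for now in poll_times:
        names, last_ts = poll(rows, last_ts, now, delay)
        seen.extend(names)
    return seen


rows = [
    {"name": "slow", "ts": 10, "committed_at": 14},  # long-running transaction
    {"name": "fast", "ts": 11, "committed_at": 11},
]
without_delay = run(rows, delay=0, poll_times=[12, 20, 30])
with_delay = run(rows, delay=5, poll_times=[12, 20, 30])
print(without_delay, with_delay)
```

Without a delay, the poll at time 12 sees only `fast` and moves the offset to 11, so `slow` (timestamp 10, committed at 14) is never picked up. With a delay of 5, the first poll picks up nothing, and by the second poll both rows are committed and fall below the upper bound.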

-Ewen

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/782adf0a-39aa-40be-8f69-4b8d2999864a%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.



--
Thanks,
Ewen

Jim Malone

Jun 13, 2016, 11:04:15 AM
to Confluent Platform
What is the correct way to turn on DEBUG mode for the logs? Once I turn that on I can give you the prepared SQL queries.  As for the schemas for the tables, what we're doing is selecting data from CDC tables within SQL Server, and excluding any rows with operation = 3 (the state prior to an update).  So we have the default CDC columns, along with all of the columns from the underlying table, along with 2 additional columns we created in order to use timestamp+incrementing mode (we just called them 'id' and 'ts').  Anything specific about the schema you want to see?

And as far as the 2nd set of log messages I sent... yes they're out of order but weren't meant to be sequential.  I was just collecting anything that seemed out of the ordinary.  I should have put them in order before sending them though.

Also thanks for the tip on using timestamp.delay.interval.ms.  I will give that a shot and see how it goes.  Hopefully that resolves any issues we have with missing rows.

Jim

Ewen Cheslack-Postava

Jun 14, 2016, 12:26:27 AM
to Confluent Platform
On Mon, Jun 13, 2016 at 8:04 AM, Jim Malone <malo...@gmail.com> wrote:
What is the correct way to turn on DEBUG mode for the logs?

DEBUG mode for logs is a log4j setting -- if you're using Confluent Platform you can edit etc/kafka/connect-log4j.properties and add a line like log4j.logger.org.apache.kafka=DEBUG
 
Once I turn that on I can give you the prepared SQL queries.  As for the schemas for the tables, what we're doing is selecting data from CDC tables within SQL Server, and excluding any rows with operation = 3 (the state prior to an update).  So we have the default CDC columns, along with all of the columns from the underlying table, along with 2 additional columns we created in order to use timestamp+incrementing mode (we just called them 'id' and 'ts').  Anything specific about the schema you want to see?

Nothing specific, but I was curious about column types just to see if anything stood out as possibly problematic or a known issue.
 

And as far as the 2nd set of log messages I sent... yes they're out of order but weren't meant to be sequential.  I was just collecting anything that seemed out of the ordinary.  I should have put them in order before sending them though.

Ok, makes sense.

-Ewen
 

hop...@gmail.com

Mar 14, 2018, 1:02:46 AM
to Confluent Platform
Hi Jim,

How did you resolve this issue, especially the missing records? I am using timestamp+incrementing with a query on SQL Server and am missing records here and there. What I have noticed is that when a second or later transaction batch in the DB updates rows whose incrementing IDs were already picked up by an earlier poll, those updates are skipped. In other words, updates to existing rows are skipped while new inserts come through.

Thanks,
hf

BadwolF ForeveR

Mar 6, 2019, 1:29:46 AM
to Confluent Platform
Hi Jim,

I may be very late in replying (since it's an old thread), but this may help someone.

I have encountered this kind of issue with both bulk and timestamp+incrementing modes, and based on my investigation the following were the culprits:

1. When running in query mode, with or without an incrementing and/or timestamp column, there are a lot of moving parts that can go wrong. The first is the source system. Since the query is executed on the source system and the data is then pulled in, we are at the mercy of the source DB. I see that in your connector configuration you have not provided any intervals or delays:

"poll.interval.ms": 86400000, // time between 2 polls in ms (default 5000, i.e. 5 secs)
"batch.max.rows": 10000, // how many rows per batch; controls the number of batches per poll
"timestamp.delay.interval.ms": 0 // how long to wait after a row's timestamp before including it, which gives DB transactions time to commit

Now if your source table has more data than can be processed in 5 seconds (the default, since you are not setting poll.interval.ms), the connector could still be pulling in data from the previous poll when the next poll is triggered. Also, your batch size (1000) is too small if the source system has a lot of data. On a single thread, 1000 records per batch means even 50000 rows need 50 batches, and each batch goes through source fetch + network transfer + buffering + network transfer to Kafka + Kafka commit and acks (if acks are set). You get the idea; it's kind of tricky. My understanding is that the connector commits to its offset topic after every batch, so if your first poll isn't complete when the second poll starts (which takes the committed offsets as its starting point), you are bound to get duplicates.

Worst case, your job may go into an infinite loop (mine did), since there is a chance that the first batch itself is not finished when the second poll starts, and then the third and fourth and so on ... So in my opinion it's the correct tuning of all these levers that removes the duplicates.

(And I am not even going into the connector crashing and cluster rebalancing scenarios, which pause everything for all connectors; I didn't like that at all.) So I would recommend playing around with different combinations of these configurations, especially poll.interval.ms and batch.max.rows, based on your hardware's processing power. This may well be one of the reasons why you have duplicate offsets too (if even one batch of the first poll is not completed when the second poll starts, both will commit the same offsets).
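
The batch arithmetic above can be turned into a rough back-of-the-envelope check. This Python sketch is only an estimate under stated assumptions: `ms_per_batch` is a made-up per-batch cost that you would have to measure yourself, and the helper names are hypothetical.

```python
import math


def batches_needed(total_rows, batch_max_rows):
    """Number of batches required to drain `total_rows`."""
    return math.ceil(total_rows / batch_max_rows)


def drain_time_ms(total_rows, batch_max_rows, ms_per_batch):
    """Estimated time to push every batch through, one after another."""
    return batches_needed(total_rows, batch_max_rows) * ms_per_batch


# 50000 rows at 1000 rows per batch, as in the example above.
n_batches = batches_needed(50000, 1000)

# ms_per_batch=200 is a made-up measurement; replace it with your own.
estimate = drain_time_ms(50000, 1000, ms_per_batch=200)
exceeds_default_poll = estimate > 5000  # 5000 ms is the default poll.interval.ms

print(n_batches, estimate, exceeds_default_poll)
```

With these numbers, 50 batches at an assumed 200 ms each take about 10 seconds, which is longer than the default 5 second poll interval. Plugging in your own row counts and measured batch times gives a starting point for choosing poll.interval.ms and batch.max.rows.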

One thing IMHO to always remember: one table == 1 thread (even if you set tasks.max to 10). See how much your hardware can achieve on a single thread.

For your second issue: missing records are most probably not associated with offset (mis)management; that would give you duplicates. Records can easily go missing when 2 or more records share the same incrementing ID value and/or the same timestamp. Consider this scenario:

INVOICE_ID   INVOICE_TYPE   UPDATE_TS
==========   ============   ===================
1234         23             2019-03-06 13:44:58
1234         54             2019-03-06 13:44:58
2345         11             2019-03-06 13:44:58
4567         18             2019-03-06 13:44:58

Now imagine your connector processed the first record (assume it was the 1000th in the batch), committed the offset, started processing the second record above, and then suddenly crashed or got restarted due to a rebalance. When the connector re-runs the query it will skip record 2, because the offset holds id -> 1234; ts -> 2019-03-06 13:44:58, and your query will become (the WHERE condition is added by Connect):

WITH base AS (SELECT x, y, z FROM table WHERE col1 <> 3) SELECT * FROM base WHERE update_ts < currentTimeinMillis AND ((update_ts = '2019-03-06 13:44:58' AND inc_id > 1234) OR update_ts > '2019-03-06 13:44:58') ORDER BY update_ts, inc_id ASC

This logic of where condition is in TimestampIncrementingCriteria.java class (github link: https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingCriteria.java; method name: "timestampIncrementingWhereClause")

If your data looks like this, you can see that certain records can be skipped when the incrementing ID has repeating values. For Kafka Connect to function properly, the incrementing ID column needs to be strictly increasing and unique (there can be only one incrementing column).
With dates there is some flexibility, in that multiple timestamp columns can be provided (the code does a COALESCE on that list and then compares the result with the stored timestamp). Still, if the DB does not have millisecond-level timestamps, data can be missed (since multiple rows may be committed with the same timestamp when update_ts is not a system-generated timestamp).
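
The skip can be reproduced with a few lines of Python. This is only a sketch of the predicate from the query above, applied to the four invoice rows; it is not the connector's code. The name `passes_criteria` is hypothetical, the upper bound on the timestamp is left out, and timestamps are compared as strings, which works here because they share one fixed format.

```python
def passes_criteria(row, last_ts, last_id):
    """Mirror of: (ts = last_ts AND id > last_id) OR ts > last_ts."""
    return (row["ts"] == last_ts and row["id"] > last_id) or row["ts"] > last_ts


rows = [
    {"id": 1234, "type": 23, "ts": "2019-03-06 13:44:58"},
    {"id": 1234, "type": 54, "ts": "2019-03-06 13:44:58"},
    {"id": 2345, "type": 11, "ts": "2019-03-06 13:44:58"},
    {"id": 4567, "type": 18, "ts": "2019-03-06 13:44:58"},
]

# Offset stored after the first row was delivered, just before the restart.
last_ts, last_id = "2019-03-06 13:44:58", 1234

resumed = [r["type"] for r in rows if passes_criteria(r, last_ts, last_id)]
print(resumed)
```

After the restart only the rows with types 11 and 18 pass the predicate. The row with type 54 has the same ID and timestamp as the stored offset, so it fails `id > last_id` and is never delivered.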

For the "Socket closed" error, you may want to check with your DB admin what the connection timeout is set to, and then play around with the batch and poll interval configurations to see whether you can process everything within that time.

I hope this helps you to resolve your issues.



On Friday, 10 June 2016 11:30:24 UTC-4, Jim Malone wrote: