I may be very late in replying (since it's an old thread), but this may help someone.
I have encountered this kind of issue with both bulk and incrementing+timestamp modes, and based on my investigation the following were the culprits:
1. When running query mode, with or without an incrementing and/or timestamp column, there are a lot of moving parts that can go wrong. The first is the source system itself: since we are running a query that gets executed on the source system and the data is then pulled in, we are essentially at the mercy of the source DB. I see that in your Connect configuration you have not provided any intervals or delays:
"batch.max.rows": 10000, //how many rows/batch; controls the number of batches/poll
Now if your source table has more data than can be processed in 5 seconds (the default, since you are not providing poll.interval.ms), the connector could still be sourcing data from the previous poll when the next poll gets triggered. Also, your batch size may be too small if the source table is large: at 1,000 records per batch, for example, even 50,000 rows needs 50 batches to complete on a single thread (source fetch + network transfer + buffering + network transfer to Kafka + Kafka commit and acks, if acks are set); you get the idea, it is tricky. My understanding is that the Kafka connector commits to its offset topic after every batch, so if your first poll isn't complete and the second poll has already started (taking the committed offsets as its starting point), you are bound to get duplicates.
Worst case, your job may go into an infinite loop (mine did), since there is a chance that the first batch itself is not finished when the second poll starts, and then the third and fourth and so on. So in my opinion it is the correct tuning of all these levers that will remove the duplicates
(and I am not even going into the connector-crash and cluster-rebalance scenario, which pauses everything for all connectors; I didn't like that at all). So I would recommend playing around with different permutations of these configurations, especially
poll.interval.ms and batch.max.rows, based on your hardware's processing power. This may very well also be one of the many reasons why you see duplicate offsets (if even one batch of the first poll is not completed when the second poll starts, both will commit the same offsets).
One thing IMHO to always remember: one table == one thread (even if you set tasks.max to 10). See how much your hardware can achieve on a single thread.
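Just to make those knobs concrete, here is a minimal sketch of the relevant properties (the values are placeholders, not recommendations; tune them to your hardware and table size):
"batch.max.rows": 10000, // rows fetched per batch on each poll
"poll.interval.ms": 30000, // give one poll enough headroom to finish before the next one fires
"tasks.max": 1 // a single table/query still runs on one thread regardless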
For your second issue: missing records are most probably not related to the offset (mis)management; that gives you duplicates. Missing records can easily happen when you have 2 or more records with the same incrementing-ID column value and/or the same timestamp. Consider this scenario:
INVOICE_ID   INVOICE_TYPE   UPDATE_TS
==========   ============   ===================
1234         23             2019-03-06 13:44:58
1234         54             2019-03-06 13:44:58
2345         11             2019-03-06 13:44:58
4567         18             2019-03-06 13:44:58
Now imagine your connector processed the first record (assume it was the 1000th in the batch), committed the offset, started processing the second record above, and then suddenly crashed or got restarted due to a rebalance. When the connector re-runs the query, it will skip record 2, because the offset already holds id -> 1234; ts -> 2019-03-06 13:44:58, and your query will become (the WHERE condition is added by Connect):
WITH base AS (SELECT x, y, z FROM table WHERE col1 <> 3)
SELECT * FROM base
WHERE update_ts < currentTimeInMillis
  AND ((update_ts = '2019-03-06 13:44:58' AND inc_id > 1234) OR update_ts > '2019-03-06 13:44:58')
ORDER BY update_ts, inc_id ASC
If your data looks like this, you can see how certain records get skipped when the incrementing ID has repeating values. For Kafka Connect to function properly, the incrementing ID column needs to be strictly increasing as well as unique (and there can be only one incrementing column).
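For reference, the mode setup that makes Connect append a WHERE clause like the one above would look roughly like this (a sketch only; the column names are taken from the example table, not from your actual config):
"mode": "timestamp+incrementing",
"incrementing.column.name": "INVOICE_ID", // must be unique and strictly increasing
"timestamp.column.name": "UPDATE_TS", // needs enough precision to break ties between rows
"query": "WITH base AS (SELECT x, y, z FROM table WHERE col1 <> 3) SELECT * FROM base"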
With dates there is some flexibility: multiple timestamp columns can be provided (the code does a COALESCE on that list and then compares against the stored timestamp), but if the DB does not store millisecond-level precision, data can still be missed (since multiple records may get committed with the same timestamp, and the update_ts is not a system-generated timestamp).
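That multi-column flexibility is just a comma-separated list in the connector config; a sketch, with CREATE_TS as a made-up second column:
"timestamp.column.name": "UPDATE_TS,CREATE_TS" // the connector effectively uses COALESCE(UPDATE_TS, CREATE_TS) in its WHERE clause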
For the socket-close errors, you can check with your DB admin what the connection close time is set to, and then play around with the batch and poll-interval configurations to see whether you can process everything within that connection-close window.
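If the DB is closing connections on you, the connector also has reconnection knobs you can line up with that timeout (again just a sketch, with default-ish values):
"connection.attempts": 3, // how many times the connector retries getting a valid connection
"connection.backoff.ms": 10000 // wait between those retries, in milliseconds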
I hope this helps you to resolve your issues.