Hi all,
We are using a Kafka Connect source connector to fetch files from an FTP server and put each data record into a Kafka topic.
We are currently facing a problem when a file fetched from the FTP server is so big that it can't be pushed to the Kafka topic within 5 minutes. In this case the source connector moves on to the next file anyway, which results in the error "OffsetStorageWriter is already flushing".
If the file is small enough to be pushed within 5 minutes, no error occurs.
Unfortunately, I couldn't find the configuration parameter that would increase this timeout to, let's say, 10 minutes.
Any help finding that parameter, or getting a better understanding of the timeouts in "WorkerSourceTask", is highly appreciated.
The Kafka Connect version we're using is "confluentinc/cp-kafka-connect:3.1.1".
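For reference, the Kafka Connect worker settings that govern offset commits in WorkerSourceTask are offset.flush.interval.ms (how often offsets are committed, default 60000 ms) and offset.flush.timeout.ms (how long a commit waits for outstanding records to be flushed before it is cancelled and retried later, default 5000 ms). The sketch below raises the flush timeout to 10 minutes in the worker configuration. It is only an assumption that one of these settings is relevant here; it is not confirmed that either corresponds to the 5-minute limit we are seeing.

```properties
# Kafka Connect worker configuration (e.g. connect-distributed.properties).
# Values are illustrative assumptions, not a verified fix.

# How often the worker tries to commit offsets for its tasks (default: 60000 ms).
offset.flush.interval.ms=60000

# How long an offset commit waits for outstanding records to be flushed
# before it is cancelled and retried in a later commit (default: 5000 ms).
# 600000 ms = 10 minutes.
offset.flush.timeout.ms=600000
```

With the confluentinc/cp-kafka-connect Docker image, worker properties are normally passed as environment variables with the CONNECT_ prefix, e.g. CONNECT_OFFSET_FLUSH_TIMEOUT_MS=600000.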
Below is an excerpt from the log for fetching and pushing two big files.
[2018-05-31 02:32:24,610] INFO meta store storage HASN'T /home/my-big-zip-file.csv.zip (com.datamountaineer.streamreactor.connect.ftp.source.ConnectFileMetaDataStore)
[2018-05-31 02:32:24,610] INFO fetching /home/my-big-zip-file.csv.zip (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor)
[2018-05-31 02:32:38,546] INFO WorkerSourceTask{id=FtpSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-05-31 02:32:38,832] INFO fetched /home/my-big-zip-file.csv.zip, wasn't known before (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor)
[2018-05-31 02:32:38,832] INFO dump entire /home/my-big-zip-file.csv.zip (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor)
[2018-05-31 02:32:38,832] INFO got some fileChanges: /home/my-big-zip-file.csv.zip (com.datamountaineer.streamreactor.connect.ftp.source.FtpSourcePoller)
[2018-05-31 02:32:38,832] INFO ConnectFileMetaDataStore set /home/my-big-zip-file.csv.zip (com.datamountaineer.streamreactor.connect.ftp.source.ConnectFileMetaDataStore)
[2018-05-31 02:32:38,832] INFO start converting (com.landoop.SingleChannelCSV)
[2018-05-31 02:32:38,832] INFO convert records (com.landoop.SingleChannelCSV)
[2018-05-31 02:32:55,162] INFO number of records: 6088272 (com.landoop.SingleChannelCSV)
[2018-05-31 02:32:55,162] INFO return converted records (com.landoop.SingleChannelCSV)
[2018-05-31 02:32:55,681] INFO WorkerSourceTask{id=FtpSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-05-31 02:32:55,681] INFO WorkerSourceTask{id=FtpSourceConnector-0} flushing 23913 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-05-31 02:32:56,802] INFO WorkerSourceTask{id=FtpSourceConnector-0} Finished commitOffsets successfully in 1121 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
...
[2018-05-31 02:37:47,822] INFO WorkerSourceTask{id=FtpSourceConnector-0} flushing 23747 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-05-31 02:37:48,992] INFO WorkerSourceTask{id=FtpSourceConnector-0} Finished commitOffsets successfully in 1170 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-05-31 02:37:49,992] INFO WorkerSourceTask{id=FtpSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-05-31 02:37:49,992] INFO WorkerSourceTask{id=FtpSourceConnector-0} flushing 23883 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-05-31 02:37:50,994] INFO meta store storage HASN'T /home/my-second-big-zip-file.csv.zip (com.datamountaineer.streamreactor.connect.ftp.source.ConnectFileMetaDataStore)
[2018-05-31 02:37:50,994] INFO fetching /home/my-second-big-zip-file.csv.zip (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor)
[2018-05-31 02:37:50,995] INFO WorkerSourceTask{id=FtpSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-05-31 02:37:50,995] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter)
[2018-05-31 02:37:50,996] ERROR WorkerSourceTask{id=FtpSourceConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
	at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:110)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:318)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:197)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Best regards,
Martin