Kafka source connector - how to increase timeout between two files when flushing to a Kafka topic


martin....@flixbus.com

May 31, 2018, 4:26:35 AM
to Confluent Platform
Hi all,

we are using a Kafka Connect source connector to fetch files from an FTP server and write each data record to a Kafka topic.

Currently we are facing a problem when a file fetched from the FTP server is so big that it can't be pushed to the Kafka topic within 5 minutes. In this case the source connector continues with the next file anyway, resulting in the error "OffsetStorageWriter is already flushing".
If the file is small enough to be pushed within 5 minutes, no error occurs.

Unfortunately I haven't found the configuration parameter to increase this timeout to, let's say, 10 minutes.

Any help finding that parameter, or getting a better understanding of the timeouts for "WorkerSourceTask", is highly appreciated.

The Kafka Connect version we're using is "confluentinc/cp-kafka-connect:3.1.1".

Below is an excerpt from the log for fetching and pushing two big files.

[2018-05-31 02:32:24,610] INFO meta store storage HASN'T /home/my-big-zip-file.csv.zip (com.datamountaineer.streamreactor.connect.ftp.source.ConnectFileMetaDataStore)
[2018-05-31 02:32:24,610] INFO fetching /home/my-big-zip-file.csv.zip (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor)
[2018-05-31 02:32:38,546] INFO WorkerSourceTask{id=FtpSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-05-31 02:32:38,832] INFO fetched /home/my-big-zip-file.csv.zip, wasn't known before (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor)
[2018-05-31 02:32:38,832] INFO dump entire /home/my-big-zip-file.csv.zip (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor)
[2018-05-31 02:32:38,832] INFO got some fileChanges: /home/my-big-zip-file.csv.zip (com.datamountaineer.streamreactor.connect.ftp.source.FtpSourcePoller)
[2018-05-31 02:32:38,832] INFO ConnectFileMetaDataStore set /home/my-big-zip-file.csv.zip (com.datamountaineer.streamreactor.connect.ftp.source.ConnectFileMetaDataStore)
[2018-05-31 02:32:38,832] INFO start converting (com.landoop.SingleChannelCSV)
[2018-05-31 02:32:38,832] INFO convert records (com.landoop.SingleChannelCSV)
[2018-05-31 02:32:55,162] INFO number of records: 6088272 (com.landoop.SingleChannelCSV)
[2018-05-31 02:32:55,162] INFO return converted records (com.landoop.SingleChannelCSV)
[2018-05-31 02:32:55,681] INFO WorkerSourceTask{id=FtpSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-05-31 02:32:55,681] INFO WorkerSourceTask{id=FtpSourceConnector-0} flushing 23913 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-05-31 02:32:56,802] INFO WorkerSourceTask{id=FtpSourceConnector-0} Finished commitOffsets successfully in 1121 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
...
[2018-05-31 02:37:47,822] INFO WorkerSourceTask{id=FtpSourceConnector-0} flushing 23747 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-05-31 02:37:48,992] INFO WorkerSourceTask{id=FtpSourceConnector-0} Finished commitOffsets successfully in 1170 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-05-31 02:37:49,992] INFO WorkerSourceTask{id=FtpSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-05-31 02:37:49,992] INFO WorkerSourceTask{id=FtpSourceConnector-0} flushing 23883 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-05-31 02:37:50,994] INFO meta store storage HASN'T /home/my-second-big-zip-file.csv.zip (com.datamountaineer.streamreactor.connect.ftp.source.ConnectFileMetaDataStore)
[2018-05-31 02:37:50,994] INFO fetching /home/my-second-big-zip-file.csv.zip (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor)
[2018-05-31 02:37:50,995] INFO WorkerSourceTask{id=FtpSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-05-31 02:37:50,995] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter)
[2018-05-31 02:37:50,996] ERROR WorkerSourceTask{id=FtpSourceConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
    at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:110)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:318)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:197)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Best regards,
Martin

Konstantine Karantasis

Jun 4, 2018, 11:49:19 AM
to confluent...@googlegroups.com

Hey Martin,

I'm not familiar with the specific connector, but try adjusting the offset.flush.interval.ms property in the Connect worker's configuration.
That should increase the interval between consecutive attempts by the connector to commit offsets. The current default is 1 minute (60000 ms).
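
As a rough sketch of that suggestion, the worker properties file might contain something like the lines below. The 600000 ms value only mirrors the 10 minutes mentioned in the question and is an assumption, not a tested recommendation. The commented-out offset.flush.timeout.ms line is a related worker setting that is not discussed in this thread; it is shown only as something that may be worth checking.

```properties
# Connect worker configuration (e.g. the distributed or standalone worker properties file)

# Interval between attempts to commit offsets for tasks.
# Default is 60000 (1 minute); 600000 (10 minutes) is an illustrative value only.
offset.flush.interval.ms=600000

# Related setting, not mentioned in the thread: how long the worker waits for
# outstanding records to flush before cancelling an offset commit attempt.
# Left commented out because whether it helps in this case is an assumption.
#offset.flush.timeout.ms=5000
```

With the confluentinc/cp-kafka-connect Docker image, worker properties are normally passed as environment variables with a CONNECT_ prefix, so the first setting would be given as CONNECT_OFFSET_FLUSH_INTERVAL_MS=600000.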

Konstantine
