Kafka Connect - S3 Connector; creating many small files and significantly duplicating data?


Will Briggs

Jul 27, 2017, 3:40:57 PM
to Confluent Platform
I'm seeing behavior that I don't necessarily expect with the Kafka Connect S3 connector. I have a topic w/ 64 partitions, and am using the attached worker and connector configurations to write data from these partitions into S3. The intent is to roll the files hourly, ideally ending up with 64 files per hour. As you can likely tell from the naming convention, I'm looking to use Amazon Athena to query this data. My first challenge was related to the number of partitions and the s3.part.size parameter default; after setting that to the minimum (5MB), I haven't had any further issues with running out of memory.

However, I am seeing two different problem behaviors - the first is that occasionally, the workers appear to get into a state where they are constantly rebalancing, and the offsets don't appear to be committed (seeing messages like "WARN Commit of WorkerSinkTask{id=pixall-parsed-non-prod-2} offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask:172)", which implies I may be hitting https://issues.apache.org/jira/browse/KAFKA-4942). This ends up creating significant problems, because when the rebalance starts over, it starts from somewhere much further back in time, and ends up writing large amounts of duplicate data from previous hours into the bucket for the current wall clock hour. I commented on some changes I made in the worker-config.properties file to try and address this (increased heartbeat.interval.ms, session.timeout.ms, and offset.flush.timeout.ms, and decreased offset.flush.interval.ms), but the problem is still happening intermittently.
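One way to confirm this kind of duplication from a bucket listing is to check, per partition, whether file start offsets ever go backwards as the hour prefixes advance. The following is a minimal Python sketch, not part of the connector. It assumes a wall-clock hourly layout of the form `year=/month=/day=/hour=` and file names of the form `<topic>+<partition>+<startOffset>.avro`, as seen in the connector's log output. `find_offset_regressions` is a hypothetical helper name, and the list of keys is assumed to come from whatever S3 listing tool is at hand.

```python
import re
from collections import defaultdict

# Assumed key layout:
#   <prefix>/year=YYYY/month=MM/day=DD/hour=HH/<topic>+<partition>+<startOffset>.avro
KEY_RE = re.compile(
    r"year=(\d{4})/month=(\d{2})/day=(\d{2})/hour=(\d{2})/"
    r"(?P<topic>[^/+]+)\+(?P<partition>\d+)\+(?P<offset>\d+)\.avro$"
)

def find_offset_regressions(keys):
    """Return ((topic, partition), hour, start_offset) for every file whose
    start offset is not above the highest start offset seen in an earlier hour."""
    by_partition = defaultdict(list)
    for key in keys:
        m = KEY_RE.search(key)
        if m is None:
            continue  # not a data file in the assumed layout
        hour = "-".join(m.group(1, 2, 3, 4))  # zero-padded, so it sorts correctly
        by_partition[(m["topic"], int(m["partition"]))].append((hour, int(m["offset"])))

    regressions = []
    for tp, files in sorted(by_partition.items()):
        max_earlier = -1   # highest start offset in strictly earlier hours
        max_current = -1   # highest start offset in the hour being scanned
        current_hour = None
        for hour, offset in sorted(files):
            if hour != current_hour:
                max_earlier = max(max_earlier, max_current)
                current_hour, max_current = hour, -1
            if offset <= max_earlier:
                regressions.append((tp, hour, offset))
            max_current = max(max_current, offset)
    return regressions
```

A file flagged here starts at or before an offset that was already written under an earlier hour, which is what a re-read from an old committed offset would look like. The check only makes sense for wall-clock partitioning; with record-timestamp partitioning, out-of-order start offsets across hours can be legitimate.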
 
In addition, some of the workers will occasionally create hundreds (or even thousands) of files in the S3 bucket, each containing a single Avro record.  Below are a couple sample log lines to show what this looks like when it occurs:

[2017-07-27 19:02:24,018] INFO Opening record writer for: my_bucket/year=2017/month=07/day=27/hour=19/my_topic+53+0007467933.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider:66)

[2017-07-27 19:02:24,470] INFO Files committed to S3. Target commit offset for my_topic-53 is 7467933 (io.confluent.connect.s3.TopicPartitionWriter:407)

[2017-07-27 19:02:24,471] INFO Opening record writer for: my_bucket/year=2017/month=07/day=27/hour=19/my_topic+53+0007467934.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider:66)

... (hundreds more, each with the offset incremented by one)
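To gauge how widespread the single-record files are, the start offsets embedded in the file names can be scanned for runs that increase by exactly one. The following is a minimal Python sketch under the assumption that files are named `<topic>+<partition>+<startOffset>.avro`, as in the log lines above, and that offsets within a partition are contiguous. `single_record_runs` and its `min_run` parameter are hypothetical names used only for illustration.

```python
import re
from collections import defaultdict

# File name layout taken from the log lines: <topic>+<partition>+<startOffset>.avro
NAME_RE = re.compile(r"(?P<topic>[^/+]+)\+(?P<partition>\d+)\+(?P<offset>\d+)\.avro$")

def single_record_runs(keys, min_run=2):
    """Return ((topic, partition), first_offset, count) for each run of at least
    `min_run` consecutive files that appear to hold a single record each."""
    starts_by_partition = defaultdict(set)
    for key in keys:
        m = NAME_RE.search(key)
        if m:
            starts_by_partition[(m["topic"], int(m["partition"]))].add(int(m["offset"]))

    runs = []
    for tp, starts in sorted(starts_by_partition.items()):
        ordered = sorted(starts)
        run_start, run_len = None, 0
        # A file starting at N followed by one starting at N + 1 held one record.
        for prev, cur in zip(ordered, ordered[1:]):
            if cur - prev == 1:
                if run_len == 0:
                    run_start = prev
                run_len += 1
            else:
                if run_len >= min_run:
                    runs.append((tp, run_start, run_len))
                run_len = 0
        if run_len >= min_run:
            runs.append((tp, run_start, run_len))
    return runs
```

The size of the last file in each run cannot be inferred from names alone, so it is not counted.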


I've definitely run into a wall, and would appreciate any help or support this group can provide!

Regards,
Will
worker-config.properties
connector-config.json

Ewen Cheslack-Postava

Jul 31, 2017, 12:11:09 PM
to Confluent Platform
On Thu, Jul 27, 2017 at 12:40 PM, Will Briggs <wrbr...@gmail.com> wrote:
I'm seeing behavior that I don't necessarily expect with the Kafka Connect S3 connector. I have a topic w/ 64 partitions, and am using the attached worker and connector configurations to write data from these partitions into S3. The intent is to roll the files hourly, ideally ending up with 64 files per hour. As you can likely tell from the naming convention, I'm looking to use Amazon Athena to query this data. My first challenge was related to the number of partitions and the s3.part.size parameter default; after setting that to the minimum (5MB), I haven't had any further issues with running out of memory.

However, I am seeing two different problem behaviors - the first is that occasionally, the workers appear to get into a state where they are constantly rebalancing, and the offsets don't appear to be committed (seeing messages like "WARN Commit of WorkerSinkTask{id=pixall-parsed-non-prod-2} offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask:172)", which implies I may be hitting https://issues.apache.org/jira/browse/KAFKA-4942). This ends up creating significant problems, because when the rebalance starts over, it starts from somewhere much further back in time, and ends up writing large amounts of duplicate data from previous hours into the bucket for the current wall clock hour. I commented on some changes I made in the worker-config.properties file to try and address this (increased heartbeat.interval.ms, session.timeout.ms, and offset.flush.timeout.ms, and decreased offset.flush.interval.ms), but the problem is still happening intermittently.

"Commit" is a bit overloaded here, and it looks like there are potentially two separate problems. KAFKA-4942 is not actually related to rebalancing, beyond the fact that a rebalance results in offsets being reloaded. Note also that KAFKA-4942 shouldn't actually result in offsets *not* being committed: it only logged that the timeout had been hit, and the request to commit offsets should still have been sent and processed. That particular bug caused an annoying amount of log spam, but offsets should still have been committed.

One thing you can do to verify this is to use the consumer offsets checker while the connector is running to make sure you see the committed offsets increasing periodically.
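As a rough illustration of that check, the sketch below compares two snapshots of a consumer group's offsets and reports partitions that have a backlog but whose committed offset did not move. It is a minimal Python sketch under stated assumptions: `stalled_partitions` is a hypothetical helper, and the snapshots are assumed to have been collected by hand (or by any offsets tool) as `{partition: (committed_offset, log_end_offset)}`. For a sink connector, the consumer group is named `connect-<connector name>` by default.

```python
def stalled_partitions(before, after):
    """Return partitions that still have unconsumed data in `after` but whose
    committed offset did not advance since `before`.

    Both arguments map partition -> (committed_offset, log_end_offset).
    Partitions missing from `after` are skipped rather than reported.
    """
    stalled = []
    for partition, (committed_before, _) in sorted(before.items()):
        if partition not in after:
            continue
        committed_after, log_end_after = after[partition]
        has_backlog = committed_after < log_end_after
        if committed_after <= committed_before and has_backlog:
            stalled.append(partition)
    return stalled


# Example with made-up numbers: partition 0 has a backlog and did not advance.
before = {0: (100, 150), 1: (200, 200), 2: (300, 320)}
after = {0: (100, 180), 1: (200, 200), 2: (330, 340)}
print(stalled_partitions(before, after))
```

Because the connector here is configured to roll files hourly, committed offsets may only move when files are committed to S3, so the two snapshots may need to be spaced accordingly before a partition is treated as stalled.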

 
In addition, some of the workers will occasionally create hundreds (or even thousands) of files in the S3 bucket, each containing a single Avro record.  Below are a couple sample log lines to show what this looks like when it occurs:

[2017-07-27 19:02:24,018] INFO Opening record writer for: my_bucket/year=2017/month=07/day=27/hour=19/my_topic+53+0007467933.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider:66)

[2017-07-27 19:02:24,470] INFO Files committed to S3. Target commit offset for my_topic-53 is 7467933 (io.confluent.connect.s3.TopicPartitionWriter:407)

[2017-07-27 19:02:24,471] INFO Opening record writer for: my_bucket/year=2017/month=07/day=27/hour=19/my_topic+53+0007467934.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider:66)

... (hundreds more, each with the offset incremented by one)



Have you had any schema changes, or do you ever mix schemas? You have schema.compatibility set to BACKWARD, which allows backward-compatible schemas to be projected, but the most common cause we see for small files is schema changes that do not allow projection: when messages with the different schemas are interleaved, each switch requires rolling a new file.
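To see why interleaving matters so much, here is a deliberately simplified Python model of the effect. It is not the connector's actual logic: it assumes each record carries a schema version, that no projection between versions is possible, and that a new file is started whenever the version differs from that of the file currently being written. `count_files` is a hypothetical name.

```python
def count_files(schema_versions):
    """Count the files produced if every schema switch forces a new file."""
    files = 0
    current = None  # schema version of the file currently being written
    for version in schema_versions:
        if version != current:
            files += 1
            current = version
    return files


same_schema = [1, 1, 1, 1, 1, 1]
grouped = [1, 1, 1, 2, 2, 2]
interleaved = [1, 2, 1, 2, 1, 2]
print(count_files(same_schema), count_files(grouped), count_files(interleaved))
```

In this model, six records with one schema give one file, the same records grouped by schema give two, and fully interleaved records give six single-record files, which matches the pattern of file names whose offsets increase by one.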

-Ewen

 
