S3 Connector Optimizations


jeff whybark

Jun 5, 2017, 6:44:50 PM
to Confluent Platform
Hello,
We are currently running a POC on the Kafka S3 Connector.  The movement of data to S3 is working well; however, it is slower than we anticipated.  We are looking for tips on how to determine where the bottleneck might be.  We are moving about 75,000 rows per minute, totalling about 150 MB of data (Kansas City data center -> us-east-1).  I know there are lots of factors, but I am curious what other folks' throughput experience has been.

We are on Kafka Connect version 3.2.1.  We currently have a single worker, but are looking to expand to two very soon.  Our flush.size is 100,000 with 2 tasks on our S3 connector.
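
The figures above can be turned into a rough back-of-the-envelope check. This is only a sketch: it assumes the 150 MB figure is per minute, uses decimal megabytes, and treats all records as landing in one output file, none of which is confirmed in the post.

```python
# Figures quoted in the post; nothing here is measured.
ROWS_PER_MINUTE = 75_000
MB_PER_MINUTE = 150      # assumption: "150 MB" is per minute
FLUSH_SIZE = 100_000     # records per committed file (flush.size)


def summarize(rows_per_minute, mb_per_minute, flush_size):
    """Derive average record size, throughput, and per-flush volume."""
    avg_record_kb = mb_per_minute * 1000 / rows_per_minute
    return {
        "avg_record_kb": avg_record_kb,
        "throughput_mb_per_s": mb_per_minute / 60,
        # Upper bound: assumes every record goes to the same file.
        "mb_per_flush": flush_size * avg_record_kb / 1000,
        "minutes_to_fill_flush": flush_size / rows_per_minute,
    }


summary = summarize(ROWS_PER_MINUTE, MB_PER_MINUTE, FLUSH_SIZE)
for name, value in summary.items():
    print(f"{name}: {value:.2f}")
```

Under those assumptions the source produces about 2.5 MB/s with records of roughly 2 KB each, so a flush of 100,000 records corresponds to roughly 200 MB and a bit over a minute of input.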

Thanks for any help,
Jeff


Konstantine Karantasis

Jun 6, 2017, 12:59:40 AM
to confluent...@googlegroups.com
Hi Jeff, 

what is the value of your s3.part.size property?
Also, you mention you deploy 2 tasks on a single worker. Can you say approximately how many Kafka partitions you expect to export to S3?

Thanks,
Konstantine

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/78324b49-1fdf-41f2-8dc8-7a3b49e205fb%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

jeff whybark

Jun 6, 2017, 10:19:59 AM
to Confluent Platform
Thanks for responding.  s3.part.size has been left at the default 104857600.  Currently we are outputting to around 10 partitions, but it can be as high as 30.  It just depends on the data.  However, it seems like 10 is more standard.
Thanks,
Jeff 

Konstantine Karantasis

Jun 6, 2017, 3:12:48 PM
to confluent...@googlegroups.com

First, I'm happy to hear that the connector runs without issues at this part.size (100MB) with this number of partitions (this also depends on the max heap size you can afford to allocate to your VM). 

Now, currently, the S3 connector supports one level of parallelism, and that is across tasks. With 2 tasks and let's say 10 partitions, you normally have 5 partitions per task. Increasing the number of tasks will increase your parallelism at this level (also, adding more workers and spreading those tasks among those workers on different nodes will help too). I don't expect increasing your number of tasks to put much higher memory pressure on your JVM, because the heap memory demands depend more on the number of topic-partitions in flight at any time than on the number of worker tasks. Since with your current settings you can serve that many partitions concurrently, increasing the number of tasks shouldn't be a big problem.
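
To illustrate the partitions-per-task arithmetic, here is a minimal sketch. The round-robin rule and the function names are assumptions for illustration only; the real assignment is decided by Kafka's consumer group protocol, not by this code.

```python
def spread_partitions(num_partitions, num_tasks):
    """Spread Kafka partition ids over task ids round-robin (illustrative only)."""
    assignment = {task: [] for task in range(num_tasks)}
    for partition in range(num_partitions):
        assignment[partition % num_tasks].append(partition)
    return assignment


def idle_tasks(assignment):
    """Return the tasks that ended up with no partitions to read."""
    return [task for task, partitions in assignment.items() if not partitions]


two_tasks = spread_partitions(10, 2)
print(two_tasks)                       # 5 partitions per task
ten_tasks = spread_partitions(10, 10)
print(ten_tasks)                       # 1 partition per task
one_partition = spread_partitions(1, 10)
print(idle_tasks(one_partition))       # tasks left with nothing to do
```

Because a Kafka partition is read by only one task at a time, adding tasks beyond the number of Kafka partitions leaves the extra tasks idle rather than increasing parallelism.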

In one of the next releases we'd like to enable parallelism at the part level as well (S3 connector uses multi-part uploads to upload data to S3). This is not yet in place. So currently there's a single level of parallelism we can exploit (that of partitions per task).

Finally, your s3.part.size seems high enough to support higher throughput, but you might want to try various sizes. The actual part.size depends on the size of your records too (100K records * record size = ?). 

Let me note here, that we changed the default s3.part.size to 25MB, but we did that only to have smoother deployments with the current default max heap size of connect workers (which is still 256MB for now). Ideally you want to bump JVM heap size, and possibly s3.part.size, depending on your expected load. 
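
The relationship between s3.part.size, the number of in-flight partitions, and heap size can be sketched roughly as follows. This assumes each in-flight topic-partition holds one buffer of s3.part.size bytes and ignores all other heap usage, so treat it as a lower bound rather than a sizing formula.

```python
MB = 1024 * 1024


def estimated_buffer_bytes(part_size_bytes, open_partitions):
    """Rough lower bound: one part-sized buffer per in-flight partition (assumption)."""
    return part_size_bytes * open_partitions


def fits_in_heap(part_size_bytes, open_partitions, heap_bytes):
    """True if the estimated buffers alone stay below the given heap size."""
    return estimated_buffer_bytes(part_size_bytes, open_partitions) < heap_bytes


# Values taken from the thread: 100 MB and 25 MB part sizes, 10 partitions,
# and the 256 MB default worker heap mentioned above.
for part_size in (100 * MB, 25 * MB):
    needed_mb = estimated_buffer_bytes(part_size, 10) // MB
    ok = fits_in_heap(part_size, 10, 256 * MB)
    print(f"part.size={part_size // MB} MB -> ~{needed_mb} MB of buffers, fits in 256 MB: {ok}")
```

Even where the estimate fits, it leaves little headroom for anything else in the worker, which is why raising the JVM heap is suggested before raising s3.part.size.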

So, I'd start by deploying more tasks. Measuring your throughput from your data center to us-east-1 should give you an upper bound too. Also you might want to check whether part uploads fail, and retransmissions occur. 

Konstantine



jeff whybark

Jun 6, 2017, 6:07:24 PM
to Confluent Platform
Thank you very much Konstantine.  We will give some of these a shot and let you know how it goes.

jeff whybark

Jun 7, 2017, 6:19:25 PM
to Confluent Platform
Once again, thanks Konstantine.  Certainly lots of variables.  We did increase our tasks to 10, but are having difficulty determining whether the extra tasks truly increased the parallelism, as our load times haven't changed much.  Is the expectation that after the 100,000 records (flush.size) are divided up into the partitions (about 6 or 7), the individual tasks then run in parallel to load the data into S3?  BTW, from the AWS CLI it looks like we are getting around 4MB/sec on a standard file copy.  As for the S3 connector, we are fortunate to get 1MB/sec.  I am not seeing any error messages concerning upload failures or retransmissions.
This is what we are seeing for about 60MB worth of data:

[2017-06-07 16:29:37,051] INFO Opening record writer for: the_topic/the_topic/the_partition=XWS/the_topic+0+0000120920000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider:66)

[2017-06-07 16:29:37,056] INFO WorkerSinkTask{id=s3-sink-lin-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:272)

[2017-06-07 16:29:37,056] WARN Commit of WorkerSinkTask{id=s3-sink-lin-0} offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask:172)

[2017-06-07 16:29:37,404] INFO Opening record writer for: the_topic/the_topic/the_partition=XWO/the_topic+0+0000120928433.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider:66)

[2017-06-07 16:29:37,423] INFO Opening record writer for: the_topic/the_topic/the_partition=BZA/the_topic+0+0000120928773.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider:66)

[2017-06-07 16:29:37,546] INFO Opening record writer for: the_topic/the_topic/the_partition=BZS/the_topic+0+0000120932021.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider:66)

[2017-06-07 16:29:37,897] INFO Opening record writer for: the_topic/the_topic/the_partition=XWW/the_topic+0+0000120941077.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider:66)

[2017-06-07 16:29:39,133] INFO Opening record writer for: the_topic/the_topic/the_partition=XWA/the_topic+0+0000120973464.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider:66)

[2017-06-07 16:29:39,801] INFO Opening record writer for: the_topic/the_topic/the_partition=BZO/the_topic+0+0000120990635.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider:66)

[2017-06-07 16:29:40,915] INFO Starting commit and rotation for topic partition the_topic-0 with start offset {the_partition=XWA=120973464, the_partition=XWO=120928433, the_partition=BZO=120990635, the_partition=XWS=120920000, the_partition=BZS=120932021, the_partition=BZA=120928773, the_partition=XWW=120941077} (io.confluent.connect.s3.TopicPartitionWriter:194)

[2017-06-07 16:30:38,958] INFO Files committed to S3. Target commit offset for the_topic-0 is 121020000 (io.confluent.connect.s3.TopicPartitionWriter:341)
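
The upload rate implied by the log above can be estimated from the two commit timestamps. This is a small sketch: the 60 MB figure is the approximate size quoted in the post, and the log lines are shortened to their leading text.

```python
from datetime import datetime

LOG_TIME_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def parse_log_time(line):
    """Extract the bracketed timestamp at the start of a Connect log line."""
    stamp = line[line.index("[") + 1 : line.index("]")]
    return datetime.strptime(stamp, LOG_TIME_FORMAT)


# Leading text of the two relevant lines from the log above.
commit_start = "[2017-06-07 16:29:40,915] INFO Starting commit and rotation for topic partition the_topic-0"
commit_done = "[2017-06-07 16:30:38,958] INFO Files committed to S3."

APPROX_MB = 60  # approximate data volume quoted in the post

elapsed_s = (parse_log_time(commit_done) - parse_log_time(commit_start)).total_seconds()
rate_mb_per_s = APPROX_MB / elapsed_s
print(f"commit took {elapsed_s:.3f} s, roughly {rate_mb_per_s:.2f} MB/s")
```

The commit phase takes about 58 seconds, which works out to roughly 1 MB/s and matches the rate reported for the connector.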


How do we tell if the tasks are truly getting utilized and running in parallel?


Also, we did try to bring up another worker; however, we ran into two problems.  BTW, this is two workers on the same node.  Unfortunately we aren't in a position to have a clustered setup at this time.  The first problem we encountered is that the same S3 connector was running on both workers and attempting to pull the same offsets at the same time, therefore duplicating the data in S3.  Generally one of the workers would fail and then both would advance to the next offset.  We stopped and deleted the second worker at this point.  We then tried to add a second worker again.  This time the second worker just sat idle.  We aren't sure how work is distributed to multiple workers: is it round robin?  Or is the second worker only utilized when there is significant strain on the first worker?


Thanks again for all your help,

Jeff
