Kafka Connect S3: Out of memory: Java heap space


Slavo Nagy

Jul 6, 2017, 11:44:23 AM
to Confluent Platform
Hi,

I'm running Kafka Connect S3 inside a Docker container on AWS ECS in standalone mode and getting java.lang.OutOfMemoryError: Java heap space.

I'm already setting KAFKA_HEAP_OPTS to '-Xms256m -Xmx1g' through the environment variables, which should be sufficient for the message and chunk sizes in my topic.

Any hints?

The config settings can be seen below in the logs.

Thanks in advance.

Regards,
Slavo


[2017-07-06 15:27:20,805] INFO StandaloneConfig values:
access.control.allow.methods =
access.control.allow.origin =
bootstrap.servers = [localhost:9092]
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
offset.flush.interval.ms = 10000
offset.flush.timeout.ms = 5000
offset.storage.file.filename = /tmp/connect.offsets
rest.advertised.host.name = null
rest.advertised.port = null
rest.host.name = null
rest.port = 8083
task.shutdown.graceful.timeout.ms = 5000
value.converter = class org.apache.kafka.connect.json.JsonConverter
(org.apache.kafka.connect.runtime.standalone.StandaloneConfig:180)
[2017-07-06 15:27:20,999] INFO Logging initialized @1900ms (org.eclipse.jetty.util.log:186)
[2017-07-06 15:27:21,378] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:50)
[2017-07-06 15:27:21,378] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:71)
[2017-07-06 15:27:21,379] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:119)
[2017-07-06 15:27:21,379] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:60)
[2017-07-06 15:27:21,465] INFO Worker started (org.apache.kafka.connect.runtime.Worker:124)
[2017-07-06 15:27:21,466] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:73)
[2017-07-06 15:27:21,466] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:98)
[2017-07-06 15:27:21,659] INFO jetty-9.2.15.v20160210 (org.eclipse.jetty.server.Server:327)
Jul 06, 2017 3:27:22 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.
[2017-07-06 15:27:22,997] INFO Started o.e.j.s.ServletContextHandler@60b71e8f{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2017-07-06 15:27:23,032] INFO Started ServerConnector@57e36f77{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2017-07-06 15:27:23,033] INFO Started @3939ms (org.eclipse.jetty.server.Server:379)
[2017-07-06 15:27:23,034] INFO REST server listening at http://172.17.0.8:8083/, advertising URL http://172.17.0.8:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-07-06 15:27:23,034] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:56)
[2017-07-06 15:27:23,263] INFO ConnectorConfig values:
connector.class = io.confluent.connect.s3.S3SinkConnector
key.converter = null
name = s3-sink
tasks.max = 1
transforms = null
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig:180)
[2017-07-06 15:27:23,265] INFO Creating connector s3-sink of type io.confluent.connect.s3.S3SinkConnector (org.apache.kafka.connect.runtime.Worker:178)
[2017-07-06 15:27:23,269] INFO Instantiated connector s3-sink with version 3.2.2 of type class io.confluent.connect.s3.S3SinkConnector (org.apache.kafka.connect.runtime.Worker:181)
[2017-07-06 15:27:23,271] INFO S3SinkConnectorConfig values:
filename.offset.zero.pad.width = 10
flush.size = 1000
format.class = class io.confluent.connect.s3.format.json.JsonFormat
retry.backoff.ms = 5000
rotate.interval.ms = -1
rotate.schedule.interval.ms = -1
s3.bucket.name = my-bucket
s3.credentials.provider.class = class com.amazonaws.auth.DefaultAWSCredentialsProviderChain
s3.part.size = 104857600
s3.region = eu-west-1
s3.ssea.name = AES256
s3.wan.mode = false
schema.cache.size = 1000
shutdown.timeout.ms = 3000
(io.confluent.connect.s3.S3SinkConnectorConfig:180)
[2017-07-06 15:27:23,272] INFO StorageCommonConfig values:
directory.delim = /
file.delim = +
storage.class = class io.confluent.connect.s3.storage.S3Storage
store.url =
topics.dir = topics
(io.confluent.connect.storage.common.StorageCommonConfig:180)
[2017-07-06 15:27:23,273] INFO HiveConfig values:
hive.conf.dir =
hive.database = default
hive.home =
hive.integration = false
hive.metastore.uris =
schema.compatibility = NONE
(io.confluent.connect.storage.hive.HiveConfig:180)
[2017-07-06 15:27:23,273] INFO PartitionerConfig values:
locale =
partition.duration.ms = -1
partition.field.name =
partitioner.class = class io.confluent.connect.storage.partitioner.DefaultPartitioner
path.format =
schema.generator.class = class io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
timestamp.extractor = Wallclock
timestamp.field = timestamp
timezone =
(io.confluent.connect.storage.partitioner.PartitionerConfig:180)
[2017-07-06 15:27:23,274] INFO Starting S3 connector s3-sink (io.confluent.connect.s3.S3SinkConnector:61)
[2017-07-06 15:27:23,276] INFO Finished creating connector s3-sink (org.apache.kafka.connect.runtime.Worker:194)
[2017-07-06 15:27:23,277] INFO SinkConnectorConfig values:
connector.class = io.confluent.connect.s3.S3SinkConnector
key.converter = null
name = s3-sink
tasks.max = 1
topics = [my.topic]
transforms = null
value.converter = null
(org.apache.kafka.connect.runtime.SinkConnectorConfig:180)
[2017-07-06 15:27:23,282] INFO Creating task s3-sink-0 (org.apache.kafka.connect.runtime.Worker:305)
[2017-07-06 15:27:23,282] INFO ConnectorConfig values:
connector.class = io.confluent.connect.s3.S3SinkConnector
key.converter = null
name = s3-sink
tasks.max = 1
transforms = null
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig:180)
[2017-07-06 15:27:23,284] INFO TaskConfig values:
task.class = class io.confluent.connect.s3.S3SinkTask
(org.apache.kafka.connect.runtime.TaskConfig:180)
[2017-07-06 15:27:23,285] INFO Instantiated task s3-sink-0 with version 3.2.2 of type io.confluent.connect.s3.S3SinkTask (org.apache.kafka.connect.runtime.Worker:317)
[2017-07-06 15:27:23,348] INFO ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = connect-s3-sink
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig:180)
[2017-07-06 15:27:23,575] INFO Kafka version : 0.10.2.1-cp2 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-07-06 15:27:23,575] INFO Kafka commitId : 93cd3b2114e355cf (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-07-06 15:27:23,592] INFO Created connector s3-sink (org.apache.kafka.connect.cli.ConnectStandalone:90)
[2017-07-06 15:27:23,595] INFO S3SinkConnectorConfig values:
filename.offset.zero.pad.width = 10
flush.size = 1000
format.class = class io.confluent.connect.s3.format.json.JsonFormat
retry.backoff.ms = 5000
rotate.interval.ms = -1
rotate.schedule.interval.ms = -1
s3.bucket.name = my-bucket
s3.credentials.provider.class = class com.amazonaws.auth.DefaultAWSCredentialsProviderChain
s3.part.size = 104857600
s3.region = eu-west-1
s3.ssea.name = AES256
s3.wan.mode = false
schema.cache.size = 1000
shutdown.timeout.ms = 3000
(io.confluent.connect.s3.S3SinkConnectorConfig:180)
[2017-07-06 15:27:23,596] INFO StorageCommonConfig values:
directory.delim = /
file.delim = +
storage.class = class io.confluent.connect.s3.storage.S3Storage
store.url =
topics.dir = topics
(io.confluent.connect.storage.common.StorageCommonConfig:180)
[2017-07-06 15:27:23,596] INFO HiveConfig values:
hive.conf.dir =
hive.database = default
hive.home =
hive.integration = false
hive.metastore.uris =
schema.compatibility = NONE
(io.confluent.connect.storage.hive.HiveConfig:180)
[2017-07-06 15:27:23,596] INFO PartitionerConfig values:
locale =
partition.duration.ms = -1
partition.field.name =
partitioner.class = class io.confluent.connect.storage.partitioner.DefaultPartitioner
path.format =
schema.generator.class = class io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
timestamp.extractor = Wallclock
timestamp.field = timestamp
timezone =
(io.confluent.connect.storage.partitioner.PartitionerConfig:180)
[2017-07-06 15:27:24,935] INFO Started S3 connector task with assigned partitions: [] (io.confluent.connect.s3.S3SinkTask:103)
[2017-07-06 15:27:24,936] INFO Sink task WorkerSinkTask{id=s3-sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:232)
[2017-07-06 15:27:25,143] INFO Discovered coordinator ip-10-1-67-22.eu-west-1.compute.internal:9092 (id: 2147482645 rack: null) for group connect-s3-sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:589)
[2017-07-06 15:27:25,151] INFO Revoking previously assigned partitions [] for group connect-s3-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:399)
[2017-07-06 15:27:25,151] INFO (Re-)joining group connect-s3-sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:423)
[2017-07-06 15:27:25,169] INFO Successfully joined group connect-s3-sink with generation 49 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:391)
[2017-07-06 15:27:25,170] INFO Setting newly assigned partitions [my.topic-8, my.topic-7, my.topic-6, my.topic-5, my.topic-4, my.topic-3, my.topic-2, my.topic-1, my.topic-0, my.topic-9] for group connect-s3-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:258)
[2017-07-06 15:27:25,839] INFO Starting commit and rotation for topic partition my.topic-6 with start offset {partition=6=15000} (io.confluent.connect.s3.TopicPartitionWriter:200)
[2017-07-06 15:27:26,236] INFO Files committed to S3. Target commit offset for my.topic-6 is 16000 (io.confluent.connect.s3.TopicPartitionWriter:407)
[2017-07-06 15:27:26,385] INFO Starting commit and rotation for topic partition my.topic-3 with start offset {partition=3=15000} (io.confluent.connect.s3.TopicPartitionWriter:200)
[2017-07-06 15:27:26,671] INFO Files committed to S3. Target commit offset for my.topic-3 is 16000 (io.confluent.connect.s3.TopicPartitionWriter:407)
[2017-07-06 15:27:26,895] ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:449)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at io.confluent.connect.s3.storage.S3OutputStream.<init>(S3OutputStream.java:67)
at io.confluent.connect.s3.storage.S3Storage.create(S3Storage.java:197)
at io.confluent.connect.s3.format.json.JsonRecordWriterProvider$1.<init>(JsonRecordWriterProvider.java:62)
at io.confluent.connect.s3.format.json.JsonRecordWriterProvider.getRecordWriter(JsonRecordWriterProvider.java:61)
at io.confluent.connect.s3.format.json.JsonRecordWriterProvider.getRecordWriter(JsonRecordWriterProvider.java:37)
at io.confluent.connect.s3.TopicPartitionWriter.getWriter(TopicPartitionWriter.java:352)
at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:392)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:197)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-07-06 15:27:26,898] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:450)
[2017-07-06 15:27:26,898] INFO WorkerSinkTask{id=s3-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:272)
[2017-07-06 15:27:26,905] ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:451)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-07-06 15:27:26,905] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)
[2017-07-06 15:27:37,796] INFO Reflections took 16234 ms to scan 560 urls, producing 12996 keys and 85604 values (org.reflections.Reflections:229)
[2017-07-06 15:27:45,318] INFO 10.1.93.183 - - [06/Jul/2017:15:27:45 +0000] "GET / HTTP/1.1" 200 54 106 (org.apache.kafka.connect.runtime.rest.RestServer:60)
[2017-07-06 15:27:45,319] INFO 10.1.76.180 - - [06/Jul/2017:15:27:45 +0000] "GET / HTTP/1.1" 200 54 107 (org.apache.kafka.connect.runtime.rest.RestServer:60)

Slavo Nagy

Jul 7, 2017, 5:08:21 AM
to Confluent Platform
Surprisingly, this seems to be related to the number of partitions in the topic. Going beyond 8 leads to the OOM. Is this a bug?

Randall Hauch

Jul 7, 2017, 11:50:59 AM
to confluent...@googlegroups.com
Recall that the S3 connector buffers messages for each topic partition and writes them to a file based on the flush size. So how much memory it needs depends on the number of topic partitions, the flush size, the size of the messages, the JVM memory, the number of connectors you're running in the same worker, etc. Try changing the flush size, increasing the JVM's memory, or adding more Kafka Connect workers so that each worker runs a single task.

Best regards,

Randall


Konstantine Karantasis

Jul 7, 2017, 12:15:31 PM
to confluent...@googlegroups.com

Indeed, heap memory demand scales with the number of concurrently outstanding output partitions (or files, if you prefer) in S3. There's a distinction between output partitions and Kafka (input) partitions because, depending on your partitioning scheme, the connector might spread the data of one Kafka partition across more than one file (e.g. with field partitioning).

However, there is no fixed number of partitions that acts as a limit. As Randall noted, whether you exhaust the heap memory assigned to a specific worker depends on multiple factors.

Given that flush.size is an "application" parameter, meaning that it determines the actual size of your files in S3, the parameter that you want to tune to dial down the memory pressure on the worker is

part.size

which is a "system" parameter and doesn't affect the final size of your files. It has a minimum size limit of 5MB. Lowering it might affect throughput, but in this case the trade-off is probably better than getting an OOM.

Note that the default maximum heap size allocated to the Connect worker is pretty conservative (256MB). It probably makes sense to increase it on all the workers that will run S3 connector tasks.

-- Konstantine
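
A rough sketch of the arithmetic behind this advice, using the values from the log in the first post. It assumes one buffer of s3.part.size bytes per open output file (the stack trace fails in ByteBuffer.allocate inside the S3OutputStream constructor, which suggests this) and one open file per Kafka partition, which may not hold for every partitioner. The function name is made up for illustration, and the estimate ignores everything else living on the heap.

```python
# Rough heap estimate for the S3 sink's upload buffers.
# Assumption: one buffer of s3.part.size bytes per open output file.

MIB = 1024 * 1024


def buffer_heap_mib(part_size_bytes: int, open_files: int) -> float:
    """Heap taken by the part buffers alone, in MiB."""
    return part_size_bytes * open_files / MIB


part_size = 104857600  # s3.part.size from the log (100 MiB)
heap_max_mib = 1024    # -Xmx1g from KAFKA_HEAP_OPTS

for partitions in (8, 10):
    needed = buffer_heap_mib(part_size, partitions)
    print(f"{partitions} partitions: {needed:.0f} MiB of {heap_max_mib} MiB heap")
# 8 partitions: 800 MiB of 1024 MiB heap
# 10 partitions: 1000 MiB of 1024 MiB heap
```

Under these assumptions, ten open files leave almost no heap for anything else, which would be consistent with the OOM appearing once the topic goes past 8 partitions.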



Konstantine Karantasis

Jul 7, 2017, 12:20:25 PM
to confluent...@googlegroups.com
I forgot to paste a link with the list of configuration parameters in the docs. 


Also, the exact name of the configuration property mentioned above is: 

s3.part.size

Thanks,
Konstantine
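
One way to pick a value for s3.part.size is to work backwards from a heap budget. The sketch below is only illustrative: the helper name, the choice to reserve half of a 1 GiB heap for part buffers, and the one-buffer-per-open-file model are all assumptions, not something the connector documents or enforces. The 5MB minimum is the limit mentioned earlier in the thread.

```python
# Work backwards from a heap budget to a candidate s3.part.size.
# Assumption: one buffer of s3.part.size bytes per open output file.

MIB = 1024 * 1024
MIN_PART_SIZE = 5 * MIB  # minimum part size mentioned in the thread


def max_part_size(buffer_budget_bytes: int, open_files: int) -> int:
    """Largest part size (bytes) such that open_files buffers fit the budget.

    Raises ValueError if even the 5 MiB minimum does not fit.
    """
    if open_files <= 0:
        raise ValueError("open_files must be positive")
    size = buffer_budget_bytes // open_files
    if size < MIN_PART_SIZE:
        raise ValueError("budget too small for the minimum part size")
    return size


# Hypothetical budget: 512 MiB of a 1 GiB heap for buffers, 10 open files.
size = max_part_size(512 * MIB, 10)
print(f"s3.part.size={size}")
```

If the helper raises, the remaining options are the ones already named in the thread: more heap, or fewer open files per worker.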


Slavo Nagy

Jul 12, 2017, 8:53:37 AM
to Confluent Platform
Thanks for your hints. I managed to get this running by increasing the heap space to -Xms1024m -Xmx2048m.
I did not expect it to have such high memory requirements, as the data amounted to only a few MB in total.

Cheers,
Slavo

Niket Anand

Feb 8, 2020, 4:44:03 PM
to Confluent Platform
Hi,

I am trying to analyze the heap usage of the s3-sink connector.
We have 2 K8s pods (replicas) running 6 different registered connectors. Each connector reads from a different topic, and each topic has 16 partitions.
As we have tasks.max = 16, I guess each pod runs 8 tasks per connector in parallel (assuming the tasks are distributed evenly among the pods).
I am seeing that the heap size of the container is always ~7GB in stage (where not much data is coming in) and ~12GB in prod (with the same config).
I don't understand why it constantly uses such a large, fixed amount of memory.
My understanding was that each pod should take
(5MB (s3.part.size) + ~2MB (consumer.max.partition.fetch.bytes)) * 8 (partitions) * 6 (connectors) = 336MB
But the container is taking 7GB of memory.

The connector flushes every 5 minutes (rotate.schedule.interval.ms). We don't have much data coming in, so files of at most 10KB (around 10 records) are flushed every 5 minutes.

Below is the configuration

{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "partition.duration.ms": "300000",
  "s3.region": "us-east-1",
  "topics.dir": "data/live",
  "schema.compatibility": "NONE",
  "flush.size": "50000",
  "topics": "commerce.activity.subscription",
  "timezone": "UTC",
  "tasks.max": "16",
  "s3.part.size": "5242880",
  "locale": "US",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "name": "event-s3-sink.commerce.activity.subscription",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "s3.bucket.name": "peopleconnect-prod-datalake-ldz",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH'00'",
  "rotate.schedule.interval.ms": "300000"
}


Thanks in advance.

Niket
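
For reference, the per-pod estimate in this post can be written out as a small calculation; with the stated inputs the formula works out to 336 MB. The function and its files_per_partition parameter are hypothetical. The parameter reflects the earlier point in this thread that buffers scale with open output files rather than Kafka partitions, so a time-based partitioner could, as an assumption, hold more than one file open per partition. The sketch covers only part buffers and fetch buffers and does not attempt to explain the observed 7GB.

```python
# Per-pod estimate from the post: part buffers plus consumer fetch buffers.
# files_per_partition is a hypothetical knob, not a connector setting.


def estimate_mb(part_mb: float, fetch_mb: float, partitions_per_connector: int,
                connectors: int, files_per_partition: int = 1) -> float:
    """(part buffers + fetch buffer) per partition, summed over all partitions on a pod."""
    per_partition = part_mb * files_per_partition + fetch_mb
    return per_partition * partitions_per_connector * connectors


# Values from the post: 5 MB part size, ~2 MB fetch, 8 partitions, 6 connectors.
print(estimate_mb(5, 2, 8, 6))
# Hypothetical: two output files open per Kafka partition at the same time.
print(estimate_mb(5, 2, 8, 6, files_per_partition=2))
```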
