[2017-07-06 15:27:20,805] INFO StandaloneConfig values:
access.control.allow.methods =
access.control.allow.origin =
bootstrap.servers = [localhost:9092]
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
offset.flush.interval.ms = 10000
offset.flush.timeout.ms = 5000
offset.storage.file.filename = /tmp/connect.offsets
rest.advertised.host.name = null
rest.advertised.port = null
rest.host.name = null
rest.port = 8083
task.shutdown.graceful.timeout.ms = 5000
value.converter = class org.apache.kafka.connect.json.JsonConverter
(org.apache.kafka.connect.runtime.standalone.StandaloneConfig:180)
[2017-07-06 15:27:20,999] INFO Logging initialized @1900ms (org.eclipse.jetty.util.log:186)
[2017-07-06 15:27:21,378] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:50)
[2017-07-06 15:27:21,378] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:71)
[2017-07-06 15:27:21,379] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:119)
[2017-07-06 15:27:21,379] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:60)
[2017-07-06 15:27:21,465] INFO Worker started (org.apache.kafka.connect.runtime.Worker:124)
[2017-07-06 15:27:21,466] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:73)
[2017-07-06 15:27:21,466] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:98)
[2017-07-06 15:27:21,659] INFO jetty-9.2.15.v20160210 (org.eclipse.jetty.server.Server:327)
Jul 06, 2017 3:27:22 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.
[2017-07-06 15:27:22,997] INFO Started o.e.j.s.ServletContextHandler@60b71e8f{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2017-07-06 15:27:23,032] INFO Started ServerConnector@57e36f77{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2017-07-06 15:27:23,033] INFO Started @3939ms (org.eclipse.jetty.server.Server:379)
[2017-07-06 15:27:23,034] INFO REST server listening at http://172.17.0.8:8083/, advertising URL http://172.17.0.8:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-07-06 15:27:23,034] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:56)
[2017-07-06 15:27:23,263] INFO ConnectorConfig values:
connector.class = io.confluent.connect.s3.S3SinkConnector
key.converter = null
name = s3-sink
tasks.max = 1
transforms = null
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig:180)
[2017-07-06 15:27:23,265] INFO Creating connector s3-sink of type io.confluent.connect.s3.S3SinkConnector (org.apache.kafka.connect.runtime.Worker:178)
[2017-07-06 15:27:23,269] INFO Instantiated connector s3-sink with version 3.2.2 of type class io.confluent.connect.s3.S3SinkConnector (org.apache.kafka.connect.runtime.Worker:181)
[2017-07-06 15:27:23,271] INFO S3SinkConnectorConfig values:
filename.offset.zero.pad.width = 10
flush.size = 1000
format.class = class io.confluent.connect.s3.format.json.JsonFormat
retry.backoff.ms = 5000
rotate.interval.ms = -1
rotate.schedule.interval.ms = -1
s3.bucket.name = my-bucket
s3.credentials.provider.class = class com.amazonaws.auth.DefaultAWSCredentialsProviderChain
s3.part.size = 104857600
s3.region = eu-west-1
s3.ssea.name = AES256
s3.wan.mode = false
schema.cache.size = 1000
shutdown.timeout.ms = 3000
(io.confluent.connect.s3.S3SinkConnectorConfig:180)
[2017-07-06 15:27:23,272] INFO StorageCommonConfig values:
directory.delim = /
file.delim = +
storage.class = class io.confluent.connect.s3.storage.S3Storage
store.url =
topics.dir = topics
(io.confluent.connect.storage.common.StorageCommonConfig:180)
[2017-07-06 15:27:23,273] INFO HiveConfig values:
hive.conf.dir =
hive.database = default
hive.home =
hive.integration = false
hive.metastore.uris =
schema.compatibility = NONE
(io.confluent.connect.storage.hive.HiveConfig:180)
[2017-07-06 15:27:23,273] INFO PartitionerConfig values:
locale =
partition.duration.ms = -1
partition.field.name =
partitioner.class = class io.confluent.connect.storage.partitioner.DefaultPartitioner
path.format =
schema.generator.class = class io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
timestamp.extractor = Wallclock
timestamp.field = timestamp
timezone =
(io.confluent.connect.storage.partitioner.PartitionerConfig:180)
[2017-07-06 15:27:23,274] INFO Starting S3 connector s3-sink (io.confluent.connect.s3.S3SinkConnector:61)
[2017-07-06 15:27:23,276] INFO Finished creating connector s3-sink (org.apache.kafka.connect.runtime.Worker:194)
[2017-07-06 15:27:23,277] INFO SinkConnectorConfig values:
connector.class = io.confluent.connect.s3.S3SinkConnector
key.converter = null
name = s3-sink
tasks.max = 1
topics = [my.topic]
transforms = null
value.converter = null
(org.apache.kafka.connect.runtime.SinkConnectorConfig:180)
[2017-07-06 15:27:23,282] INFO Creating task s3-sink-0 (org.apache.kafka.connect.runtime.Worker:305)
[2017-07-06 15:27:23,282] INFO ConnectorConfig values:
connector.class = io.confluent.connect.s3.S3SinkConnector
key.converter = null
name = s3-sink
tasks.max = 1
transforms = null
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig:180)
[2017-07-06 15:27:23,284] INFO TaskConfig values:
task.class = class io.confluent.connect.s3.S3SinkTask
(org.apache.kafka.connect.runtime.TaskConfig:180)
[2017-07-06 15:27:23,285] INFO Instantiated task s3-sink-0 with version 3.2.2 of type io.confluent.connect.s3.S3SinkTask (org.apache.kafka.connect.runtime.Worker:317)
[2017-07-06 15:27:23,348] INFO ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = connect-s3-sink
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig:180)
[2017-07-06 15:27:23,575] INFO Kafka version : 0.10.2.1-cp2 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-07-06 15:27:23,575] INFO Kafka commitId : 93cd3b2114e355cf (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-07-06 15:27:23,592] INFO Created connector s3-sink (org.apache.kafka.connect.cli.ConnectStandalone:90)
[2017-07-06 15:27:23,595] INFO S3SinkConnectorConfig values:
filename.offset.zero.pad.width = 10
flush.size = 1000
format.class = class io.confluent.connect.s3.format.json.JsonFormat
retry.backoff.ms = 5000
rotate.interval.ms = -1
rotate.schedule.interval.ms = -1
s3.bucket.name = my-bucket
s3.credentials.provider.class = class com.amazonaws.auth.DefaultAWSCredentialsProviderChain
s3.part.size = 104857600
s3.region = eu-west-1
s3.ssea.name = AES256
s3.wan.mode = false
schema.cache.size = 1000
shutdown.timeout.ms = 3000
(io.confluent.connect.s3.S3SinkConnectorConfig:180)
[2017-07-06 15:27:23,596] INFO StorageCommonConfig values:
directory.delim = /
file.delim = +
storage.class = class io.confluent.connect.s3.storage.S3Storage
store.url =
topics.dir = topics
(io.confluent.connect.storage.common.StorageCommonConfig:180)
[2017-07-06 15:27:23,596] INFO HiveConfig values:
hive.conf.dir =
hive.database = default
hive.home =
hive.integration = false
hive.metastore.uris =
schema.compatibility = NONE
(io.confluent.connect.storage.hive.HiveConfig:180)
[2017-07-06 15:27:23,596] INFO PartitionerConfig values:
locale =
partition.duration.ms = -1
partition.field.name =
partitioner.class = class io.confluent.connect.storage.partitioner.DefaultPartitioner
path.format =
schema.generator.class = class io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
timestamp.extractor = Wallclock
timestamp.field = timestamp
timezone =
(io.confluent.connect.storage.partitioner.PartitionerConfig:180)
[2017-07-06 15:27:24,935] INFO Started S3 connector task with assigned partitions: [] (io.confluent.connect.s3.S3SinkTask:103)
[2017-07-06 15:27:24,936] INFO Sink task WorkerSinkTask{id=s3-sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:232)
[2017-07-06 15:27:25,143] INFO Discovered coordinator ip-10-1-67-22.eu-west-1.compute.internal:9092 (id: 2147482645 rack: null) for group connect-s3-sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:589)
[2017-07-06 15:27:25,151] INFO Revoking previously assigned partitions [] for group connect-s3-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:399)
[2017-07-06 15:27:25,151] INFO (Re-)joining group connect-s3-sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:423)
[2017-07-06 15:27:25,169] INFO Successfully joined group connect-s3-sink with generation 49 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:391)
[2017-07-06 15:27:25,170] INFO Setting newly assigned partitions [my.topic-8, my.topic-7, my.topic-6, my.topic-5, my.topic-4, my.topic-3, my.topic-2, my.topic-1, my.topic-0, my.topic-9] for group connect-s3-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:258)
[2017-07-06 15:27:25,839] INFO Starting commit and rotation for topic partition my.topic-6 with start offset {partition=6=15000} (io.confluent.connect.s3.TopicPartitionWriter:200)
[2017-07-06 15:27:26,236] INFO Files committed to S3. Target commit offset for my.topic-6 is 16000 (io.confluent.connect.s3.TopicPartitionWriter:407)
[2017-07-06 15:27:26,385] INFO Starting commit and rotation for topic partition my.topic-3 with start offset {partition=3=15000} (io.confluent.connect.s3.TopicPartitionWriter:200)
[2017-07-06 15:27:26,671] INFO Files committed to S3. Target commit offset for my.topic-3 is 16000 (io.confluent.connect.s3.TopicPartitionWriter:407)
[2017-07-06 15:27:26,895] ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:449)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at io.confluent.connect.s3.storage.S3OutputStream.<init>(S3OutputStream.java:67)
at io.confluent.connect.s3.storage.S3Storage.create(S3Storage.java:197)
at io.confluent.connect.s3.format.json.JsonRecordWriterProvider$1.<init>(JsonRecordWriterProvider.java:62)
at io.confluent.connect.s3.format.json.JsonRecordWriterProvider.getRecordWriter(JsonRecordWriterProvider.java:61)
at io.confluent.connect.s3.format.json.JsonRecordWriterProvider.getRecordWriter(JsonRecordWriterProvider.java:37)
at io.confluent.connect.s3.TopicPartitionWriter.getWriter(TopicPartitionWriter.java:352)
at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:392)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:197)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-07-06 15:27:26,898] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:450)
[2017-07-06 15:27:26,898] INFO WorkerSinkTask{id=s3-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:272)
[2017-07-06 15:27:26,905] ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:451)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-07-06 15:27:26,905] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)
[2017-07-06 15:27:37,796] INFO Reflections took 16234 ms to scan 560 urls, producing 12996 keys and 85604 values (org.reflections.Reflections:229)
[2017-07-06 15:27:45,318] INFO 10.1.93.183 - - [06/Jul/2017:15:27:45 +0000] "GET / HTTP/1.1" 200 54 106 (org.apache.kafka.connect.runtime.rest.RestServer:60)
[2017-07-06 15:27:45,319] INFO 10.1.76.180 - - [06/Jul/2017:15:27:45 +0000] "GET / HTTP/1.1" 200 54 107 (org.apache.kafka.connect.runtime.rest.RestServer:60)
Surprisingly, this seems to be related to number of partitions in the topic. Going beyond 8 leads to the OOM. Is this a bug?
--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/7f4f7f1d-176f-4119-9248-298cd7f3dfce%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Recall that the S3 connector buffers messages for each topic partition and will write them to a file based upon the flush size. So, how much memory will be based upon the number of topic partitions, the flush size, the size of the messages, the JVM memory, the number of connectors you're running in the same worker, etc. Try changing the flush size, increasing the JVM's memory, or adding more Kafka Connect workers so that each worker is running on a single task.Best regards,Randall
On Fri, Jul 7, 2017 at 4:08 AM, Slavo Nagy <nagy...@gmail.com> wrote:
Surprisingly, this seems to be related to number of partitions in the topic. Going beyond 8 leads to the OOM. Is this a bug?
--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsubscribe@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/7f4f7f1d-176f-4119-9248-298cd7f3dfce%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/CALYgK0GhFFshEx4UEosB8kMj4WpjdQvWWRhCq9p1ocegAXzHJw%40mail.gmail.com.
Indeed, heap memory demand scales with the number of concurrently outstanding output partitions (or files if you prefer) in S3. There's a distinction between output partitions and Kafka (input) partitions because depending on your partitioning scheme the connector might disseminate the data of a Kafka partition into more than one files (e.g. with field partitioning).However, no specific number of partitions is accurate as a limit. As Randall noted, whether you exhaust the heap memory assigned to a specific worker depends on multiple factors.Given that flush.size is an "application" parameter, meaning that it determines the actual size of your files in S3, the parameter that you want to use to tune and try to dial down the memory pressure on the worker ispart.sizewhich is a "system" parameter and doesn't affect the final size of your files. It has a minimum size limit of 5MB. It might affect the throughput, but in this case the trade-off is probably better than getting OOM.Note that the default maximum heap size allocated to the Connect worker is pretty conservative (256MB). Probably it makes sense to increase it in all the workers that will run S3 connector tasks.-- Konstantine
On Fri, Jul 7, 2017 at 8:50 AM, Randall Hauch <rha...@gmail.com> wrote:
Recall that the S3 connector buffers messages for each topic partition and will write them to a file based upon the flush size. So, how much memory will be based upon the number of topic partitions, the flush size, the size of the messages, the JVM memory, the number of connectors you're running in the same worker, etc. Try changing the flush size, increasing the JVM's memory, or adding more Kafka Connect workers so that each worker is running on a single task.Best regards,Randall--On Fri, Jul 7, 2017 at 4:08 AM, Slavo Nagy <nagy...@gmail.com> wrote:Surprisingly, this seems to be related to number of partitions in the topic. Going beyond 8 leads to the OOM. Is this a bug?--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsubscribe@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/7f4f7f1d-176f-4119-9248-298cd7f3dfce%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsubscribe@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
Thanks for your hints, managed to get this running by increasing the heap space to -Xms1024m -Xmx2048m.
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"partition.duration.ms": "300000",
"s3.region": "us-east-1",
"topics.dir": "data/live",
"schema.compatibility": "NONE",
"flush.size": "50000",
"topics": "commerce.activity.subscription",
"timezone": "UTC",
"tasks.max": "16",
"s3.part.size": "5242880",
"locale": "US",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"name": "event-s3-sink.commerce.activity.subscription",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "peopleconnect-prod-datalake-ldz",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH'00'",
"rotate.schedule.interval.ms": "300000"
}
Thanks in advance.
Niket