Transferring log data from Kafka to S3

prashanth appu

May 31, 2017, 8:12:07 AM
to Confluent Platform
Hi,

How can I transfer log data from Kafka to S3 using Kafka Connect?

Thanks and regards

Randall Hauch

May 31, 2017, 12:56:21 PM
to confluent...@googlegroups.com


prashanth appu

Jun 1, 2017, 12:41:51 AM
to Confluent Platform
Hi Randall,

Yes, I had followed those steps, but I am getting the following error:
cdc@standbynode:~/Desktop/confluent-3.2.1$ ./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-s3/quickstart-s3.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/cdc/Desktop/confluent-3.2.1/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/cdc/Desktop/confluent-3.2.1/share/java/kafka-connect-elasticsearch/slf4j-simple-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/cdc/Desktop/confluent-3.2.1/share/java/kafka-connect-hdfs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/cdc/Desktop/confluent-3.2.1/share/java/kafka-connect-s3/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/cdc/Desktop/confluent-3.2.1/share/java/kafka-connect-storage-common/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/cdc/Desktop/confluent-3.2.1/share/java/kafka/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
[2017-05-31 19:05:53,508] INFO StandaloneConfig values:
    access.control.allow.methods =
    access.control.allow.origin =
    bootstrap.servers = [localhost:9092]
    internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
    internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
    key.converter = class io.confluent.connect.avro.AvroConverter
    offset.flush.interval.ms = 60000
    offset.flush.timeout.ms = 5000
    offset.storage.file.filename = /tmp/connect.offsets
    rest.advertised.host.name = null
    rest.advertised.port = null
    rest.host.name = null
    rest.port = 8083
    task.shutdown.graceful.timeout.ms = 5000
    value.converter = class io.confluent.connect.avro.AvroConverter
 (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:180)
[2017-05-31 19:05:53,708] INFO Logging initialized @1019ms (org.eclipse.jetty.util.log:186)
[2017-05-31 19:05:54,131] INFO AvroConverterConfig values:
    schema.registry.url = [http://localhost:8081]
    max.schemas.per.subject = 1000
 (io.confluent.connect.avro.AvroConverterConfig:169)
[2017-05-31 19:05:54,323] INFO AvroDataConfig values:
    schemas.cache.config = 1000
    enhanced.avro.schema.support = false
    connect.meta.data = true
 (io.confluent.connect.avro.AvroDataConfig:169)
[2017-05-31 19:05:54,327] INFO AvroConverterConfig values:
    schema.registry.url = [http://localhost:8081]
    max.schemas.per.subject = 1000
 (io.confluent.connect.avro.AvroConverterConfig:169)
[2017-05-31 19:05:54,327] INFO AvroDataConfig values:
    schemas.cache.config = 1000
    enhanced.avro.schema.support = false
    connect.meta.data = true
 (io.confluent.connect.avro.AvroDataConfig:169)
[2017-05-31 19:05:54,349] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:50)
[2017-05-31 19:05:54,349] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:71)
[2017-05-31 19:05:54,349] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:119)
[2017-05-31 19:05:54,350] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:60)
[2017-05-31 19:05:54,353] INFO Worker started (org.apache.kafka.connect.runtime.Worker:124)
[2017-05-31 19:05:54,354] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:73)
[2017-05-31 19:05:54,354] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:98)
[2017-05-31 19:05:54,606] INFO jetty-9.2.15.v20160210 (org.eclipse.jetty.server.Server:327)
May 31, 2017 7:05:56 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

[2017-05-31 19:05:56,195] INFO Started o.e.j.s.ServletContextHandler@31198ceb{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2017-05-31 19:05:56,236] INFO Started ServerConnector@4b457c2b{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2017-05-31 19:05:56,238] INFO Started @3552ms (org.eclipse.jetty.server.Server:379)
[2017-05-31 19:05:56,239] INFO REST server listening at http://192.168.1.141:8083/, advertising URL http://192.168.1.141:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-05-31 19:05:56,239] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:56)
[2017-05-31 19:05:56,516] INFO ConnectorConfig values:
    connector.class = io.confluent.connect.s3.S3SinkConnector
    key.converter = null
    name = s3-sink
    tasks.max = 1
    transforms = null
    value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:180)
[2017-05-31 19:05:56,516] INFO Creating connector s3-sink of type io.confluent.connect.s3.S3SinkConnector (org.apache.kafka.connect.runtime.Worker:178)
[2017-05-31 19:05:56,520] INFO Instantiated connector s3-sink with version 3.2.1 of type class io.confluent.connect.s3.S3SinkConnector (org.apache.kafka.connect.runtime.Worker:181)
[2017-05-31 19:05:56,522] INFO S3SinkConnectorConfig values:
    filename.offset.zero.pad.width = 10
    flush.size = 3
    format.class = class io.confluent.connect.s3.format.avro.AvroFormat
    retry.backoff.ms = 5000
    rotate.interval.ms = -1
    rotate.schedule.interval.ms = -1
    s3.bucket.name = emr-datain-datalake-884875361572-eu-west-1
    s3.credentials.provider.class = class com.amazonaws.auth.DefaultAWSCredentialsProviderChain
    s3.part.size = 5242880
    s3.region = eu-west-1
    s3.ssea.name =
    s3.wan.mode = false
    schema.cache.size = 1000
    shutdown.timeout.ms = 3000
 (io.confluent.connect.s3.S3SinkConnectorConfig:180)
[2017-05-31 19:05:56,523] INFO StorageCommonConfig values:
    directory.delim = /
    file.delim = +
    storage.class = class io.confluent.connect.s3.storage.S3Storage
    store.url = null
    topics.dir = topics
 (io.confluent.connect.storage.common.StorageCommonConfig:180)
[2017-05-31 19:05:56,524] INFO HiveConfig values:
    hive.conf.dir =
    hive.database = default
    hive.home =
    hive.integration = false
    hive.metastore.uris =
    schema.compatibility = NONE
 (io.confluent.connect.storage.hive.HiveConfig:180)
[2017-05-31 19:05:56,525] INFO PartitionerConfig values:
    locale =
    partition.duration.ms = 30000
    partition.field.name =
    partitioner.class = class io.confluent.connect.storage.partitioner.DefaultPartitioner
    path.format = 'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/
    schema.generator.class = class io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
    timezone =
 (io.confluent.connect.storage.partitioner.PartitionerConfig:180)
[2017-05-31 19:05:56,525] INFO Starting S3 connector s3-sink (io.confluent.connect.s3.S3SinkConnector:61)
[2017-05-31 19:05:56,528] INFO Finished creating connector s3-sink (org.apache.kafka.connect.runtime.Worker:194)
[2017-05-31 19:05:56,528] INFO SourceConnectorConfig values:
    connector.class = io.confluent.connect.s3.S3SinkConnector
    key.converter = null
    name = s3-sink
    tasks.max = 1
    transforms = null
    value.converter = null
 (org.apache.kafka.connect.runtime.SourceConnectorConfig:180)
[2017-05-31 19:05:56,532] INFO Creating task s3-sink-0 (org.apache.kafka.connect.runtime.Worker:305)
[2017-05-31 19:05:56,533] INFO ConnectorConfig values:
    connector.class = io.confluent.connect.s3.S3SinkConnector
    key.converter = null
    name = s3-sink
    tasks.max = 1
    transforms = null
    value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:180)
[2017-05-31 19:05:56,535] INFO TaskConfig values:
    task.class = class io.confluent.connect.s3.S3SinkTask
 (org.apache.kafka.connect.runtime.TaskConfig:180)
[2017-05-31 19:05:56,536] INFO Instantiated task s3-sink-0 with version 3.2.1 of type io.confluent.connect.s3.S3SinkTask (org.apache.kafka.connect.runtime.Worker:317)
[2017-05-31 19:05:56,565] INFO ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = connect-s3-sink
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig:180)
[2017-05-31 19:05:56,867] INFO Kafka version : 0.10.2.1-cp1 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-05-31 19:05:56,867] INFO Kafka commitId : 078e7dc02a100018 (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-05-31 19:05:56,875] INFO Created connector s3-sink (org.apache.kafka.connect.cli.ConnectStandalone:90)
[2017-05-31 19:05:56,877] INFO S3SinkConnectorConfig values:
    filename.offset.zero.pad.width = 10
    flush.size = 3
    format.class = class io.confluent.connect.s3.format.avro.AvroFormat
    retry.backoff.ms = 5000
    rotate.interval.ms = -1
    rotate.schedule.interval.ms = -1
    s3.bucket.name = emr-datain-datalake-884875361572-eu-west-1
    s3.credentials.provider.class = class com.amazonaws.auth.DefaultAWSCredentialsProviderChain
    s3.part.size = 5242880
    s3.region = eu-west-1
    s3.ssea.name =
    s3.wan.mode = false
    schema.cache.size = 1000
    shutdown.timeout.ms = 3000
 (io.confluent.connect.s3.S3SinkConnectorConfig:180)
[2017-05-31 19:05:56,880] INFO StorageCommonConfig values:
    directory.delim = /
    file.delim = +
    storage.class = class io.confluent.connect.s3.storage.S3Storage
    store.url = null
    topics.dir = topics
 (io.confluent.connect.storage.common.StorageCommonConfig:180)
[2017-05-31 19:05:56,881] INFO HiveConfig values:
    hive.conf.dir =
    hive.database = default
    hive.home =
    hive.integration = false
    hive.metastore.uris =
    schema.compatibility = NONE
 (io.confluent.connect.storage.hive.HiveConfig:180)
[2017-05-31 19:05:56,882] INFO PartitionerConfig values:
    locale =
    partition.duration.ms = 30000
    partition.field.name =
    partitioner.class = class io.confluent.connect.storage.partitioner.DefaultPartitioner
    path.format = 'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/
    schema.generator.class = class io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
    timezone =
 (io.confluent.connect.storage.partitioner.PartitionerConfig:180)
[2017-05-31 19:06:00,015] ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.ConnectException: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain
    at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:108)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:231)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:145)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain
    at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:131)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1115)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:764)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:728)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:721)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:704)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:672)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:654)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:518)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4185)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4132)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1302)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1259)
    at io.confluent.connect.s3.storage.S3Storage.bucketExists(S3Storage.java:110)
    at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:95)
    ... 9 more
[2017-05-31 19:06:00,018] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)
[2017-05-31 19:06:09,509] INFO Reflections took 15011 ms to scan 560 urls, producing 12993 keys and 85595 values  (org.reflections.Reflections:229)

Please help me to resolve this.

Thank you


Randall Hauch

Jun 1, 2017, 1:46:06 PM
to confluent...@googlegroups.com
The error states that the connector was not able to load any AWS credentials. Did you make sure that the connector user has write access to the S3 bucket specified in 's3.bucket.name', and that your environment defines the required AWS credentials? See http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html
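
In case it helps, the DefaultAWSCredentialsProviderChain shown in your connector config looks for credentials in, among other places, the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables and the default profile file at ~/.aws/credentials. A minimal sketch of either approach, with placeholder key values:

    # Option 1: export the credentials in the shell that starts the Connect worker
    export AWS_ACCESS_KEY_ID=AKIAXXXXXXXXXXXXXXXX          # placeholder
    export AWS_SECRET_ACCESS_KEY=xxxxxxxxxxxxxxxxxxxxxxxx  # placeholder
    ./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-s3/quickstart-s3.properties

    # Option 2: or put them in ~/.aws/credentials instead, e.g.:
    [default]
    aws_access_key_id = AKIAXXXXXXXXXXXXXXXX
    aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxx

Either way, the credentials must be visible to the process that runs the Connect worker, and the IAM user or role they belong to needs write access to the bucket.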

prashanth appu

Jun 6, 2017, 1:14:45 AM
to Confluent Platform

Thank you, Randall. After specifying the AWS credentials, I am able to send data to S3.
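
For reference, my quickstart-s3.properties matches the S3SinkConnectorConfig values printed in the log above; roughly (the topics value is a placeholder, since it is not shown in the log):

    name=s3-sink
    connector.class=io.confluent.connect.s3.S3SinkConnector
    tasks.max=1
    # placeholder topic name; set this to the topic(s) holding your log data
    topics=s3_topic
    s3.region=eu-west-1
    s3.bucket.name=emr-datain-datalake-884875361572-eu-west-1
    s3.part.size=5242880
    flush.size=3
    storage.class=io.confluent.connect.s3.storage.S3Storage
    format.class=io.confluent.connect.s3.format.avro.AvroFormat
    schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
    partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
    schema.compatibility=NONE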