Hi folks,
I'm trying out Kafka Connect. Our organization runs Kafka via a Hadoop distribution, which I think is a pretty common setup, so I'm running an example to see if Connect with the Confluent hdfs-sink can do the job for us. It seems to be running, but nothing appears on HDFS.
Setup (on the Hortonworks HDP 2.4 sandbox, which matches our production versions):
1. The Kafka version bundled with the Hadoop distribution is 0.9.0, so I'm using the Confluent 2.0.1 components (Schema Registry and HDFS sink).
2. Built the following jars from the tagged Confluent repos and put them in a "lib" folder, roughly as sketched after the list. (Tip: lots of people will be working with existing Kafka clusters; a facility to download older versions would be nice!)
avro-1.7.7.jar
common-utils-2.0.1.jar
kafka-connect-avro-converter-2.0.1.jar
kafka-schema-registry-client-2.0.1.jar
common-config-2.0.1.jar
kafka-avro-serializer-2.0.1.jar
kafka-connect-hdfs-2.0.1.jar
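For reference, this is roughly how I built them (the repo and tag shown here are one example; I did the equivalent for the schema-registry and common repos, and avro-1.7.7.jar is just the stock Apache Avro release):

git clone https://github.com/confluentinc/kafka-connect-hdfs.git
cd kafka-connect-hdfs
git checkout v2.0.1
mvn package -DskipTests
cp target/kafka-connect-hdfs-2.0.1.jar ~/kafka-connect/lib/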
3. Added those to the classpath (note the * wildcard, so the JVM picks up all the jars rather than just the bare directory):
export CLASSPATH=~/kafka-connect/lib/*
4. Created the following files:
connect-avro-standalone.properties:
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
rest.port=8086
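Both converters point at a local Schema Registry instance; as a sanity check that it's reachable, listing its subjects should return a JSON array:

curl http://localhost:8081/subjects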
hdfs-sink.properties:
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=mytopic
hdfs.url=hdfs://localhost:8020
flush.size=3
topics.dir=/connect/topics
logs.dir=/connect/logs
I had previously created the /connect/topics and /connect/logs directories in the sandbox and granted permissions to the user that connect was running as.
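Roughly like this, run as the hdfs superuser ("connectuser" here is a stand-in for whatever user the worker actually runs as):

sudo -u hdfs hdfs dfs -mkdir -p /connect/topics /connect/logs
sudo -u hdfs hdfs dfs -chown -R connectuser /connect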
5. Ran Kafka Connect:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh connect-avro-standalone.properties hdfs-sink.properties
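As a quick check once it's running, the worker's REST interface (on the rest.port configured above) should list the connector:

curl http://localhost:8086/connectors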
When it starts up, it looks okay in the console:
[2016-09-03 12:18:39,251] INFO StandaloneConfig values:
cluster = connect
rest.advertised.host.name = null
task.shutdown.graceful.timeout.ms = 5000
rest.host.name = null
rest.advertised.port = null
bootstrap.servers = [localhost:9092]
offset.flush.timeout.ms = 5000
offset.flush.interval.ms = 60000
rest.port = 8086
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
value.converter = class io.confluent.connect.avro.AvroConverter
key.converter = class io.confluent.connect.avro.AvroConverter
(org.apache.kafka.connect.runtime.standalone.StandaloneConfig:165)
[2016-09-03 12:18:39,813] INFO Logging initialized @1272ms (org.eclipse.jetty.util.log:186)
[2016-09-03 12:18:39,854] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:53)
[2016-09-03 12:18:39,855] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:89)
[2016-09-03 12:18:39,882] INFO ProducerConfig values:
compression.type = none
metric.reporters = []
metadata.max.age.ms = 300000
metadata.fetch.timeout.ms = 60000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [localhost:9092]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
max.block.ms = 9223372036854775807
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
max.in.flight.requests.per.connection = 1
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
ssl.protocol = TLS
request.timeout.ms = 2147483647
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
acks = all
batch.size = 16384
ssl.keystore.location = null
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
retries = 2147483647
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
send.buffer.bytes = 131072
linger.ms = 0
(org.apache.kafka.clients.producer.ProducerConfig:165)
[2016-09-03 12:18:39,946] INFO Kafka version : 0.9.0.2.4.0.0-169 (org.apache.kafka.common.utils.AppInfoParser:82)
[2016-09-03 12:18:39,947] INFO Kafka commitId : 29fa247911f6823b (org.apache.kafka.common.utils.AppInfoParser:83)
[2016-09-03 12:18:39,948] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:53)
[2016-09-03 12:18:40,075] INFO Worker started (org.apache.kafka.connect.runtime.Worker:111)
[2016-09-03 12:18:40,076] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:57)
[2016-09-03 12:18:40,076] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:58)
[2016-09-03 12:18:40,076] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:91)
[2016-09-03 12:18:40,249] INFO jetty-9.2.14.v20151106 (org.eclipse.jetty.server.Server:327)
Sep 03, 2016 12:18:41 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.
[2016-09-03 12:18:41,270] INFO Started o.e.j.s.ServletContextHandler@7b139eab{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2016-09-03 12:18:41,283] INFO Started ServerConnector@2c9399a4{HTTP/1.1}{0.0.0.0:8086} (org.eclipse.jetty.server.ServerConnector:266)
[2016-09-03 12:18:41,283] INFO Started @2744ms (org.eclipse.jetty.server.Server:379)
[2016-09-03 12:18:41,289] INFO REST server listening at http://172.16.1.28:8086/, advertising URL http://172.16.1.28:8086/ (org.apache.kafka.connect.runtime.rest.RestServer:132)
[2016-09-03 12:18:41,289] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:60)
[2016-09-03 12:18:41,292] INFO ConnectorConfig values:
connector.class = class io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max = 1
topics = [mytopic]
name = hdfs-sink
(org.apache.kafka.connect.runtime.ConnectorConfig:165)
[2016-09-03 12:18:41,292] INFO Creating connector hdfs-sink of type io.confluent.connect.hdfs.HdfsSinkConnector (org.apache.kafka.connect.runtime.Worker:170)
[2016-09-03 12:18:41,294] INFO Instantiated connector hdfs-sink with version 2.0.1 of type io.confluent.connect.hdfs.HdfsSinkConnector (org.apache.kafka.connect.runtime.Worker:183)
[2016-09-03 12:18:41,301] INFO HdfsSinkConnectorConfig values:
filename.offset.zero.pad.width = 10
topics.dir = /connect/topics
flush.size = 3
timezone =
connect.hdfs.principal =
hive.home =
hive.database = default
rotate.interval.ms = -1
retry.backoff.ms = 5000
locale =
hadoop.home =
logs.dir = /connect/logs
schema.cache.size = 1000
format.class = io.confluent.connect.hdfs.avro.AvroFormat
hive.integration = false
hdfs.namenode.principal =
hive.conf.dir =
partition.duration.ms = -1
hadoop.conf.dir =
schema.compatibility = NONE
connect.hdfs.keytab =
hdfs.url = hdfs://localhost:8020
hdfs.authentication.kerberos = false
hive.metastore.uris =
partition.field.name =
kerberos.ticket.renew.period.ms = 3600000
shutdown.timeout.ms = 3000
partitioner.class = io.confluent.connect.hdfs.partitioner.DefaultPartitioner
storage.class = io.confluent.connect.hdfs.storage.HdfsStorage
path.format =
(io.confluent.connect.hdfs.HdfsSinkConnectorConfig:135)
[2016-09-03 12:18:41,301] INFO Finished creating connector hdfs-sink (org.apache.kafka.connect.runtime.Worker:193)
[2016-09-03 12:18:41,304] INFO TaskConfig values:
task.class = class io.confluent.connect.hdfs.HdfsSinkTask
(org.apache.kafka.connect.runtime.TaskConfig:165)
[2016-09-03 12:18:41,305] INFO Creating task hdfs-sink-0 (org.apache.kafka.connect.runtime.Worker:256)
[2016-09-03 12:18:41,305] INFO Instantiated task hdfs-sink-0 with version 2.0.1 of type io.confluent.connect.hdfs.HdfsSinkTask (org.apache.kafka.connect.runtime.Worker:267)
[2016-09-03 12:18:41,321] INFO ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = connect-hdfs-sink
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [localhost:9092]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = false
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1024
send.buffer.bytes = 131072
auto.offset.reset = earliest
(org.apache.kafka.clients.consumer.ConsumerConfig:165)
[2016-09-03 12:18:41,356] INFO Kafka version : 0.9.0.2.4.0.0-169 (org.apache.kafka.common.utils.AppInfoParser:82)
[2016-09-03 12:18:41,356] INFO Kafka commitId : 29fa247911f6823b (org.apache.kafka.common.utils.AppInfoParser:83)
[2016-09-03 12:18:41,359] INFO Created connector hdfs-sink (org.apache.kafka.connect.cli.ConnectStandalone:82)
So this is all I see in the console. The hdfs-sink connector shows up in the Connect REST API, but nothing else happens: when I send messages in Avro, serialized via the Confluent Schema Registry instance, nothing appears on HDFS, and there are no further log messages from Connect, Kafka, or HDFS anywhere.
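For reference, this is roughly how I'm producing the test messages, using kafka-avro-console-producer from the Confluent distribution with a throwaway schema; since flush.size=3, I send at least three records so the sink has enough to commit a file:

kafka-avro-console-producer --broker-list localhost:9092 --topic mytopic \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}

And this is how I'm checking for output; as I understand it the sink stages files under a +tmp directory before committing them, so I list everything recursively:

hdfs dfs -ls -R /connect/topics

Any idea what's going on here?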