Kafka Connect with non-Confluent Kafka distro


Mark Davis

Sep 3, 2016, 8:31:35 AM
to Confluent Platform
Hi folks,

I'm trying out Kafka Connect. Our organization runs Kafka via a Hadoop distribution - I think this is a pretty common use case - and I'm running an example to see whether Connect with the Confluent HDFS sink can do the job for us. It seems to be running, but nothing is appearing on HDFS.

Setup (on the Hortonworks HDP 2.4 sandbox, which matches our production versions):

1. The Kafka version bundled with Hadoop is 0.9.0, so I'm using the Confluent 2.0.1 components (schema registry and HDFS sink).
2. Built the following jars from the tagged Confluent repos and put them in a "lib" folder (tip: lots of people will be working with existing Kafka clusters, so a facility to download older versions would be nice!):

avro-1.7.7.jar           
common-utils-2.0.1.jar           
kafka-connect-avro-converter-2.0.1.jar  
kafka-schema-registry-client-2.0.1.jar
common-config-2.0.1.jar  
kafka-avro-serializer-2.0.1.jar  
kafka-connect-hdfs-2.0.1.jar

3. Added those to the classpath:
export CLASSPATH=~/kafka-connect/lib/
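
Side note: I'm not 100% sure a bare directory on the classpath picks up jars - Java only scans a directory entry for .class files unless it ends in a * wildcard - so the safer form is probably:

export CLASSPATH="$HOME/kafka-connect/lib/*"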

4. Created the following files:

connect-avro-standalone.properties
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
rest.port=8086

hdfs-sink.properties
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=mytopic
hdfs.url=hdfs://localhost:8020
flush.size=3
topics.dir=/connect/topics
logs.dir=/connect/logs

I had previously created the /connect/topics and /connect/logs directories in the sandbox and granted permissions to the user that connect was running as.
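
For reference, roughly the commands I used to set those up (the connect user name here is just a placeholder):

sudo -u hdfs hdfs dfs -mkdir -p /connect/topics /connect/logs
sudo -u hdfs hdfs dfs -chown connectuser /connect/topics /connect/logs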

5. Ran Kafka Connect:

 /usr/hdp/current/kafka-broker/bin/connect-standalone.sh connect-avro-standalone.properties hdfs-sink.properties
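
Once it's running, the worker can also be sanity-checked through its REST port, e.g.:

curl http://localhost:8086/connectors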

When it starts up, it looks okay in the console:

[2016-09-03 12:18:39,251] INFO StandaloneConfig values:
        cluster = connect
        rest.advertised.host.name = null
        task.shutdown.graceful.timeout.ms = 5000
        rest.host.name = null
        rest.advertised.port = null
        bootstrap.servers = [localhost:9092]
        offset.flush.timeout.ms = 5000
        offset.flush.interval.ms = 60000
        rest.port = 8086
        internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
        internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
        value.converter = class io.confluent.connect.avro.AvroConverter
        key.converter = class io.confluent.connect.avro.AvroConverter
 (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:165)
[2016-09-03 12:18:39,813] INFO Logging initialized @1272ms (org.eclipse.jetty.util.log:186)
[2016-09-03 12:18:39,854] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:53)
[2016-09-03 12:18:39,855] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:89)
[2016-09-03 12:18:39,882] INFO ProducerConfig values:
        compression.type = none
        metric.reporters = []
        metadata.max.age.ms = 300000
        metadata.fetch.timeout.ms = 60000
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        bootstrap.servers = [localhost:9092]
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        buffer.memory = 33554432
        timeout.ms = 30000
        key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.keystore.type = JKS
        ssl.trustmanager.algorithm = PKIX
        block.on.buffer.full = false
        ssl.key.password = null
        max.block.ms = 9223372036854775807
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.truststore.password = null
        max.in.flight.requests.per.connection = 1
        metrics.num.samples = 2
        client.id =
        ssl.endpoint.identification.algorithm = null
        ssl.protocol = TLS
        request.timeout.ms = 2147483647
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        acks = all
        batch.size = 16384
        ssl.keystore.location = null
        receive.buffer.bytes = 32768
        ssl.cipher.suites = null
        ssl.truststore.type = JKS
        security.protocol = PLAINTEXT
        retries = 2147483647
        max.request.size = 1048576
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        ssl.truststore.location = null
        ssl.keystore.password = null
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        send.buffer.bytes = 131072
        linger.ms = 0
 (org.apache.kafka.clients.producer.ProducerConfig:165)
[2016-09-03 12:18:39,946] INFO Kafka version : 0.9.0.2.4.0.0-169 (org.apache.kafka.common.utils.AppInfoParser:82)
[2016-09-03 12:18:39,947] INFO Kafka commitId : 29fa247911f6823b (org.apache.kafka.common.utils.AppInfoParser:83)
[2016-09-03 12:18:39,948] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:53)
[2016-09-03 12:18:40,075] INFO Worker started (org.apache.kafka.connect.runtime.Worker:111)
[2016-09-03 12:18:40,076] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:57)
[2016-09-03 12:18:40,076] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:58)
[2016-09-03 12:18:40,076] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:91)
[2016-09-03 12:18:40,249] INFO jetty-9.2.14.v20151106 (org.eclipse.jetty.server.Server:327)
Sep 03, 2016 12:18:41 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.


[2016-09-03 12:18:41,270] INFO Started o.e.j.s.ServletContextHandler@7b139eab{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2016-09-03 12:18:41,283] INFO Started ServerConnector@2c9399a4{HTTP/1.1}{0.0.0.0:8086} (org.eclipse.jetty.server.ServerConnector:266)
[2016-09-03 12:18:41,283] INFO Started @2744ms (org.eclipse.jetty.server.Server:379)
[2016-09-03 12:18:41,289] INFO REST server listening at http://172.16.1.28:8086/, advertising URL http://172.16.1.28:8086/ (org.apache.kafka.connect.runtime.rest.RestServer:132)
[2016-09-03 12:18:41,289] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:60)
[2016-09-03 12:18:41,292] INFO ConnectorConfig values:
        connector.class = class io.confluent.connect.hdfs.HdfsSinkConnector
        tasks.max = 1
        topics = [mytopic]
        name = hdfs-sink
 (org.apache.kafka.connect.runtime.ConnectorConfig:165)
[2016-09-03 12:18:41,292] INFO Creating connector hdfs-sink of type io.confluent.connect.hdfs.HdfsSinkConnector (org.apache.kafka.connect.runtime.Worker:170)
[2016-09-03 12:18:41,294] INFO Instantiated connector hdfs-sink with version 2.0.1 of type io.confluent.connect.hdfs.HdfsSinkConnector (org.apache.kafka.connect.runtime.Worker:183)
[2016-09-03 12:18:41,301] INFO HdfsSinkConnectorConfig values:
        filename.offset.zero.pad.width = 10
        topics.dir = /connect/topics
        flush.size = 3
        timezone =
        connect.hdfs.principal =
        hive.home =
        hive.database = default
        rotate.interval.ms = -1
        retry.backoff.ms = 5000
        locale =
        hadoop.home =
        logs.dir = /connect/logs
        schema.cache.size = 1000
        format.class = io.confluent.connect.hdfs.avro.AvroFormat
        hive.integration = false
        hdfs.namenode.principal =
        hive.conf.dir =
        partition.duration.ms = -1
        hadoop.conf.dir =
        schema.compatibility = NONE
        connect.hdfs.keytab =
        hdfs.url = hdfs://localhost:8020
        hdfs.authentication.kerberos = false
        hive.metastore.uris =
        partition.field.name =
        kerberos.ticket.renew.period.ms = 3600000
        shutdown.timeout.ms = 3000
        partitioner.class = io.confluent.connect.hdfs.partitioner.DefaultPartitioner
        storage.class = io.confluent.connect.hdfs.storage.HdfsStorage
        path.format =
 (io.confluent.connect.hdfs.HdfsSinkConnectorConfig:135)
[2016-09-03 12:18:41,301] INFO Finished creating connector hdfs-sink (org.apache.kafka.connect.runtime.Worker:193)
[2016-09-03 12:18:41,304] INFO TaskConfig values:
        task.class = class io.confluent.connect.hdfs.HdfsSinkTask
 (org.apache.kafka.connect.runtime.TaskConfig:165)
[2016-09-03 12:18:41,305] INFO Creating task hdfs-sink-0 (org.apache.kafka.connect.runtime.Worker:256)
[2016-09-03 12:18:41,305] INFO Instantiated task hdfs-sink-0 with version 2.0.1 of type io.confluent.connect.hdfs.HdfsSinkTask (org.apache.kafka.connect.runtime.Worker:267)
[2016-09-03 12:18:41,321] INFO ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = connect-hdfs-sink
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [localhost:9092]
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.keystore.type = JKS
        ssl.trustmanager.algorithm = PKIX
        enable.auto.commit = false
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.truststore.password = null
        session.timeout.ms = 30000
        metrics.num.samples = 2
        client.id =
        ssl.endpoint.identification.algorithm = null
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        check.crcs = true
        request.timeout.ms = 40000
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 32768
        ssl.cipher.suites = null
        ssl.truststore.type = JKS
        security.protocol = PLAINTEXT
        ssl.truststore.location = null
        ssl.keystore.password = null
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        fetch.min.bytes = 1024
        send.buffer.bytes = 131072
        auto.offset.reset = earliest
 (org.apache.kafka.clients.consumer.ConsumerConfig:165)
[2016-09-03 12:18:41,356] INFO Kafka version : 0.9.0.2.4.0.0-169 (org.apache.kafka.common.utils.AppInfoParser:82)
[2016-09-03 12:18:41,356] INFO Kafka commitId : 29fa247911f6823b (org.apache.kafka.common.utils.AppInfoParser:83)
[2016-09-03 12:18:41,359] INFO Created connector hdfs-sink (org.apache.kafka.connect.cli.ConnectStandalone:82)

So this is all I see in the console - the hdfs-sink connector shows up in the Connect REST API, but nothing else happens. When I send Avro messages serialized against the Confluent schema registry instance, nothing lands on HDFS. I don't see any other log messages in HDFS or Kafka or anywhere... any idea what's going on here?
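
For reference, I'm producing the test messages roughly like this, with the console producer from the Confluent quickstart (the schema here is just an illustration, not my real one):

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic mytopic \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'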


Mark Davis

Sep 5, 2016, 12:28:59 PM
to Confluent Platform
Some more info - again, apologies, I'm probably doing something really dumb here, but to be honest the documentation is still a tad sketchy, so I'm hoping this will lead to info that can help some others once it starts working for me...

Accessing the connector config via the REST API:

GET http://hdp-sandbox:8086/connectors/hdfs-sink/config

{
  "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
  "topics.dir": "/connect/topics",
  "flush.size": "3",
  "tasks.max": "1",
  "topics": "mytopic",
  "name": "hdfs-sink",
  "hdfs.url": "hdfs://hdp-sandbox",
  "logs.dir": "/connect/logs"
}


And the tasks look like this:

GET http://hdp-sandbox:8086/connectors/hdfs-sink/tasks

[
  {
    "id": {
      "connector": "hdfs-sink",
      "task": 0
    },
    "config": {
      "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
      "topics.dir": "/connect/topics",
      "task.class": "io.confluent.connect.hdfs.HdfsSinkTask",
      "flush.size": "3",
      "tasks.max": "1",
      "topics": "mytopic",
      "name": "hdfs-sink",
      "hdfs.url": "hdfs://hdp-sandbox",
      "logs.dir": "/connect/logs"
    }
  }
]

As above, the tasks seem OK - but I literally don't see anything else happening in the logs. Should I? From the docs (http://docs.confluent.io/2.0.0/connect/intro.html#quickstart) I'd expect to see some data flowing to HDFS - I don't, and I don't see any errors either. Any clue where I should be looking for log messages? Nothing obvious in the Kafka logs either. :/
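
One thing I might try next - assuming the HDP connect-standalone.sh points at the stock connect-log4j.properties the way the Apache distribution does - is turning up the log level:

# config/connect-log4j.properties (location on HDP is an assumption)
log4j.rootLogger=DEBUG, stdout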

Mark Davis

Sep 5, 2016, 4:10:14 PM
to Confluent Platform
Okay, now I feel silly... my Kafka server was running on an odd port. When I changed the broker to 6667 and used an FQDN instead of localhost, things moved on and I got some errors from the task itself that I can work on. I guess the moral is that, at least in the 0.9.0 version of Connect, there are no useful messages when the Kafka settings are specified incorrectly.
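
For anyone else on HDP, the change was just the broker setting in the worker properties (the hostname below is a placeholder for your broker's FQDN):

# connect-avro-standalone.properties
bootstrap.servers=hdp-sandbox.example.com:6667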

Gaurav Mishra

Sep 6, 2016, 7:38:29 AM
to Confluent Platform
Hi Mark,

I am also using a non-Confluent Kafka cluster (Cloudera). I was able to write into HDFS in distributed mode, but after a continuous run I hit an error.

I have already posted it to the group under the subject:
[Kerberos authentication error while renew lease ticket]

Kindly post if you are able to run it successfully.

Regards,
Gaurav Mishra