Connect Distributed - Won't start after faulty connector config submitted

Steven Deutscher-Kobayashi

May 20, 2016, 1:17:20 PM
to Confluent Platform
Hello,

I am using the confluent-2.0.0 platform (HDFS sink connector) locally to copy from Kafka Avro topics to my local single-node HDFS.

The problem I am observing is that if I submit a connector with a faulty config (say, one lacking the flush.size property) while running in distributed mode, my Connect instance not only crashes but cannot start up again at all.

Here are the steps I am taking:

1. Start ZK, Kafka, Schema Registry, Kafka-Rest

2. Create worker offset / config topics

./bin/kafka-topics --create --zookeeper localhost:2181 --topic test-connect-configs --partitions 1 --replication-factor 1
Created topic "test-connect-configs".

./bin/kafka-topics --create --zookeeper localhost:2181 --topic test-connect-offsets --partitions 25 --replication-factor 1
Created topic "test-connect-offsets"

3. Start worker

connect-avro-distributed.properties

bootstrap.servers=localhost:9092

group.id=test-connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

config.storage.topic=test-connect-configs

offset.storage.topic=test-connect-offsets

4. Submit Connector via REST

bad.json

{
   "name":"bad_connector",
   "config":{
      "tasks.max":1,
      "topics":"t1",
      "connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector",
      "hdfs.url":"hdfs://localhost:9000",
      "topics.dir":"/user/steven/kctopics",
      "logs.dir":"/user/steven/kclogs",
      "schema.compatibility":"BACKWARD"
   }
}

curl -X POST -H "Content-Type: application/json" --data @bad.json localhost:8083/connectors
{"name":"bad_connector","config":{"tasks.max":"1","topics":"t1","connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector","hdfs.url":"hdfs://localhost:9000","topics.dir":"/user/steven/kctopics","logs.dir":"/user/steven/kclogs","schema.compatibility":"BACKWARD","name":"bad_connector"},"tasks":[]}
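Since the POST is accepted even though a required key is missing, a client-side check before submitting could catch this kind of mistake early. The following is only a minimal sketch in Python, not part of Kafka Connect: `REQUIRED_KEYS` and `missing_keys` are hypothetical names, and the required-key list contains only `flush.size`, the one key this thread's ConfigException names as required.

```python
import json

# Hypothetical list: only "flush.size" is confirmed as required by the
# ConfigException in this thread; extend it from the connector's docs.
REQUIRED_KEYS = ["flush.size"]


def missing_keys(payload, required=REQUIRED_KEYS):
    """Return the required keys that are absent from payload["config"]."""
    config = payload.get("config", {})
    return [key for key in required if key not in config]


# A shortened copy of bad.json, inlined so the sketch is self-contained.
bad = json.loads("""
{
  "name": "bad_connector",
  "config": {
    "tasks.max": 1,
    "topics": "t1",
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "hdfs.url": "hdfs://localhost:9000"
  }
}
""")

print(missing_keys(bad))  # prints ['flush.size']
```

An empty list would mean the payload has every key on the list; it says nothing about whether the values themselves are valid.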


Now, on the worker:

[2016-05-20 12:34:11,025] INFO Creating connector bad_connector of type io.confluent.connect.hdfs.HdfsSinkConnector (org.apache.kafka.connect.runtime.Worker:170)
[2016-05-20 12:34:11,028] INFO Instantiated connector bad_connector with version 2.0.0 of type io.confluent.connect.hdfs.HdfsSinkConnector (org.apache.kafka.connect.runtime.Worker:183)
[2016-05-20 12:34:11,035] ERROR Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:166)
org.apache.kafka.connect.errors.ConnectException: Connector threw an exception while starting
at org.apache.kafka.connect.runtime.Worker.addConnector(Worker.java:188)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:668)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startWork(DistributedHerder.java:640)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.handleRebalanceCompleted(DistributedHerder.java:598)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:184)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Couldn't start HdfsSinkConnector due to configuration error
at io.confluent.connect.hdfs.HdfsSinkConnector.start(HdfsSinkConnector.java:48)
at org.apache.kafka.connect.runtime.Worker.addConnector(Worker.java:186)
... 6 more
Caused by: io.confluent.common.config.ConfigException: Missing required configuration "flush.size" which has no default value.
at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:241)
at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
at io.confluent.connect.hdfs.HdfsSinkConnectorConfig.<init>(HdfsSinkConnectorConfig.java:245)
at io.confluent.connect.hdfs.HdfsSinkConnector.start(HdfsSinkConnector.java:46)
... 7 more
[2016-05-20 12:34:11,053] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:68)
[2016-05-20 12:34:11,091] INFO 0:0:0:0:0:0:0:1 - - [20/May/2016:16:34:09 +0000] "POST /connectors HTTP/1.1" 201 300  1426 (org.apache.kafka.connect.runtime.rest.RestServer:60)
[2016-05-20 12:34:11,098] INFO Stopped ServerConnector@6f0628de{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2016-05-20 12:34:11,108] INFO Stopped o.e.j.s.ServletContextHandler@3704122f{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
[2016-05-20 12:34:11,110] INFO Herder stopping (org.apache.kafka.connect.runtime.distributed.DistributedHerder:310)
[2016-05-20 12:34:11,110] INFO Herder stopped (org.apache.kafka.connect.runtime.distributed.DistributedHerder:331)
[2016-05-20 12:34:11,110] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:115)
[2016-05-20 12:34:11,110] INFO Stopping KafkaOffsetBackingStore (org.apache.kafka.connect.storage.KafkaOffsetBackingStore:91)
[2016-05-20 12:34:11,111] INFO Stopping KafkaBasedLog for topic test-connect-offsets (org.apache.kafka.connect.util.KafkaBasedLog:149)
[2016-05-20 12:34:11,111] INFO Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer:613)
[2016-05-20 12:34:11,114] INFO Stopped KafkaBasedLog for topic test-connect-offsets (org.apache.kafka.connect.util.KafkaBasedLog:175)
[2016-05-20 12:34:11,114] INFO Stopped KafkaOffsetBackingStore (org.apache.kafka.connect.storage.KafkaOffsetBackingStore:93)
[2016-05-20 12:34:11,114] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:155)
[2016-05-20 12:34:11,114] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:74)

The worker stops. If I try to start the worker again, it fails the same way:

[2016-05-20 12:40:29,807] INFO Starting connector bad_connector (org.apache.kafka.connect.runtime.distributed.DistributedHerder:663)
[2016-05-20 12:40:29,810] INFO ConnectorConfig values:
connector.class = class io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max = 1
topics = [t1]
name = bad_connector
 (org.apache.kafka.connect.runtime.ConnectorConfig:165)
[2016-05-20 12:40:29,811] INFO Creating connector bad_connector of type io.confluent.connect.hdfs.HdfsSinkConnector (org.apache.kafka.connect.runtime.Worker:170)
[2016-05-20 12:40:29,812] INFO Instantiated connector bad_connector with version 2.0.0 of type io.confluent.connect.hdfs.HdfsSinkConnector (org.apache.kafka.connect.runtime.Worker:183)
[2016-05-20 12:40:29,816] ERROR Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:166)
org.apache.kafka.connect.errors.ConnectException: Connector threw an exception while starting
at org.apache.kafka.connect.runtime.Worker.addConnector(Worker.java:188)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:668)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startWork(DistributedHerder.java:640)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.handleRebalanceCompleted(DistributedHerder.java:598)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:184)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: Couldn't start HdfsSinkConnector due to configuration error
at io.confluent.connect.hdfs.HdfsSinkConnector.start(HdfsSinkConnector.java:48)
at org.apache.kafka.connect.runtime.Worker.addConnector(Worker.java:186)
... 6 more
Caused by: io.confluent.common.config.ConfigException: Missing required configuration "flush.size" which has no default value.
at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:241)
at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
at io.confluent.connect.hdfs.HdfsSinkConnectorConfig.<init>(HdfsSinkConnectorConfig.java:245)
at io.confluent.connect.hdfs.HdfsSinkConnector.start(HdfsSinkConnector.java:46)
... 7 more
[2016-05-20 12:40:29,818] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:68)
[2016-05-20 12:40:29,822] INFO Stopped ServerConnector@673218ab{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2016-05-20 12:40:29,827] INFO Stopped o.e.j.s.ServletContextHandler@3704122f{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
[2016-05-20 12:40:29,828] INFO Herder stopping (org.apache.kafka.connect.runtime.distributed.DistributedHerder:310)
[2016-05-20 12:40:29,828] INFO Herder stopped (org.apache.kafka.connect.runtime.distributed.DistributedHerder:331)
[2016-05-20 12:40:29,828] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:115)
[2016-05-20 12:40:29,828] INFO Stopping KafkaOffsetBackingStore (org.apache.kafka.connect.storage.KafkaOffsetBackingStore:91)
[2016-05-20 12:40:29,828] INFO Stopping KafkaBasedLog for topic test-connect-offsets (org.apache.kafka.connect.util.KafkaBasedLog:149)
[2016-05-20 12:40:29,829] INFO Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer:613)
[2016-05-20 12:40:29,831] INFO Stopped KafkaBasedLog for topic test-connect-offsets (org.apache.kafka.connect.util.KafkaBasedLog:175)
[2016-05-20 12:40:29,831] INFO Stopped KafkaOffsetBackingStore (org.apache.kafka.connect.storage.KafkaOffsetBackingStore:93)
[2016-05-20 12:40:29,831] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:155)
[2016-05-20 12:40:29,831] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:74)


I am hoping I am just doing something wrong here.

Since Connect fails on startup, I cannot remove the bad connector through the REST API.

Thanks,
Steven


Ewen Cheslack-Postava

May 24, 2016, 8:30:16 PM
to Confluent Platform
We improved error handling a lot with the 0.10 release. I think this is a case we now recover from properly -- there's a new status reporting functionality, so if the connector fails to start it'll just be marked as FAILED and the worker should be able to continue working normally (and handle a request to remove or reconfigure the connector).

-Ewen
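
A minimal sketch of how a client might act on the status reporting mentioned above, assuming Python and assuming the worker answers `GET /connectors/<name>/status` with a JSON body shaped like the sample below. The function name `failed_connectors` and the sample's `worker_id` and `trace` values are made up for illustration; no network call is made here.

```python
import json


def failed_connectors(statuses):
    """Return the names of connectors whose reported state is FAILED."""
    return [
        status["name"]
        for status in statuses
        if status.get("connector", {}).get("state") == "FAILED"
    ]


# Illustrative response body (assumed shape, values invented for the example).
sample = json.loads("""
{
  "name": "bad_connector",
  "connector": {
    "state": "FAILED",
    "worker_id": "localhost:8083",
    "trace": "org.apache.kafka.connect.errors.ConnectException: ..."
  },
  "tasks": []
}
""")

for name in failed_connectors([sample]):
    # Print the REST call that would remove the failed connector.
    print("curl -X DELETE localhost:8083/connectors/" + name)
```

This only helps when the worker stays up, which is the behavior described for the 0.10 release; it does not apply to a worker that exits on startup.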

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/76fd550a-3d5d-419d-b887-4eb433fe3b46%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.




jsma...@gmail.com

Aug 9, 2016, 5:43:13 PM
to Confluent Platform
Hello,

I'm having the same issue on the 0.10 release with Confluent Platform 3.0.0.  A bad config was submitted that defined a connector class that did not exist. Now Kafka Connect will not start and I cannot connect to the REST API to remove the bad config.

Aug 09 21:35:48 connect1 connect-distributed[3212]: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches com.myorg.kafka.connect.s3.MySinkConnector available connectors are: org.apache.kafka.connect.tools.VerifiableSourceConnector, com.myorg.kafka.connect.s3.S3SourceConnector, org.apache.kafka.connect.file.FileStreamSinkConnector, org.apache.kafka.connect.sink.SinkConnector, io.confluent.connect.hdfs.HdfsSinkConnector, org.apache.kafka.connect.tools.VerifiableSinkConnector, io.confluent.connect.jdbc.JdbcSourceConnector, org.apache.kafka.connect.source.SourceConnector, org.apache.kafka.connect.file.FileStreamSourceConnector, io.confluent.connect.hdfs.tools.SchemaSourceConnector
Aug 09 21:35:48 connect1 connect-distributed[3212]: at org.apache.kafka.connect.runtime.Worker.getConnectorClass(Worker.java:226)
Aug 09 21:35:48 connect1 connect-distributed[3212]: at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:166)
Aug 09 21:35:48 connect1 connect-distributed[3212]: at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:789)
Aug 09 21:35:48 connect1 connect-distributed[3212]: at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startWork(DistributedHerder.java:755)
Aug 09 21:35:48 connect1 connect-distributed[3212]: at org.apache.kafka.connect.runtime.distributed.DistributedHerder.handleRebalanceCompleted(DistributedHerder.java:715)
Aug 09 21:35:48 connect1 connect-distributed[3212]: at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:206)
Aug 09 21:35:48 connect1 connect-distributed[3212]: at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:176)
Aug 09 21:35:48 connect1 connect-distributed[3212]: at java.lang.Thread.run(Thread.java:745)
Aug 09 21:35:48 connect1 connect-distributed[3212]: [2016-08-09 21:35:48,644] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:68)
Aug 09 21:35:48 connect1 connect-distributed[3212]: [2016-08-09 21:35:48,644] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:154)
Aug 09 21:35:48 connect1 connect-distributed[3212]: [2016-08-09 21:35:48,666] INFO Stopped ServerConnector@55b5f5d2{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
Aug 09 21:35:48 connect1 connect-distributed[3212]: [2016-08-09 21:35:48,673] INFO Stopped o.e.j.s.ServletContextHandler@14c01636{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
Aug 09 21:35:48 connect1 connect-distributed[3212]: [2016-08-09 21:35:48,678] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer:165)
Aug 09 21:35:48 connect1 connect-distributed[3212]: [2016-08-09 21:35:48,678] INFO Herder stopping (org.apache.kafka.connect.runtime.distributed.DistributedHerder:362)
Aug 09 21:35:48 connect1 connect-distributed[3212]: [2016-08-09 21:35:48,679] INFO Herder stopped (org.apache.kafka.connect.runtime.distributed.DistributedHerder:382)
Aug 09 21:35:48 connect1 connect-distributed[3212]: [2016-08-09 21:35:48,679] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:73)
Aug 09 21:35:49 connect1 systemd[1]: kafka-connect.service: main process exited, code=exited, status=1/FAILURE
Aug 09 21:35:49 connect1 systemd[1]: Unit kafka-connect.service entered failed state.
Aug 09 21:35:49 connect1 systemd[1]: kafka-connect.service failed.
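
The "available connectors are:" line in the log above suggests a simple check that could be done before submitting a config: compare the requested class against the classes the worker can load. A minimal sketch in Python, where `AVAILABLE` is a subset copied from that log line and `class_is_available` is a hypothetical helper that uses exact matching only (Connect's own name matching may be looser):

```python
# Class names copied from the "available connectors are:" log line above.
AVAILABLE = [
    "com.myorg.kafka.connect.s3.S3SourceConnector",
    "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "io.confluent.connect.hdfs.HdfsSinkConnector",
    "io.confluent.connect.jdbc.JdbcSourceConnector",
    "org.apache.kafka.connect.file.FileStreamSourceConnector",
]


def class_is_available(connector_class, available=AVAILABLE):
    """Return True if connector_class exactly matches an available class."""
    return connector_class in available


requested = "com.myorg.kafka.connect.s3.MySinkConnector"
if not class_is_available(requested):
    print("refusing to submit: unknown connector class " + requested)
```

In practice the list would have to come from the worker itself rather than being hard-coded as it is here.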


Shikhar Bhushan

Aug 15, 2016, 3:10:20 PM
to Confluent Platform
Hi,

I have created https://issues.apache.org/jira/browse/KAFKA-4042 for this bug. I plan to work on fixing this. Thank you for the report!

Best,

Shikhar
