Not able to run the connector in distributed mode but works well in standalone mode.

Renukaradhya H D

Jan 13, 2017, 8:12:12 AM
to Confluent Platform
Hi,

I have been using Confluent 3.0.1 for the past few months and everything has worked in standalone mode. I have now started working on the production version and am switching to distributed mode, but have not been able to get it working.

My configuration is below:

One ZooKeeper node, one Kafka node, and one Schema Registry node are running. I am trying to run one connector (single worker) in distributed mode.

connect-avro-distributed.properties:

bootstrap.servers=devmetric.com:9091
group.id=operative2
consumer.max.poll.records=500

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://devkafka01.com:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://devkafka01.com:8081


internal.key.converter=io.confluent.connect.avro.AvroConverter
internal.value.converter=io.confluent.connect.avro.AvroConverter
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
internal.key.converter.schema.registry.url=http://devkafka01.com:8081
internal.value.converter.schema.registry.url=http://devkafka01.com:8081

schema.registry.url=http://devkafka01.com:8081

key.converter.schemas.enable=true
value.converter.schemas.enable=true

config.storage.topic=connect-configs

offset.storage.topic=connect-offsets

status.storage.topic=connect-statuses



connect-avro-standalone.properties:

bootstrap.servers=devmetric.com:9091
group.id=operative2
consumer.max.poll.records=500


key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://devkafka01.com:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://devkafka01.com:8081

internal.key.converter=io.confluent.connect.avro.AvroConverter
internal.value.converter=io.confluent.connect.avro.AvroConverter
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
internal.key.converter.schema.registry.url=http://devkafka01.com:8081
internal.value.converter.schema.registry.url=http://devkafka01.com:8081

schema.registry.url=http://devkafka01.com:8081
# Local storage file for offset data
offset.storage.file.filename=/var/connectoffsets

key.converter.schemas.enable=true
value.converter.schemas.enable=true


connector.properties:

name=deliverydata-elasticsearch-sink1
connector.class=com.abc.kafka.connect.sink.DeliverySinkConnector
# I also tried with 2 tasks.
tasks.max=1
topics=dev.ps_primary_delivery


Command to run the connector in distributed mode: 
sh ./bin/connect-distributed ./etc/schema-registry/connect-avro-distributed.properties ./etc/kafka-connect-elasticsearch/connector.properties 

Command to run the connector in standalone mode: 
sh ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-elasticsearch/connector.properties

Issue:
In standalone mode everything works fine and I receive data from Kafka in the put method of my SinkTask, but in distributed mode I do not receive any.

Please help me solve this.

Regards,
Aradhya

Ewen Cheslack-Postava

Jan 15, 2017, 6:37:45 PM
to Confluent Platform
You say you're unable to get data into the SinkTask in distributed mode, but can you be more specific about why? Is there an error message? Stacktrace? Anything in the log files that would help explain why?

-Ewen

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsubscribe@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/87ade30d-bdbe-4ecf-99ea-481708b5b6f6%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Renukaradhya H D

Jan 16, 2017, 12:34:03 AM
to Confluent Platform
Hi Ewen,
I have attached logs. Please let me know if you need more information.

Regards,
Aradhya
distri.log

Renukaradhya H D

Jan 16, 2017, 5:30:31 AM
to Confluent Platform
Hi Ewen,
More information is below:

Steps followed:
1) Started ZooKeeper, Kafka, and Schema Registry.
2) Kafka already has some messages pushed by the Debezium connector (MySQL connector).
3) Started the worker using the command below. The configuration is as described in my previous post.
sh ./bin/connect-distributed ./etc/schema-registry/connect-avro-distributed.properties
4) Started the connector using the REST API. Details are below:


Request Body:

{
"name": "deliverydata-connector",
"config": {
"connector.class": "com.abc.kafka.connect.sink.DeliverySinkConnector",
"tasks.max": "2",
"topics": "dev.ps_primary_delivery",
"name": "deliverydata-connector",
"elasticsearch.cluster.name": "ad_metrics_store",
"elasticsearch.hosts": "devkafka.com:9300",
"elasticsearch.bulk.size": "100",
"tenants": "tenant1"
}
}

Response:

Got a response code of 201 Created.

Logs: 
[2017-01-16 05:00:22,730] ERROR Found connector configuration (connector-deliverydata-connector) in wrong format: class org.apache.kafka.connect.data.Struct (org.apache.kafka.connect.storage.KafkaConfigBackingStore:498)
[2017-01-16 05:00:23,282] INFO 10.111.7.224 - - [16/Jan/2017:10:00:22 +0000] "POST /connectors HTTP/1.1" 201 374  683 (org.apache.kafka.connect.runtime.rest.RestServer:60)

5) I wanted to check whether any connector was running, so I sent a GET request; the response was 200 without any data.

Response: 200 but no data.

Logs:
[2017-01-16 05:02:31,941] INFO 10.111.7.224 - - [16/Jan/2017:10:02:31 +0000] "GET /connectors HTTP/1.1" 200 2  8 (org.apache.kafka.connect.runtime.rest.RestServer:60)

6) But if I run in standalone mode, I am getting data.
sh ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-elasticsearch/deliverydata-elasticsearch-sink.properties 
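
Because the POST in step 4 still returns 201, the ERROR line in the worker log is easy to miss. Below is a minimal Python sketch for scanning a worker log for it. The function name `find_wrong_format_errors` and the regular expression are illustrative only and are derived from the single log line quoted above, so other Connect versions may word the message differently.

```python
import re

# Pattern for the ERROR line shown in step 4. The capture groups pull out the
# stored config key and the Java class the worker actually found.
WRONG_FORMAT = re.compile(
    r"ERROR Found connector configuration \((?P<key>[^)]+)\) "
    r"in wrong format: class (?P<found>\S+)"
)


def find_wrong_format_errors(log_text):
    """Return (config_key, found_class) pairs for each matching log line."""
    return [(m.group("key"), m.group("found")) for m in WRONG_FORMAT.finditer(log_text)]


# The two log lines from step 4, copied from this post.
sample = (
    "[2017-01-16 05:00:22,730] ERROR Found connector configuration "
    "(connector-deliverydata-connector) in wrong format: class "
    "org.apache.kafka.connect.data.Struct "
    "(org.apache.kafka.connect.storage.KafkaConfigBackingStore:498)\n"
    "[2017-01-16 05:00:23,282] INFO 10.111.7.224 - - [16/Jan/2017:10:00:22 +0000] "
    '"POST /connectors HTTP/1.1" 201 374  683 '
    "(org.apache.kafka.connect.runtime.rest.RestServer:60)\n"
)

for key, found in find_wrong_format_errors(sample):
    print(f"{key}: stored as {found}")
```
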

Regards,
Aradhya

Renukaradhya H D

Jan 17, 2017, 3:44:28 AM
to Confluent Platform
Hi Ewen,

I was able to solve this. My configuration was wrong in connect-avro-distributed.properties.

I was using "io.confluent.connect.avro.AvroConverter" instead of "org.apache.kafka.connect.json.JsonConverter" for internal.key.converter and internal.value.converter.

Below is my new configuration:

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false


-Aradhya

Nick DeCoursin

Feb 10, 2017, 4:52:58 AM
to Confluent Platform
It seems there's a bug when using `internal.key.converter` or `internal.value.converter` = io.confluent.connect.avro.AvroConverter.

Here's the open ticket: 
https://issues.apache.org/jira/browse/KAFKA-3988.

Thanks,
Nick D

Lucien Fregosi

Mar 6, 2017, 11:11:12 AM
to Confluent Platform
I have the same issue with the same stack trace, but I can't find a solution.

As with Renukaradhya H D, it works well in standalone mode but nothing happens when I try distributed mode.

Thanks in advance

Gwen Shapira

Mar 7, 2017, 12:51:35 AM
to confluent...@googlegroups.com
Can you share your configuration and the exact error?

Gwen

--
Gwen Shapira
Product Manager | Confluent
650.450.2760 @gwenshap
Follow us: Twitter | blog

Lucien Fregosi

Mar 7, 2017, 5:52:30 AM
to Confluent Platform
Sure!

On one side I have a Kafka Twitter producer that sends messages in Avro format (and it works well in standalone mode).

One other detail: I'm using Docker Compose and Docker Swarm to manage some services (Kafka, ZooKeeper, Elasticsearch, Schema Registry) in a cluster.

I'm using Kafka 0.10.1.1, Elasticsearch 5.2.1, Schema Registry 2.6, and Confluent 3.1.

For Docker, I'm using Docker Compose v3 with Swarm to deploy containers, and Docker Engine 1.13.

I have attached three files to this message:
- confluent/etc/schema-registry/connect-avro-distributed.properties
- confluent/etc/kafka-connect-elastic-search/quick-start.properties
- The stack trace from launching the producer/connector, where nothing happens when the Kafka broker sends messages for a defined topic

Lastly, I run the program with this command line:
./opt/kafka/confluent/bin/connect-distributed /opt/kafka/confluent/etc/schema-registry/connect-avro-distributed.properties /opt/kafka/confluent/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
connect_distributed.properties
kafka_elasticsearch_connector.properties
stacktrace

Lucien Fregosi

Mar 9, 2017, 11:22:06 AM
to Confluent Platform
Up !

Gwen Shapira

Mar 10, 2017, 1:56:32 PM
to confluent...@googlegroups.com
Hi,

If you look at your log, there is no stack trace. Connect started successfully; it's just that the connector is not there.

In distributed mode, you don't create connectors by passing a file on the command line. You create a connector via the REST API.

Try following the example here: 

You can see that the connectors are configured using curl:
curl -X POST -H "Content-Type: application/json" \
  --data '{"name": "quickstart-avro-file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"quickstart-jdbc-test", "file": "/tmp/quickstart/jdbc-output.txt"}}' \
  http://$CONNECT_HOST:28083/connectors
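
For anyone moving from a standalone connector file to this REST call, here is a minimal Python sketch that turns key=value connector properties into the `{"name": ..., "config": {...}}` body shown in the curl command. The helper name `properties_to_payload` is hypothetical, and the parser is an assumption-laden simplification: it handles only plain `key=value` lines and `#`/`!` comments, not the full Java properties syntax. The sample properties are the connector.properties from the first post in this thread.

```python
import json


def properties_to_payload(properties_text):
    """Convert key=value connector properties into a {"name", "config"} dict."""
    config = {}
    for raw_line in properties_text.splitlines():
        line = raw_line.strip()
        # Skip blank lines and comment lines.
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            continue  # not a key=value line; ignored in this sketch
        config[key.strip()] = value.strip()
    if "name" not in config:
        raise ValueError("connector properties must include a 'name' entry")
    # The connector name appears both at the top level and inside "config",
    # matching the request body posted earlier in this thread.
    return {"name": config["name"], "config": config}


standalone_properties = """\
name=deliverydata-elasticsearch-sink1
connector.class=com.abc.kafka.connect.sink.DeliverySinkConnector
tasks.max=1
topics=dev.ps_primary_delivery
"""

payload = properties_to_payload(standalone_properties)
print(json.dumps(payload, indent=2))
```

The printed JSON can be saved to a file and sent with curl's `--data @<file>` form in place of the inline `--data '...'` string.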


Jay

Apr 10, 2017, 9:49:28 PM
to Confluent Platform
Hi Gwen,
I think I'm following the documentation, but I still get
"Found connector configuration in wrong format: class org.apache.kafka.connect.data.Struct"

Here are my worker configurations:
bootstrap.servers=host1:port1,host2:port2,host3:port3
group.id=group_name
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=schema_registry1,schema_registry2,schema_registry3
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=schema_registry1,schema_registry2,schema_registry3
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
config.storage.topic=config_topic
offset.storage.topic=offset_topic
status.storage.topic=status_topic

I run the following command in one of the nodes (to create the connector):
curl -X POST -H "Content-Type: application/json" --data '{"name":"test_connector", "config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max":"1","topics":"topic","connection.url":"database_url","table_name_format":"table","auto.create":"true"}}' http://localhost:8083/connectors


Did I miss something?

Thanks



Gwen Shapira

Apr 11, 2017, 12:31:29 PM
to confluent...@googlegroups.com
Yes, I think you are missing these two lines:

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
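
Both fixes reported in this thread come down to the same four worker settings, so they can be checked before starting the worker. The following is a rough Python sketch, not an official tool: the function names are hypothetical, the parser handles only plain `key=value` lines, and the rule it enforces reflects only the advice given in this thread for the Connect versions discussed here.

```python
JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"


def parse_properties(text):
    """Parse plain key=value lines, skipping blanks and # or ! comments."""
    props = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_internal_converters(text):
    """Return a list of human-readable problems; an empty list means none found."""
    props = parse_properties(text)
    problems = []
    for side in ("key", "value"):
        converter_key = f"internal.{side}.converter"
        schemas_key = f"{converter_key}.schemas.enable"
        if props.get(converter_key) != JSON_CONVERTER:
            problems.append(f"{converter_key} should be {JSON_CONVERTER}")
        # A missing schemas.enable entry is flagged too, as in Jay's config.
        if props.get(schemas_key, "").lower() != "false":
            problems.append(f"{schemas_key} should be false")
    return problems


# The internal converter lines from Jay's worker configuration above.
worker_properties = """\
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
"""

for problem in check_internal_converters(worker_properties):
    print(problem)
```
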