Regarding Kafka Elasticsearch connector (sending JSON data)


Mounica

May 21, 2018, 7:24:03 AM
to Confluent Platform
Hi,
I tried sending Avro data from Kafka to Elasticsearch and it succeeded. But when I try to send JSON data from a Kafka topic to Elasticsearch, I get the following errors. Please help me with this scenario.


ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:338)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2018-05-21 16:41:49,005] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-05-21 16:41:49,006] INFO Stopping ElasticsearchSinkTask. (io.confluent.connect.elasticsearch.ElasticsearchSinkTask:178)



Please reply.
Thanks and Regards
Mouni

Robin Moffatt

May 21, 2018, 10:42:47 AM
to confluent...@googlegroups.com
If you don't have a schema embedded in your JSON data, then set
"value.converter.schemas.enable": "false"
Otherwise the JsonConverter will expect to find a schema embedded in each message, which the error message you've got is telling you isn't the case.
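
For reference, with schemas.enable left at true the JsonConverter expects every message to be an envelope with "schema" and "payload" fields, roughly like the illustrative example below (the field names and values here are made up, not taken from your data):

{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "id", "type": "int32" },
      { "field": "name", "type": "string" }
    ],
    "optional": false,
    "name": "example"
  },
  "payload": { "id": 1, "name": "foo" }
}

With schemas.enable=false it will accept plain JSON such as { "id": 1, "name": "foo" } instead.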



Mounica

May 22, 2018, 12:51:29 AM
to Confluent Platform

Hi, this is the properties file. Everything is set as you said, but I am still getting the same error. Please help.

# Sample configuration for a standalone Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092



# The 4 lines below were added by me; delete them if they don't work ------------------------
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false



# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
#key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://10.0.14.225:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.0.14.225:8081

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets

# Confluent Control Center Integration -- uncomment these lines to enable Kafka client interceptors
# that will report audit data that can be displayed and analyzed in Confluent Control Center
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=
#rest.port=8083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# Replace the relative path below with an absolute path if you are planning to start Kafka Connect from within a
# directory other than the home directory of Confluent Platform.
plugin.path=share/java



#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter

# I set this new config
schemas.enable=false

Mohana Durga Raja

May 22, 2018, 1:09:57 AM
to confluent...@googlegroups.com
[2018-05-22 10:36:55,426] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:544)
org.apache.kafka.connect.errors.DataException: Cannot infer mapping without schema.
    at io.confluent.connect.elasticsearch.Mapping.inferMapping(Mapping.java:84)
    at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createMapping(JestElasticsearchClient.java:221)
    at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:66)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:260)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:162)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2018-05-22 10:36:55,429] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Cannot infer mapping without schema.
    at io.confluent.connect.elasticsearch.Mapping.inferMapping(Mapping.java:84)
    at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createMapping(JestElasticsearchClient.java:221)
    at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:66)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:260)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:162)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
    ... 10 more
[2018-05-22 10:36:55,431] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-05-22 10:36:55,431] INFO Stopping ElasticsearchSinkTask. (io.confluent.connect.elasticsearch.ElasticsearchSinkTask:178)



After changing the configuration file, I got the above errors. Please let me know how to resolve this.



# I added the new configuration file from here


bootstrap.servers=localhost:9092

group.id=testGroup

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets

offset.flush.interval.ms=10000
config.storage.topic=connectconfigs
plugin.path=share/java




Robin Moffatt

May 22, 2018, 5:42:05 AM
to confluent...@googlegroups.com
Try setting

schema.ignore=true

By default it's false, which means the connector will want a schema (which, as we established previously, your data doesn't explicitly have).
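
For context, schema.ignore is a property of the Elasticsearch sink connector itself, not of the worker. A minimal sink configuration might look roughly like the sketch below, where the connector name, topic, and connection.url are placeholders for your environment:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=your-json-topic
key.ignore=true
schema.ignore=true
connection.url=http://localhost:9200
type.name=kafka-connect

With schema.ignore=true the connector doesn't try to derive an Elasticsearch mapping from a Connect schema, and instead lets Elasticsearch infer field types through dynamic mapping.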



Mohana Durga Raja

May 22, 2018, 6:39:26 AM
to confluent...@googlegroups.com
Hi Mr. Robin,
Thank you for your support. The connector is now running fine.
Thanks and Regards
Mouni
