Getting "No fields found using key and value schemas for table" on JDBC SInk Connector with MemSQL


Nishant Verma

May 3, 2017, 11:32:15 PM
to Confluent Platform
We are using the Kafka Connect JDBC connector with MemSQL as the JDBC sink. Below is the JSON we have produced to a topic in Kafka; it is the sample JSON given on the Confluent website for the JDBC sink connector.

Architecture:

2-broker-node Kafka cluster.
Kafka Connect on one of these servers.
MemSQL master on a third server.
MemSQL leaf nodes on other servers.

{"id": 999, "product": "foo", "quantity": 100, "price": 50}

Below is the connect-standalone.properties file:

bootstrap.servers=10.2.3.4:9092,10.5.6.7:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/media/data/confluent-3.2.0/connect.offsets


Below is the sink-quickstart-sqlite.properties file:

name=test-memsql
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=test4
connection.url=jdbc:mysql://1.2.3.4:3306/ABCD
auto.create=true

I have created table test4 with ID, FNAME and LNAME columns. I have placed the following JARs in share/java/kafka-connect-jdbc: mssql-jdbc-6.1.0.jre8.jar, mysql-connector-java-5.1.30.jar and jdbc-mssql.jar.

I am using the command below to start the Kafka Connect JDBC connector:

./bin/connect-standalone etc/kafka/connect-standalone.properties etc/kafka-connect-jdbc/sink-quickstart-sqlite.properties

I can't see any records inserted in the table when I run "select * from ABCD.test4".

In the logs, I see the following error:

[2017-05-03 08:37:03,952] ERROR Task memsql-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:449)
org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: test4
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:190)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:58)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:64)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-05-03 08:37:03,954] ERROR Task is being killed and will not recover until manually restarted 

Kafka Version - 0.10.2.0
Kafka Connect Version - 3.2.0
MemSQL Version - 5.7.1

I am attaching the nohup.out generated after I started Kafka Connect.

For testing purposes, I am using the sample JSON as mentioned on confluent website:
{"id": 999, "product": "foo", "quantity": 100, "price": 50}
(Attachment: nohup.out)

Nishant Verma

May 5, 2017, 7:08:03 AM
to Confluent Platform
Hi,

Can anyone please provide inputs on this issue?

Thanks
Nishant

Nishant Verma

May 8, 2017, 10:00:20 AM
to Confluent Platform
I am now using the following in connect-standalone.properties:

key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter

I am getting this error:

[2017-05-08 09:56:38,399] ERROR Task test-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:449)
org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:71)
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:58)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:65)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:64)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
[2017-05-08 09:56:38,401] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:450)



Input JSON is: {"id": 998, "product": "food", "quantity": 1000, "price": 51}

Can anyone advise what could be wrong here now?


Nishant Verma

May 8, 2017, 10:01:52 AM
to Confluent Platform
The value of valueSchema is coming through as: Schema{BYTES}



Ewen Cheslack-Postava

May 13, 2017, 5:51:15 PM
to Confluent Platform
Nishant,

If you use these settings:

key.converter.schemas.enable=false
value.converter.schemas.enable=false

then no schemas will be included with the data. The JDBC sink connector currently requires data with schemas, as described here: http://docs.confluent.io/current/connect/connect-jdbc/docs/sink_connector.html#data-mapping. I think it may be a bug that it doesn't give an error message earlier than the "No fields found using key and value schemas" one, since there is code in the schema conversion step to validate the supported types.

The connector won't support converting the data as raw bytes, and I suspect this isn't what you want anyway since it would just generate a table with a single BLOB column if you took the raw bytes and restructured them into the required Struct type.

The way to get this working is to change the settings above from false to true, though note that this will use a wrapper format for values that includes the schema information in every single message produced.
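For reference, here's a minimal, untested sketch of what that looks like with JsonConverter (the field types below are my assumptions based on the sample record earlier in the thread). First flip the two settings in connect-standalone.properties:

key.converter.schemas.enable=true
value.converter.schemas.enable=true

Then produce each value wrapped in the schema-plus-payload envelope, e.g.:

{"schema": {"type": "struct", "fields": [{"type": "int32", "optional": false, "field": "id"}, {"type": "string", "optional": true, "field": "product"}, {"type": "int32", "optional": true, "field": "quantity"}, {"type": "float", "optional": true, "field": "price"}], "optional": false}, "payload": {"id": 999, "product": "foo", "quantity": 100, "price": 50}}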

-Ewen



suresh maragani

Oct 18, 2018, 11:42:34 PM
to Confluent Platform
Hi guys,

I am also facing the same issue. Could you please help? I am not using Schema Registry.


 ERROR WorkerSinkTask{id=test-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:544)
org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: recon.j_testdata
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:127)
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:64)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:71)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:69)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
[2018-10-19 03:35:52,457] ERROR WorkerSinkTask{id=test-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: recon.j_testdata
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:127)
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:64)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:71)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:69)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
        ... 10 more
[2018-10-19 03:35:52,458] ERROR WorkerSinkTask{id=test-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-10-19 03:35:52,458] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:100)

Oracle table:
CREATE TABLE "RECON"."J_TESTDATA" (
    "ID" VARCHAR2(32 BYTE) NOT NULL ENABLE,
    "PO_DOCUMENT" CLOB
);

Robin Moffatt

Oct 31, 2018, 10:04:42 PM
to confluent...@googlegroups.com
You need to have a schema for your data in order for the JDBC Sink to work.
If you're not using Schema Registry, then you need to use the embedded schema format in your JSON.
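As a rough, untested sketch (assuming JsonConverter with value.converter.schemas.enable=true), a message for the RECON.J_TESTDATA table above might look like this, with illustrative values:

{"schema": {"type": "struct", "fields": [{"type": "string", "optional": false, "field": "ID"}, {"type": "string", "optional": true, "field": "PO_DOCUMENT"}], "optional": false}, "payload": {"ID": "abc-123", "PO_DOCUMENT": "{\"example\": \"document\"}"}}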


-- 

Robin Moffatt | Developer Advocate | ro...@confluent.io | @rmoff


