Using an Array of Records in Avro with Kafka Connect


adam.klein...@adp.com

Mar 24, 2016, 12:16:21 PM
to Confluent Platform
Hi,
I have a problem with an array of records in Avro while trying to use the Kafka Connect FileStreamSinkConnector.

worker config:

bootstrap.servers=kafka:9092

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://avro-schema-registry:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets.auth

sink properties:

name=auth-file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=auth.sink.txt
topics=test
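
These run with the standalone worker (implied by offset.storage.file.filename above), along these lines -- the property file names here are illustrative:

./bin/connect-standalone worker.properties auth-file-sink.properties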


Avro schema:

{
  "doc": "The payload",
  "name": "Event",
  "type": "record",
  "fields": [
    {
      "name": "people",
      "type": {
        "type": "array",
        "items": {
          "name": "Person",
          "type": "record",
          "fields": [
            {
              "doc": "Name of person",
              "name": "name",
              "type": "string"
            }
          ]
        }
      }
    }
  ]
}


Message:

{
  "people": [
    { "name": "adam" },
    { "name": "klein" }
  ]
}
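
For reference, a message like this can be produced for testing with the Avro console producer, along these lines (a sketch: the host, topic, and registry URL are taken from the configs above, and the schema is inlined from above):

./bin/kafka-avro-console-producer --broker-list kafka:9092 --topic test \
  --property schema.registry.url=http://avro-schema-registry:8081 \
  --property value.schema='{"name":"Event","type":"record","fields":[{"name":"people","type":{"type":"array","items":{"name":"Person","type":"record","fields":[{"name":"name","type":"string"}]}}}]}'
{"people":[{"name":"adam"},{"name":"klein"}]}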



I'm getting this error:

[2016-03-24 14:01:32,519] ERROR Thread WorkerSinkTask-auth-file-sink-0 exiting with uncaught exception:  (org.apache.kafka.connect.util.ShutdownableThread)
java.lang.NullPointerException
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1043)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1068)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1068)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:980)
        at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:750)
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:266)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
        at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)

adam.klein...@adp.com

Mar 24, 2016, 12:23:34 PM
to Confluent Platform
BTW, the problem only happens when the schema contains an array of records.
When there is no array, or when the array contains strings, it doesn't fail.
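
For example, declaring the field as an array of strings instead converts fine:

    {
      "name": "people",
      "type": { "type": "array", "items": "string" }
    }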

Also, I have another consumer that also uses the schema registry, and it reads and decodes these messages successfully,
so the problem is probably not with the schema/payload but with the connector itself.

Thanks

Ewen Cheslack-Postava

Mar 24, 2016, 1:09:09 PM
to Confluent Platform
This looks like an issue that a few other folks reported -- I think it is fixed by https://github.com/confluentinc/schema-registry/pull/280. Are you running CP 2.0.0 or 2.0.1? If so, a build of the AvroConverter from master may address this.
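
Roughly, getting a master build looks like this (a sketch -- assumes Maven is installed, and the module and jar names may differ between versions):

git clone https://github.com/confluentinc/schema-registry.git
cd schema-registry
mvn clean package -DskipTests
# then put the resulting avro-converter jar ahead of the packaged one
# on the Connect worker's classpath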

-Ewen


Klein-Contractor, Adam (CORP)

Mar 27, 2016, 8:44:15 AM
to confluent...@googlegroups.com
Thanks,
I tried the latest version as well (2.0.1.2-2.11.7) -- same problem.

How can I get a build from master?



Mark Davis

Dec 16, 2016, 1:07:54 PM
to Confluent Platform
Hi folks,

does Connect support arrays of records yet? I see this with CP 3.0.0 -- CP Connect 3.1.x doesn't seem to work with our version of Kafka, so I can't test whether it resolves the issue.

We are using HDP 2.5 Kafka, but all the CP libs.

[mark@hdp-sandbox ~]$ ~/confluent-3.0.0/bin/kafka-avro-console-producer --broker-list hdp-sandbox:9092 --topic mtest-6-nonulls --property value.schema='{"type":"record","name":"mtestFails","fields":[{"default":"","name":"id","type":"string"},{"name":"fileArray","type":{"type":"array","items":{"name":"fileRecord","type":"record","fields":[{"default":"","name":"fileName","type":"string"},{"default":"","name":"mimeType","type":"string"}]}}}]}'
{"id": "myId", "fileArray":[{"fileName": "my_name", "mimeType": "my_mime"}]}
{"id": "myId", "fileArray":[{"fileName": "my_name", "mimeType": "my_mime"}]}

The Avro console consumer works great:

[mark@hdp-sandbox ~]$ ~/confluent-3.0.0/bin/kafka-avro-console-consumer --zookeeper localhost:5181 --topic mtest-6-nonulls --from-beginning
{"id":"myId","fileArray":[{"fileName":"my_name","mimeType":"my_mime"}]}
{"id":"myId","fileArray":[{"fileName":"my_name","mimeType":"my_mime"}]}

Connect, however, not so much:

[2016-12-16 18:04:47,801] ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.NullPointerException
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1078)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1103)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1015)
        at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:781)
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:346)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
[2016-12-16 18:04:47,810] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)

Attempts to use 3.1.1+ end up with Kafka errors.

Any ideas?

Ewen Cheslack-Postava

Dec 17, 2016, 10:08:22 PM
to Confluent Platform
In 3.0.0, the line of code throwing the exception checks 3 things: type, namespace, and name. I would guess that one of these is null and the Converter isn't properly handling that case (since I see "name" as a property but not "namespace"). That would be a bug in the AvroConverter.

That said, Connect has supported arrays since the first version -- if there's a conversion issue, it will generally be in a specific Converter, since the framework doesn't deal with individual types much (it just passes the work off to Connectors/Tasks and Converters). So I suspect the issue you're seeing is with the Converter rather than the framework itself.
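
If a null namespace is indeed the trigger, a possible workaround until a fix lands is to declare an explicit namespace on the nested record in your schema (the namespace value here is illustrative):

    "items": {
      "name": "fileRecord",
      "namespace": "com.example",
      "type": "record",
      ...
    }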

-Ewen


vineet sharma

Jul 5, 2017, 10:20:08 AM
to Confluent Platform
Hi,

Did anyone find a solution to this problem? I am using the latest version of the Confluent S3 connector, and it fails on the "array" complex type with the error below:

org.apache.avro.AvroRuntimeException: Not an array: "long"
        at org.apache.avro.Schema.getElementType(Schema.java:262)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:415)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:477)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:437)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:487)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:295)
        at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:75)
        at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:393)
        at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:197)
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:173)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)


