Kafka Connect: Error when StringConverter is used to send strings to HDFS


KafkaUser

Dec 9, 2016, 6:08:19 PM
to Confluent Platform

I'm trying to use StringConverter to send strings to HDFS. My worker.properties file looks like this:


bootstrap.servers=localhost:9092

# A unique string that identifies the Connect cluster group this worker belongs to.
group.id=test-string-connect

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false



My connector config looks like this:


hdfs_string_connector = '{"name": "hdfs-string-connector", "config": {"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max": "10", "topics": "test-file-hdfs", "hdfs.url": "hdfs://localhost/jobs/dev/scratch", "hadoop.conf.dir": "/opt/hadoop/conf", "hadoop.home": "/opt/hadoop", "flush.size": "100", "rotate.interval.ms": "1000"}}'
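
I register it with a POST to the Connect REST API (worker running in distributed mode on the default REST port, 8083):

curl -X POST -H "Content-Type: application/json" \
     --data "$hdfs_string_connector" \
     http://localhost:8083/connectors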


I get the following error when I start the connector:


[2016-12-09 14:37:10,897] ERROR Task hdfs-string-connector-9 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:390)
org.apache.avro.file.DataFileWriter$AppendWriteException: org.apache.avro.AvroRuntimeException: Unknown datum type io.confluent.kafka.serializers.NonRecordContainer: io.confluent.kafka.serializers.NonRecordContainer@23bd7b76
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
    at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:64)
    at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:59)
    at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:487)
    at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:264)
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:91)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:370)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type io.confluent.kafka.serializers.NonRecordContainer: io.confluent.kafka.serializers.NonRecordContainer@23bd7b76
    at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:636)
    at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:601)
    at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
    ... 17 more
[2016-12-09 14:37:10,898] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:391)


Ewen Cheslack-Postava

Dec 10, 2016, 7:28:02 PM
to Confluent Platform
Currently the connector requires structured data (i.e. a record). The StringConverter hands the sink a bare string, which the Avro output format wraps in a NonRecordContainer that the Avro writer can't append, hence the exception you're seeing. This constraint could be loosened in the future, but many of the output formats and the Hive integration require structured data, so the set of output formats supported would be limited.
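
In the meantime, the simplest workaround is to wrap each string in a single-field Avro record and switch the worker back to the AvroConverter. A rough sketch with the console producer (assumes a Schema Registry on its default port, localhost:8081; the StringValue schema and "value" field name are just illustrations):

kafka-avro-console-producer --broker-list localhost:9092 --topic test-file-hdfs \
  --property value.schema='{"type":"record","name":"StringValue","fields":[{"name":"value","type":"string"}]}'
# then type one JSON record per line, e.g.:
{"value": "hello hdfs"}

In worker.properties you'd set key.converter and value.converter to io.confluent.connect.avro.AvroConverter and point key.converter.schema.registry.url / value.converter.schema.registry.url at the registry.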

-Ewen



