Kafka Connect (HDFS connector)

935 views
Skip to first unread message

Grigoriy Roghkov

unread,
Aug 19, 2016, 5:47:42 AM8/19/16
to Confluent Platform
Hi,

I want to use HDFS connector with org.apache.kafka.connect.json.JsonConverter converter, but I've faced an issue with both Parquet and Avro formatter.
Here are worker configs:

bootstrap.servers=localhost:9092
rest.port=4002
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000


but because of following props HDFS sink code starts infinite here https://github.com/confluentinc/kafka-connect-hdfs/blob/2.x/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java#L251
(check in "if" block returns true and valueSchema is always null) and no data is being written. (I've realized that while debugging)
key.converter.schemas.enable=false
value.converter.schemas.enable=false


when I set them to true and do the following:
key.converter.schemas.enable=true
value.converter.schemas.enable=true


and produce records using Console Connector with default settings in the following format:
{"schema":{"type":"string","optional":false},"payload":"hello"}
{"schema":{"type":"string","optional":false},"payload":"halo"}
{"schema":{"type":"string","optional":false},"payload":"salut"}

then I catch an exception:
org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: io.confluent.kafka.serializers.NonRecordContainer cannot be cast to java.lang.CharSequence
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
    at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:64)
    at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:59)
    at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:487)
    at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:264)
...
Caused by: java.lang.ClassCastException: io.confluent.kafka.serializers.NonRecordContainer cannot be cast to java.lang.CharSequence
    at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:213)
    at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:208)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:76)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)



In case of format

format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
still no success:

[2016-08-19 18:43:15,400] ERROR Task is being killed and will not recover until manually restarted: (org.apache.kafka.connect.runtime.WorkerSinkTask:303)
java.lang.IllegalArgumentException: Avro schema must be a record.
    at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:89)
    at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:103)
    at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:47)
    at io.confluent.connect.hdfs.parquet.ParquetRecordWriterProvider.getRecordWriter(ParquetRecordWriterProvider.java:51)
    at io.confluent.connect.hdfs.TopicPartitionWriter.getWriter(TopicPartitionWriter.java:415)
    at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:486)
    at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:264)
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:280)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:176)
    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
    at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)

Even if I try to produce records like this:
{"schema":{"type":"record","optional":false, "fields":[{"name": "name", "type": "string"}]},"payload":"hello"}

What am I doing wrong?

Thanks,
Grigoriy

Ajay Chitre

unread,
Aug 23, 2016, 5:45:29 PM8/23/16
to confluent...@googlegroups.com
I am also running into similar issue with the HDFS connector.

My messages look like this:

{"schema":{"type":"string","optional":false},"payload":"line 1"}


Getting this:

[2016-08-23 17:31:56,454] ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:401)

org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: io.confluent.kafka.serializers.NonRecordContainer cannot be cast to java.lang.CharSequence

at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:263)


Properties are as follows:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=spool-test
hdfs.url=hdfs://<my ip>:8020
flush.size=3

Any suggestions?


--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/eef61047-7c60-49fc-9473-3a513c629ab8%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Brandon Bradley

unread,
Aug 29, 2016, 12:23:13 PM8/29/16
to Confluent Platform
Hello!

Please see this message in this forum.

https://groups.google.com/d/msg/confluent-platform/_5Iign8D30w/jOtmYQTjHQAJ

Good luck!
Brandon
Reply all
Reply to author
Forward
0 new messages