Issues converting JSON-format data in Kafka to Parquet format using Kafka Connect?


zhangxin...@gmail.com

unread,
Aug 4, 2016, 11:20:51 PM8/4/16
to Confluent Platform
Hi all,
    I am new to the Confluent Platform, but I have used Kafka for an ETL pipeline before.
    Our scenario is this: we put data into Kafka in JSON format, and we want to use a Kafka connector to read the JSON data from the topic, convert it to Parquet format, and upload it to HDFS.
    When I followed the quick start for the HDFS connector, I found that the data is expected to be written to Kafka in Avro format first, with "key.converter" set to "io.confluent.connect.avro.AvroConverter" in etc/connect-avro-standalone.properties. That means we would have to modify our existing code to translate the JSON data into Avro first and then rely on the AvroConverter for the rest of the path to Parquet, which we would prefer not to do.
    I want to know how to read JSON-format data from Kafka and convert it to Parquet format with a Kafka connector directly. Can anyone help me with this?

Thanks a lot

Dustin Cote

unread,
Aug 5, 2016, 10:41:45 AM8/5/16
to confluent...@googlegroups.com
Does this converter meet your needs for the input data? key.converter=org.apache.kafka.connect.json.JsonConverter

You can find more information on the worker configurations and what they do here; you'll want the descriptions of key.converter and value.converter.
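For context, in the standalone worker properties file that change would look roughly as follows; whether you also switch value.converter depends on whether your JSON sits in the key, the value, or both:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter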




--
Dustin Cote

zhangxin...@gmail.com

unread,
Aug 5, 2016, 9:59:25 PM8/5/16
to Confluent Platform
Thanks :)
I'll try it following the User Guide 


zhangxin...@gmail.com

unread,
Aug 7, 2016, 9:45:43 PM8/7/16
to Confluent Platform
Hi Dustin,
   I followed the User Guide and set key.converter=org.apache.kafka.connect.json.JsonConverter and value.converter=org.apache.kafka.connect.json.JsonConverter in etc/connect-avro-standalone.properties, then used kafka-console-producer to produce some messages like {"foo1" : "value1", "foo2" : "value2"}. It did not seem to work: no data was written to HDFS, and the task threw an org.apache.kafka.connect.errors.DataException:

[2016-08-08 08:27:35,381] ERROR Task parquet_sink_test-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
              at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:345)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
              ....
[2016-08-08 08:27:35,283] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)

I also found a related issue on GitHub: https://github.com/confluentinc/kafka-connect-hdfs/issues/74. Does this mean that JSON-format input data is not supported yet?

Thanks :)


Ewen Cheslack-Postava

unread,
Aug 7, 2016, 10:09:03 PM8/7/16
to Confluent Platform
I'm not aware of a reason why using the JsonConverter with the Parquet format wouldn't work. However, note that you probably want to include the settings key.converter.schemas.enable=false and value.converter.schemas.enable=false. The exception you are seeing occurs because the JsonConverter supports an envelope format that includes schema information inline; for plain JSON we obviously can't determine the schema, so the data is returned without a schema when the schemas.enable settings are false. The null schema then gets converted into a catch-all schema when converting to Avro (because Avro requires a schema).
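Concretely, a minimal sketch of the worker settings being described, as they would appear in the standalone worker properties file:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false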

-Ewen


farnaz....@rittmanmead.com

unread,
Aug 9, 2016, 8:54:41 AM8/9/16
to Confluent Platform
Hi all,

I am new to Kafka, so apologies if this is a silly question! I have already implemented the file, JDBC, and Avro connectors successfully. This time I am trying the same scenario described in this post: the data in my topic is in JSON format and I am using the JsonConverter to write it out to HDFS. key.converter.schemas.enable and value.converter.schemas.enable are both set to false, so I am expecting a catch-all schema too. However, every time I get an OOM error, copied below. I have tried increasing memory and playing with different configs, with no luck so far.

I am not sure what I am missing here; I'd appreciate your help.

....
[2016-08-09 13:46:26,208] INFO Created connector hdfs-sink (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2016-08-09 13:46:26,211] INFO HdfsSinkConnectorConfig values: 
filename.offset.zero.pad.width = 10
topics.dir = topics
flush.size = 1
connect.hdfs.principal = 
timezone = 
hive.home = 
hive.database = default
locale = 
hadoop.home = 
logs.dir = logs
schema.cache.size = 1000
format.class = io.confluent.connect.hdfs.avro.AvroFormat
hive.integration = true
hdfs.namenode.principal = 
hive.conf.dir = 
hadoop.conf.dir = 
schema.compatibility = BACKWARD
connect.hdfs.keytab = 
hdfs.url = hdfs://bigdatalite.localdomain:8020
hdfs.authentication.kerberos = false
hive.metastore.uris = thrift://bigdatalite.localdomain:9083
partitioner.class = io.confluent.connect.hdfs.partitioner.FieldPartitioner
storage.class = io.confluent.connect.hdfs.storage.HdfsStorage
path.format = 
 (io.confluent.connect.hdfs.HdfsSinkConnectorConfig:178)
[2016-08-09 13:46:26,841] INFO Hadoop configuration directory  (io.confluent.connect.hdfs.DataWriter:94)
[2016-08-09 13:46:30,355] INFO Trying to connect to metastore with URI thrift://bigdatalite.localdomain:9083 (hive.metastore:376)
[2016-08-09 13:46:30,447] INFO Connected to metastore. (hive.metastore:472)
[2016-08-09 13:46:30,617] INFO Sink task WorkerSinkTask{id=hdfs-sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:208)
[2016-08-09 13:46:30,798] INFO Discovered coordinator bigdatalite.localdomain:9093 (id: 2147483647 rack: null) for group connect-hdfs-sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:505)
[2016-08-09 13:46:30,799] INFO Revoking previously assigned partitions [] for group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
[2016-08-09 13:46:30,800] INFO (Re-)joining group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:326)
[2016-08-09 13:46:30,845] INFO Successfully joined group connect-hdfs-sink with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:434)
[2016-08-09 13:46:30,847] INFO Setting newly assigned partitions [JSON5_HDFS-0] for group connect-hdfs-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:219)
[2016-08-09 13:46:30,890] INFO Started recovery for topic partition JSON5_HDFS-0 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2016-08-09 13:46:30,916] INFO Finished recovery for topic partition JSON5_HDFS-0 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
Exception in thread "CLASSPATH traversal thread." java.lang.OutOfMemoryError: Java heap space
at java.util.zip.ZipFile.getZipEntry(ZipFile.java:557)
at java.util.zip.ZipFile.access$900(ZipFile.java:60)
at java.util.zip.ZipFile$ZipEntryIterator.next(ZipFile.java:524)
at java.util.zip.ZipFile$ZipEntryIterator.nextElement(ZipFile.java:499)
at java.util.zip.ZipFile$ZipEntryIterator.nextElement(ZipFile.java:480)
at java.util.jar.JarFile$JarEntryIterator.next(JarFile.java:257)
at java.util.jar.JarFile$JarEntryIterator.nextElement(JarFile.java:266)
at java.util.jar.JarFile$JarEntryIterator.nextElement(JarFile.java:247)
at org.reflections.vfs.ZipDir$1$1.computeNext(ZipDir.java:31)
at org.reflections.vfs.ZipDir$1$1.computeNext(ZipDir.java:26)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
at org.reflections.Reflections.scan(Reflections.java:240)
at org.reflections.Reflections.scan(Reflections.java:204)
at org.reflections.Reflections.<init>(Reflections.java:129)
at org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:268)
at org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:377)
at java.lang.Thread.run(Thread.java:745)
[2016-08-09 13:46:44,508] ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:390)
java.lang.OutOfMemoryError: Java heap space
at java.util.concurrent.AbstractExecutorService.newTaskFor(AbstractExecutorService.java:102)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:133)
at java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:681)
at io.confluent.connect.hdfs.TopicPartitionWriter.alterHiveSchema(TopicPartitionWriter.java:600)
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:255)
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234)
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:91)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:370)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2016-08-09 13:46:44,516] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:391)
[2016-08-09 13:46:44,517] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
....



Dustin Cote

unread,
Aug 9, 2016, 11:05:42 AM8/9/16
to confluent...@googlegroups.com
It looks like the memory is getting blown out while the classpath is being scanned. Do you have a large file on the classpath, by chance?
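If it turns out to be plain heap pressure rather than one oversized jar, one thing worth trying is simply giving the worker more heap before launching it. A sketch, assuming the stock scripts (which honor KAFKA_HEAP_OPTS); the file paths are illustrative:

export KAFKA_HEAP_OPTS="-Xms512m -Xmx2g"
bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-hdfs/quickstart-hdfs.properties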

--
Dustin Cote
Customer Operations Engineer | Confluent
Follow us: Twitter | blog

zhangxin...@gmail.com

unread,
Aug 9, 2016, 10:29:00 PM8/9/16
to Confluent Platform
Hi Ewen,
   Thanks for answering my questions :)
   With key.converter.schemas.enable and value.converter.schemas.enable both set to false, I only got a log message saying "coordinator parquet_sink_test generation 1 is dead..." on the screen and no other errors, which left me confused...
   If I don't use plain JSON, how do I construct the schema for my JSON-format data?
   Here's an example I constructed earlier using json-schema.org:
   {
      "type" : "object",
      "properties" : {
          "foo1" : {"type" : "string"},
          "foo2" : {"type" : "integer"},
          "foo3" : {"type" : "string"}
      },
      "required" : ["foo1", "foo2", "foo3"]
   }
   But this schema does not seem to work. Is there anything wrong with using a schema like this with the JsonConverter?
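For reference, the JsonConverter does not use json-schema.org schemas at all; with schemas.enable=true it expects each message to carry a Connect-style envelope with "schema" and "payload" fields. A rough sketch of such a message, reusing the field names above (types assumed for illustration):

   {
      "schema" : {
         "type" : "struct",
         "name" : "example.Record",
         "optional" : false,
         "fields" : [
            {"field" : "foo1", "type" : "string", "optional" : false},
            {"field" : "foo2", "type" : "int32",  "optional" : false},
            {"field" : "foo3", "type" : "string", "optional" : false}
         ]
      },
      "payload" : {"foo1" : "value1", "foo2" : 42, "foo3" : "value3"}
   }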

Thanks



Brandon Bradley

unread,
Aug 29, 2016, 12:40:30 PM8/29/16
to Confluent Platform
Hello Ewen,

If JSON with no schema gets a 'catch-all schema', then kafka-connect-hdfs should work with JSON out of the box, right? What is the proper config for that? JsonConverter and schemas.enable=true for just the external keys and values? For the internal keys and values as well? I haven't found any good answers, examples, or posts on this.
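For reference, the converter settings that later replies in this thread converge on amount to roughly the following in the standalone worker properties (the internal.* converters only affect Connect's own offset and config data, not the topic data being sunk to HDFS):

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false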

Cheers!
Brandon

karan alang

unread,
Sep 19, 2016, 2:58:30 PM9/19/16
to Confluent Platform

Hello, I'm getting the same error.

Did you get a fix for this?

Please let me know.

Thanks!

karan alang

unread,
Sep 19, 2016, 3:02:25 PM9/19/16
to Confluent Platform


   
    Please note: I'm using the default Apache Kafka install (standalone mode).

austin solomon

unread,
Sep 20, 2016, 8:10:52 AM9/20/16
to Confluent Platform
Hi, I have created a pipeline from a file source to an HDFS sink; I gave it a JSON file as the source and want to save the data in HDFS.
The configuration looks like this:
[2016-09-20 17:13:40,718] INFO HdfsSinkConnectorConfig values: 
connect.hdfs.keytab = 
connect.hdfs.principal = 
filename.offset.zero.pad.width = 10
flush.size = 0
format.class = io.confluent.connect.hdfs.avro.AvroFormat
hadoop.conf.dir = /home/tg1/hadoop-2.7.1/etc/hadoop
hadoop.home = /home/tg1/hadoop-2.7.1
hdfs.authentication.kerberos = false
hdfs.namenode.principal = 
hdfs.url = hdfs://tg33:9000
hive.conf.dir = 
hive.database = default
hive.home = 
hive.integration = false
hive.metastore.uris = 
locale = en_IN
logs.dir = logs
partitioner.class = io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner
path.format = 'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/
schema.cache.size = 1000
schema.compatibility = NONE
storage.class = io.confluent.connect.hdfs.storage.HdfsStorage
timezone = UTC
topics.dir = topics


I got the following error:
 [2016-09-20 17:13:40,971] ERROR Task FileSink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:404)
org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.ClassCastException: io.confluent.kafka.serializers.NonRecordContainer cannot be cast to java.lang.CharSequence
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:64)
at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:59)
at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:487)
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:264)
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234)
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:91)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:228)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:171)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: io.confluent.kafka.serializers.NonRecordContainer cannot be cast to java.lang.CharSequence
at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:213)
at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:208)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:76)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
... 17 more
[2016-09-20 17:13:40,971] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:405)
[2016-09-20 17:13:41,048] INFO WorkerSinkTask{id=FileSink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2016-09-20 17:13:41,050] ERROR Task FileSink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.

Can anyone help me with how to convert and save the data?
Thanks

Andrew Otto

unread,
Sep 20, 2016, 9:26:44 AM9/20/16
to confluent...@googlegroups.com
I'm also interested to see if this can work. I gave it a go back in May or so and wasn't successful; it seemed like I'd need to add a lot of code (at the time) to import JSON from Kafka into HDFS using Kafka Connect. But maybe things have changed! :)


Pratim Ghosh

unread,
Nov 17, 2016, 6:10:03 AM11/17/16
to Confluent Platform
+1

Same in my case. I added the settings below:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

There are no further logs to help debug. Please advise.

Thanks,
~Pratim

Gwen Shapira

unread,
Nov 17, 2016, 1:22:52 PM11/17/16
to confluent...@googlegroups.com
Hey,

Are you looking to import JSON as JSON? Or to convert it to Avro/Parquet?
The former seems very simple. JSON->Avro conversion requires a
provided schema or something to do inference, which is more
complicated.



--
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog

Pratim

unread,
Nov 21, 2016, 12:26:34 AM11/21/16
to Confluent Platform
Hi Gwen,

Thanks for the quick reply.

Yes, I am looking to import JSON as JSON, and the HDFS connector should work with JSON out of the box so that I get back the same JSON I imported. No Avro/Parquet. I am just following the group chat above [Ewen, Brandon, zhangxin].

As you said, the former seems very simple; could you please explain a bit more about the config/JSON changes needed to get rid of the exception below?

I don't want to see this exception any more:

org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields

              at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)


If I add the following to the properties, the exception goes away, but I still don't get the expected output written to HDFS:
key.converter.schemas.enable=false
value.converter.schemas.enable=false

Any pointers on this?

Thanks,
~Pratim


Mands

unread,
Dec 15, 2016, 11:37:02 PM12/15/16
to Confluent Platform
Hi Pratim/Gwen,

I am facing the same exception and error. I can see the same issue raised in multiple groups, but I have not been able to find the correct fix. Did you find a resolution? If so, please post it.

Thanks,
Mands

bigdat...@gmail.com

unread,
Jan 4, 2017, 5:13:36 PM1/4/17
to Confluent Platform
Hi,
I tried the following settings for the connector:

key.converter.schemas.enable=false
value.converter.schemas.enable=false

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

The error which occurred previously,

ERROR Task parquet_sink_test-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
              at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)

is now no longer occurring.

But the commit is not happening, there are no further logs to look at, and I have checked that no data is written to HDFS.

Could you please provide any information on how to make this work?

Joshua Hua

unread,
Oct 13, 2017, 3:40:57 PM10/13/17
to Confluent Platform
Hi,
I am also interested in how to handle simple JSON without a schema, as I am hitting a similar problem when trying to import a local file into a Kafka topic using Kafka Connect.

Can anyone shed some light? How do I create a Struct object from a plain JSON string, if that is what is missing?

The task is very simple: read the file and, for each row (a plain JSON string), extract the "id" field as the message key and send the message to a Kafka topic.
But I keep getting the following error:
## stacktrace:

[2017-10-13 12:28:20,055] ERROR Task aggregation-local-file-source-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [field extraction], found: java.lang.String
        at org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:45)
        at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:60)
        at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:39)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:167)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)


# connect-standalone.properties

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false


# connect-file-source.properties

name=aggregation-local-file-source
connector.class=FileStreamSource
tasks.max=1
file=testData/test.txt
topic=connect-test

transforms=ExtractKey,SetKey
transforms.ExtractKey.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractKey.field=id
transforms.SetKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.SetKey.fields=id


# local file as input source - "testData/test.txt"

{id:1230,description:,value:2.003275776E9,timeStamp:1507748059000}
{id:1231,description:,value:3.746452528E9,timeStamp:1507748059000}
{id:1232,description:,value:7.047452615E9,timeStamp:1507748059000}
{id:1233,description:,value:1.002746454E9,timeStamp:1507748059000}




Dimitri

unread,
Feb 8, 2018, 10:05:08 AM2/8/18
to Confluent Platform
Did you find a solution to this problem? Is there one, or must one create a producer instead?
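The stack trace earlier in the thread suggests why the transform route fails for that case: FileStreamSource hands ExtractField a plain String, and that transform (at least in the version shown) only accepts Structs. One workaround is a small standalone producer that parses each line itself and sets the key explicitly. A rough sketch, assuming Jackson for parsing and one JSON object per line; the topic name and file path are illustrative:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.stream.Stream;

public class JsonFileProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        ObjectMapper mapper = new ObjectMapper();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
             Stream<String> lines = Files.lines(Paths.get("testData/test.txt"))) {
            lines.forEach(line -> {
                try {
                    // Parse the line, pull out the "id" field, and use it as the message key
                    String key = mapper.readTree(line).get("id").asText();
                    producer.send(new ProducerRecord<>("connect-test", key, line));
                } catch (Exception e) {
                    // Note: the sample lines shown above are not strictly valid JSON (unquoted keys),
                    // so they would land here unless the file is cleaned up first
                    System.err.println("Skipping unparseable line: " + line);
                }
            });
        }
    }
}

This is a sketch rather than a drop-in answer; it trades Connect's offset tracking and restartability for full control over keys and message format.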

Andrew Otto

unread,
Feb 8, 2018, 10:46:28 AM2/8/18
to confluent...@googlegroups.com
Hiya,

I'm very interested in something like this, and while there isn't yet a good solution, there is some discussion at WMF about possibly implementing one. We need to do some more evaluation first (we might just use Avro, we might figure out how to use Avro-JSON in Kafka, etc.).

In the meantime, I've written a JSON -> Hive/Parquet Spark job [1][2][3]. It searches for and scans input JSON data, infers a Spark schema, and uses that schema to write to a Parquet-backed Hive table. It will also pick up new field additions and alter the Hive table to include the new fields; it does not support field type changes. This only works in Hadoop with Hive and Spark. If this is something folks find useful, I could look at moving it out of our large analytics project and into its own standalone repo.
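Not Andrew's actual job, but a minimal sketch of that general approach using Spark's Java API, with a hypothetical input path and table name, and without the schema-evolution/ALTER TABLE handling he describes:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class JsonToParquetHive {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("json-to-parquet")
                .enableHiveSupport()   // required to create and write Hive tables
                .getOrCreate();

        // Spark infers the schema by scanning the JSON input
        Dataset<Row> json = spark.read().json("hdfs:///data/raw/events/*.json");

        // Write the data out as a Parquet-backed Hive table
        json.write()
            .mode(SaveMode.Append)
            .format("parquet")
            .saveAsTable("analytics.events");

        spark.stop();
    }
}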

- Andrew Otto
  Senior Systems Engineer
  Wikimedia Foundation






hetals...@gmail.com

unread,
May 20, 2018, 6:45:30 PM5/20/18
to Confluent Platform
Did you find a solution for this issue? I am also facing the same problem.