Kafka Topic not writing JSON data to HDFS


Nishant Verma

Jan 17, 2017, 5:25:01 AM
to Confluent Platform

I have a source that writes JSON data into an existing Kafka cluster. It creates multiple topics, each with multiple partitions, and writes the data into the respective topics.

My task is to move this data from the Kafka topics into an HDFS cluster that runs on a different server.


I downloaded confluent-3.1.1 onto the Kafka server and made the following changes:


a- Updated quickstart-hdfs.properties with the topic names and hdfs.url. All the topics created on the Kafka side are listed there, comma separated.

b- Updated connect-avro-standalone.properties as below:

====================================================================================================
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
enable.auto.commit=true
auto.commit.interval.ms=1000
offset.flush.interval.ms=1000
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=_offsets
offset.storage.file.filename=/opt/confluent-3.1.1/connect.offsets
=======================================================================================================

c- Created an empty file /opt/confluent-3.1.1/connect.offsets and gave it write permissions.



Now, my source is writing JSON data into the Kafka topics. I started the schema registry (./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties) and then started Kafka Connect (./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-hdfs/quickstart-hdfs.properties).


The console where I start Kafka Connect gives the output below:

=================================================================================================================

[2017-01-17 14:00:16,166] INFO Fetch offset 0 is out of range for partition Prd_IN_GeneralEvents-369, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:708)

[2017-01-17 14:00:16,166] INFO Fetch offset 0 is out of range for partition Prd_IN_GeneralEvents-124, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:708)

[2017-01-17 14:00:16,167] INFO Fetch offset 0 is out of range for partition Prd_IN_GeneralEvents-264, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:708)

[2017-01-17 14:00:16,167] INFO Fetch offset 0 is out of range for partition Prd_IN_GeneralEvents-130, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:708)

[2017-01-17 14:00:16,167] INFO Fetch offset 0 is out of range for partition Prd_IN_GeneralEvents-0, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:708)

[2017-01-17 14:00:16,167] INFO Fetch offset 0 is out of range for partition Prd_IN_GeneralEvents-10, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:708)

[2017-01-17 14:00:16,169] INFO Fetch offset 0 is out of range for partition Prd_IN_Processed_Alert-8, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:708)

[2017-01-17 14:00:16,171] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

[2017-01-17 14:00:16,368] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

[2017-01-17 14:00:16,391] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

[2017-01-17 14:00:16,522] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

[2017-01-17 14:00:16,535] ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:

at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:368)

at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)

at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)

at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)

at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)

at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)

at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:744)

Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'aa8c0c3a49e14b0c8bcaf1e2f082830b': was expecting ('true', 'false' or 'null')

at [Source: [B@3b7fb2e; line: 1, column: 65]

Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'aa8c0c3a49e14b0c8bcaf1e2f082830b': was expecting ('true', 'false' or 'null')

at [Source: [B@3b7fb2e; line: 1, column: 65]

at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)

at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)

at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3323)

at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2482)

at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:801)

at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:697)

at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3604)

at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3549)

at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2161)

at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:23)

at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:363)

at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)

at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)

at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)

at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)

at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)

at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:744)

[2017-01-17 14:00:16,537] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)

[2017-01-17 14:00:17,760] INFO Reflections took 8075 ms to scan 264 urls, producing 12033 keys and 80037 values (org.reflections.Reflections:229)

=====================================================================================================================

Hence nothing is written to HDFS.


What am I missing in the configuration?


If I try the example from the Confluent website (start kafka-avro-console-producer and type {"f1": "value1"} into the console), the data gets written to HDFS without any issues.


I also tried the following changes in connect-avro-standalone.properties, but got various errors:


key.converter=io.confluent.connect.avro.AvroConverter

value.converter=io.confluent.connect.avro.AvroConverter

======================================================================================================================

Error1:

Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('-' (code 45)): Expected space separating root-level values

at [Source: [B@3c0b4f6; line: 1, column: 10]

Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('-' (code 45)): Expected space separating root-level values

at [Source: [B@3c0b4f6; line: 1, column: 10]

at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)

at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)

at com.fasterxml.jackson.core

=======================================================================================================================

Error2:

Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('-' (code 45)): Expected space separating root-level values

at [Source: [B@3c0b4f6; line: 1, column: 10]

Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('-' (code 45)): Expected space separating root-level values

at [Source: [B@3c0b4f6; line: 1, column: 10]

at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)

at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)

at com.fasterxml.jackson.core

===========================================================================================================================

Error3:

Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('7fa1b2e9-70ce-4ded-9842-f3b9115e0932' (code 45)): Expected "true","false",null separating root-level values

at [Source: [B@3c0b4f6; line: 1, column: 10]

Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('-' (code 45)): Expected space separating root-level values

at [Source: [B@3c0b4f6; line: 1, column: 10]

at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)

at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)

at com.fasterxml.jackson.core

==================================================================================================================================


Where am I missing the link here?

Dustin Cote

Jan 17, 2017, 9:49:48 AM
to confluent...@googlegroups.com
Hi Nishant,

It looks like there are two problems here:
1) Your JSON data cannot be converted to Connect data with the JsonConverter (this is the exception you are seeing)
2) You are trying to write JSON data with the HDFS Sink Connector

For #1, the data in the topic you want to use with the connector should have been written with the same JsonConverter; otherwise you will see this sort of conversion problem. If you really want to write the data to HDFS in exactly the format you have in Kafka, consider using a ByteArrayConverter as shown here

For #2, you won't be able to just convert to Avro on the fly, so you need a different format. You may be interested in the SourceFormat shown here. Combined with the ByteArrayConverter, it will let you write data "as is" from Kafka to HDFS. This brings its own problems (e.g. Hive integration won't be possible, you don't know anything about the data's schema, etc.), but if it is what you really want (and some people do), you can plug in the converter and format shown above. To do that, you need to compile the jar for the connector they ship with and add it to the classpath, and then reference these classes in the [key|value].converter and format.class configurations respectively.

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/ae331a9b-3847-4a78-9870-2496c2ac051e%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Dustin Cote
Customer Operations Engineer | Confluent

Nishant Verma

Jan 18, 2017, 3:04:55 AM
to Confluent Platform
Thanks Dustin.

I made the changes below:

a- Compiled ByteArrayConverter and SourceFormat and referenced them in the properties file. connect-avro-standalone.properties now looks like this:
============================================================================================================
bootstrap.servers=localhost:9092
key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
format.class=com.qubole.streamx.SourceFormat
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
====================================================================================================================

Now my Kafka topics have data. When I run Kafka Connect, I get the error below:

======================================================================================================================
[2017-01-18 13:21:28,588] ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:404)
org.apache.avro.file.DataFileWriter$AppendWriteException: org.apache.avro.AvroRuntimeException: Unknown datum type io.confluent.kafka.serializers.NonRecordContainer: io.confluent.kafka.serializers.NonRecordContainer@792769e2
        at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
        at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:64)
        at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:59)
        at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:535)
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:294)
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:234)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:103)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type io.confluent.kafka.serializers.NonRecordContainer: io.confluent.kafka.serializers.NonRecordContainer@792769e2
        at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:636)
        at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:601)
        at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
        at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
        ... 17 more
[2017-01-18 13:21:28,590] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:405)
[2017-01-18 13:21:28,794] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-01-18 13:21:28,810] ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:406)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:744)
[2017-01-18 13:21:28,810] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2017-01-18 13:21:31,671] INFO Reflections took 8133 ms to scan 264 urls, producing 12035 keys and 80072 values  (org.reflections.Reflections:229)
==============================================================================================================================

In the ByteArrayConverter code, I added print statements:
============================================================================================
public SchemaAndValue toConnectData(String topic, byte[] value) {
    try {
    System.out.println("***********topic****************"+topic);
    System.out.println("##########value#########"+((value==null)?"getting nulll":new String(value)));
============================================================================================
and new String(value) prints the JSON in the console, preceded by the topic name on the line before.

What does this "Unknown datum type" exception mean, and how can it be resolved?


Many thanks
Nishant

Andrew Otto

Jan 18, 2017, 10:02:39 AM
to confluent...@googlegroups.com
Hi Nishant,

I just wrote a blog post about how Wikimedia imports JSON to HDFS from Kafka:

It doesn’t use Kafka Connect, but it might help you if you are willing to try something else.

Dustin Cote

Jan 18, 2017, 10:08:04 AM
to confluent...@googlegroups.com
Hi Nishant,

It would appear you are still using the avro format.class. To be clear, there are two different configuration locations:
1) The worker configuration under connect-avro-standalone.properties
2) The connector configuration under etc/kafka-connect-hdfs/quickstart-hdfs.properties

[key|value].converter should go into the worker configuration in #1. format.class should go into the connector configuration in #2. Please give that a go and let us know how it works out.
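
As a rough sketch of that split, using the class names Nishant posted earlier in this thread (they come from his compiled jar and are not verified here), the two files would carry these entries:

```properties
# Worker configuration: connect-avro-standalone.properties
key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter

# Connector configuration: etc/kafka-connect-hdfs/quickstart-hdfs.properties
format.class=com.qubole.streamx.SourceFormat
```

If format.class is left in the worker file only, the connector presumably falls back to its default Avro format, which would be consistent with the AvroRecordWriterProvider frames in the stack trace above.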

Thanks,


Nishant Verma

Jan 19, 2017, 2:17:22 AM
to Confluent Platform

Nishant Verma

Jan 19, 2017, 3:29:30 AM
to Confluent Platform
Hi Dustin

Thanks for the update. It seems it has moved ahead a bit. Let me explain the current situation:

- My connect-avro-standalone.properties file is something like below:
======================================================================================================================
bootstrap.servers=localhost:9092
key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
================================================================================================================


- My quickstart-hdfs.properties file is like below:

================================================================================================================
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
format.class=com.qubole.streamx.SourceFormat
tasks.max=1
topics=Prd_IN_Processed_Alert,Prd_IN_TripAnalysis,Prd_IN_Notification
hdfs.url=hdfs://ip-10-16-37-124:9000
flush.size=3
==============================================================================================================


I started the schema registry and then started Connect.


Right now Kafka holds a single JSON record, in the Prd_IN_TripAnalysis topic only.


No error appeared in the Connect console this time. The output was:

==========================================================================================================

***********topic****************Prd_IN_TripAnalysis

##########value#########{"id":"18a446fc-df77-4dbb-a909-c665c533866a","pdid":"f74a625e979a4112952ee8dacaadd0e5","summary":{"tripStartTimestamp":1477326589264,"tripEndTimestamp":0,"status":30,"totalGPSDistanceMetres":0.0,"avgGPSSpeed":0.0,"maxGPSSpeed":0.0,"avgInstMileage":0.0,"totalHaltTimeSeconds":0,"totalIdlingTimeSeconds":0,"totalRunningTimeMins":0,"startLocation":{"latitude":12.9748625,"longitude":77.6203403,"speed":0.0,"ts":1477326589132,"direction":-1},"endLocation":null,"driverBehaviorSummary":[{"driver":null,"noOfRapidAcceleration":0,"noOfRapidDeceleration":0,"noOfOverSpeed":0,"noOfHarshBreak":0}]},"latLongs":[{"latitude":12.9748623,"longitude":77.6203425,"speed":0.0,"ts":1477326591307,"direction":-1}],"halts":[],"idlings":[{"location":{"latitude":12.9748623,"longitude":77.6203425,"speed":0.0,"ts":1477326591307,"direction":-1},"startTimestamp":1477326591307,"endTimestamp":0,"finalLocation":{"latitude":12.9748623,"longitude":77.6203425,"speed":0.0,"ts":1477326591307,"direction":-1},"duration":0}]}

[2017-01-19 13:27:46,838] INFO Reflections took 8282 ms to scan 265 urls, producing 12225 keys and 81227 values  (org.reflections.Reflections:229)

[2017-01-19 13:28:39,287] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

[2017-01-19 13:29:39,291] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

[2017-01-19 13:30:39,289] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

[2017-01-19 13:31:39,288] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

[2017-01-19 13:32:39,287] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

[2017-01-19 13:33:39,289] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

[2017-01-19 13:34:39,287] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

[2017-01-19 13:35:39,289] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

[2017-01-19 13:36:39,288] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)


==========================================================================================================


In HDFS, I can see the following under +tmp:

======================================================================================================================

[platformteam@ip-10-16-37-124 ~]$ hadoop fs -ls /topics/+tmp/Prd_IN_TripAnalysis

drwxr-xr-x   - root supergroup          0 2017-01-19 07:57 /topics/+tmp/Prd_IN_TripAnalysis/partition=323



[platformteam@ip-10-16-37-124 ~]$ hadoop fs -ls /topics/+tmp/Prd_IN_TripAnalysis/partition=323

Found 1 items

-rw-r--r--   3 root supergroup        995 2017-01-19 08:08 /topics/+tmp/Prd_IN_TripAnalysis/partition=323/83e04862-3c2a-43ec-912d-49d424e344d4_tmp


=======================================================================================================================

If I cat this file, I do see my JSON, but it only appears after I press Ctrl+C in the Connect console, which throws the error below:
=====================================================================================================================
[2017-01-19 13:38:25,485] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:76)
[2017-01-19 13:38:25,486] INFO Stopping task hdfs-sink-0 (org.apache.kafka.connect.runtime.Worker:341)
[2017-01-19 13:38:25,657] ERROR Error discarding temp file hdfs://ip-10-16-37-124:9000/topics//+tmp/Prd_IN_TripAnalysis/partition=323/83e04862-3c2a-43ec-912d-49d424e344d4_tmp for Prd_IN_TripAnalysis-323 partition=323 when closing TopicPartitionWriter: (io.confluent.connect.hdfs.TopicPartitionWriter:364)
java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:798)
        at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:1916)
        at org.apache.hadoop.hdfs.DistributedFileSystem$12.doCall(DistributedFileSystem.java:638)
        at org.apache.hadoop.hdfs.DistributedFileSystem$12.doCall(DistributedFileSystem.java:634)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:634)
        at io.confluent.connect.hdfs.storage.HdfsStorage.delete(HdfsStorage.java:77)
        at io.confluent.connect.hdfs.TopicPartitionWriter.deleteTempFile(TopicPartitionWriter.java:629)
        at io.confluent.connect.hdfs.TopicPartitionWriter.close(TopicPartitionWriter.java:361)
        at io.confluent.connect.hdfs.DataWriter.close(DataWriter.java:296)
        at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:121)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:302)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:435)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:147)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:744)
[2017-01-19 13:38:25,660] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-01-19 13:38:25,761] INFO Stopping connector hdfs-sink (org.apache.kafka.connect.runtime.Worker:218)
[2017-01-19 13:38:25,761] INFO Stopped connector hdfs-sink (org.apache.kafka.connect.runtime.Worker:229)
[2017-01-19 13:38:25,761] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:122)
[2017-01-19 13:38:25,761] INFO Stopped FileOffsetBackingStore (org.apache.kafka.connect.storage.FileOffsetBackingStore:68)
[2017-01-19 13:38:25,761] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:142)
[2017-01-19 13:38:25,761] INFO Herder stopped (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:86)
[2017-01-19 13:38:25,761] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:73)

====================================================================================================================

Now, I have two questions:

1- Why is the JSON present only in /topics/+tmp/Prd_IN_TripAnalysis/partition=323 and not in /topics/Prd_IN_TripAnalysis/partition=323? (The test Avro data produced through the console went into /topics/test_topic.)

2- Why did the "Filesystem closed" error appear when I pressed Ctrl+C, and why did the tmp file get the JSON only then?

Do I need some WAL or commit configuration somewhere for the JSON to be flushed out of +tmp?


Awaiting your inputs.

Many thanks
Nishant Verma


 


Nishant Verma

Jan 19, 2017, 8:16:52 AM
to Confluent Platform
I changed flush.size to 1 in quickstart-hdfs.properties and it worked for a single record: it was flushed out of +tmp and written to HDFS.



Dustin Cote

Jan 19, 2017, 8:17:04 AM
to confluent...@googlegroups.com
Yes, it looks like you haven't triggered the condition to roll the WAL file. You have set flush.size=3, which means 3 records have to be written to trigger a roll. I notice the tmp file in HDFS is just 995 bytes, so you have probably sent only a single record. Try sending some more data, and also check that you really do want to roll after every 3 records.

The "Filesystem closed" message is what I'd usually expect if HDFS was already shut down when you shut down the connector. This message comes from the HDFS client embedded in the connector, which is trying to delete the tmp file because the file is incomplete.
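
The count-based roll described above can be pictured with a toy model. This is only a sketch under stated assumptions, not the connector's code: the class name TempFileRollModel and its methods are hypothetical, and the real connector also tracks offsets, a WAL, and per-partition writers that are left out here.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of count-based file rolling. NOT the HDFS connector's implementation;
// all names here are hypothetical and exist only for illustration.
final class TempFileRollModel {
    private final int flushSize;                                      // stands in for flush.size
    private final List<String> tempFile = new ArrayList<>();          // records still under +tmp
    private final List<List<String>> committed = new ArrayList<>();   // files visible under /topics/<topic>

    TempFileRollModel(int flushSize) {
        if (flushSize < 1) {
            throw new IllegalArgumentException("flushSize must be >= 1");
        }
        this.flushSize = flushSize;
    }

    // Buffer one record; commit the temp file once flushSize records are in it.
    void write(String record) {
        tempFile.add(record);
        if (tempFile.size() >= flushSize) {
            committed.add(new ArrayList<>(tempFile));
            tempFile.clear();
        }
    }

    int committedFiles() {
        return committed.size();
    }

    int recordsInTemp() {
        return tempFile.size();
    }

    public static void main(String[] args) {
        TempFileRollModel model = new TempFileRollModel(3);
        model.write("record-1");
        System.out.println("committed files: " + model.committedFiles()
                + ", records in temp: " + model.recordsInTemp());
    }
}
```

With a flush size of 3 and a single record, the model leaves that record in the temp file, which matches the one 995-byte file seen under +tmp; with a flush size of 1, every record is committed as soon as it is written.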


Nishant Verma

Jan 20, 2017, 1:44:46 AM
to Confluent Platform
Yes Dustin. I changed flush.size to various values to get my data pushed out, and it worked.

Can I restrict flushing by the size of the data for a topic instead of the number of records?

I mean: flush to HDFS only once topic A has accumulated 124/256 MB of records; otherwise hold the data and wait for more records to arrive until 124/256 MB is reached, and then flush to HDFS.

Thanks
Nishant Verma

 


Nishant Verma

Jan 20, 2017, 7:15:28 AM
to Confluent Platform
While thinking about the flush size issue, we figured we could set flush.size to a large number equal to (Block Size * 1024 * 1024 * Number of records we expect per second)/(Size of one file in MB). This way we can control our batching.
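
One way to sanity-check such a number is a simpler rule than the formula above: assume the aim is roughly one output file per HDFS block and divide the target file size by the average record size. Both the rule and the 128 MB block size are assumptions made for this sketch; the 995-byte record size is the single record seen earlier in this thread, and FlushSizeEstimate is a hypothetical helper, not a connector class.

```java
// Rough flush.size estimate. The sizing rule (target file bytes / average record bytes)
// is an assumption for illustration, not something the connector computes itself.
final class FlushSizeEstimate {

    // Number of records that fit in one file of targetFileBytes,
    // given an average serialized record size of avgRecordBytes (never less than 1).
    static long recordsPerFile(long targetFileBytes, long avgRecordBytes) {
        if (targetFileBytes <= 0 || avgRecordBytes <= 0) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        return Math.max(1L, targetFileBytes / avgRecordBytes);
    }

    public static void main(String[] args) {
        long blockBytes = 128L * 1024 * 1024; // assumed HDFS block size
        long avgRecordBytes = 995L;           // size of the one record seen in this thread
        System.out.println("flush.size=" + recordsPerFile(blockBytes, avgRecordBytes));
    }
}
```

Because flush.size counts records rather than bytes, the resulting file size drifts whenever the average record size changes, so the estimate would need revisiting per topic.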

We will also be using daily partitioning with partitioner.class=io.confluent.connect.hdfs.partitioner.DailyPartitioner.

I have one small question here though:

Since I am using the DailyPartitioner, can I also use hourly bucketing along with it? My data would be written into 24 buckets, partitioned on a day-to-day basis. Is bucketing supported here? If not, what is the alternative for my scenario?

Many thanks
Nishant Verma


Dustin Cote

Jan 20, 2017, 8:00:22 AM
to confluent...@googlegroups.com
There's no bucketing done by the DailyPartitioner that I'm aware of at this time, but you can implement your own extension of the TimeBasedPartitioner class to support your needs. The DailyPartitioner is an extension of that class and could be used as a template, I would think. Of course, I'm not sure what the level of effort would be, as I presume you would also need to do some work on the Hive integration side to support bucketing.
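
To make the idea concrete, here is a sketch of the directory layout such an extension might produce, with an hour level under each day. It only formats a path with java.time and does not implement the connector's partitioner interface; the class name HourlyPathSketch and the year=/month=/day=/hour= layout are assumptions for illustration.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Illustrates a day-plus-hour directory layout only. It does NOT extend or call
// the connector's TimeBasedPartitioner; the layout itself is an assumption.
final class HourlyPathSketch {
    // Quoted parts are literals; yyyy, MM, dd and HH are filled from the timestamp.
    private static final String PATTERN = "'year='yyyy'/month='MM'/day='dd'/hour='HH";

    // Directory for a record timestamp (epoch milliseconds) in the given time zone.
    static String pathFor(long epochMillis, ZoneId zone) {
        return DateTimeFormatter.ofPattern(PATTERN)
                .withZone(zone)
                .format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // tripStartTimestamp taken from the sample record earlier in this thread
        System.out.println(pathFor(1477326589264L, ZoneId.of("UTC")));
    }
}
```

Each day then holds up to 24 hour directories, which is close to the 24 buckets asked about, although these are partition directories rather than Hive buckets.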


Nishant Verma

Jan 20, 2017, 8:12:27 AM
to Confluent Platform
Thanks for this update Dustin.

For a production setup, where terabytes of records would be written to the Kafka topics, what is the best practice for running Kafka Connect?

My Kafka instance runs on AWS host a.b.c.d and my Hadoop namenode on AWS host p.q.r.s. For development/POC purposes, we have kept Confluent on the same box as the Kafka instance, i.e. on a.b.c.d. The HDFS cluster size is 500 GB.

But for a production setup, where the cluster size would be 20-30 TB, is it advisable to keep Confluent on the same box as the Kafka instance, on the namenode box, or on a separate box? How much disk space would Confluent need in such a production case?

Nishant



Shashank Singh

Oct 25, 2018, 10:52:56 AM
to Confluent Platform

Hi Nishant/ Dustin,

I am running into a similar scenario, where I am getting
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('-' (code 45)): Expected space separating root-level values
But I am working with PostgreSQL as the sink destination.
What changes should I make to get this working?
Please help.

Thanks & Regards,
Shashank