I have a source that writes JSON data into an existing Kafka cluster. It creates multiple topics, each with multiple partitions, and writes data into the respective topics.
My task is to move this data from the Kafka topics into an HDFS cluster running on a different server.
I downloaded confluent-3.1.1 onto the Kafka server and made the following changes:
a- Updated quickstart-hdfs.properties with the topic names and hdfs.url. All topics created on the Kafka side are listed there, comma separated.
b- Updated connect-avro-standalone.properties as below:
====================================================================================================
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
enable.auto.commit=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=_offsets
offset.storage.file.filename=/opt/confluent-3.1.1/connect.offsets
=======================================================================================================
c- Created an empty file /opt/confluent-3.1.1/connect.offsets and gave it write permissions.
Now my source is writing JSON data into the Kafka topics. I started the schema registry (./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties) and then started Kafka Connect (./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-hdfs/quickstart-hdfs.properties).
The console where I start Kafka Connect shows the output below:
=================================================================================================================
[2017-01-17 14:00:16,166] INFO Fetch offset 0 is out of range for partition Prd_IN_GeneralEvents-369, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:708)
[2017-01-17 14:00:16,166] INFO Fetch offset 0 is out of range for partition Prd_IN_GeneralEvents-124, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:708)
[2017-01-17 14:00:16,167] INFO Fetch offset 0 is out of range for partition Prd_IN_GeneralEvents-264, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:708)
[2017-01-17 14:00:16,167] INFO Fetch offset 0 is out of range for partition Prd_IN_GeneralEvents-130, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:708)
[2017-01-17 14:00:16,167] INFO Fetch offset 0 is out of range for partition Prd_IN_GeneralEvents-0, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:708)
[2017-01-17 14:00:16,167] INFO Fetch offset 0 is out of range for partition Prd_IN_GeneralEvents-10, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:708)
[2017-01-17 14:00:16,169] INFO Fetch offset 0 is out of range for partition Prd_IN_Processed_Alert-8, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:708)
[2017-01-17 14:00:16,171] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-01-17 14:00:16,368] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-01-17 14:00:16,391] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-01-17 14:00:16,522] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-01-17 14:00:16,535] ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:368)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'aa8c0c3a49e14b0c8bcaf1e2f082830b': was expecting ('true', 'false' or 'null')
at [Source: [B@3b7fb2e; line: 1, column: 65]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'aa8c0c3a49e14b0c8bcaf1e2f082830b': was expecting ('true', 'false' or 'null')
at [Source: [B@3b7fb2e; line: 1, column: 65]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3323)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2482)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:801)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:697)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3604)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3549)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2161)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:23)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:363)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:744)
[2017-01-17 14:00:16,537] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2017-01-17 14:00:17,760] INFO Reflections took 8075 ms to scan 264 urls, producing 12033 keys and 80037 values (org.reflections.Reflections:229)
=====================================================================================================================
As a result, nothing is written to HDFS.
Which part of the configuration am I missing?
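The parser message above suggests that the bytes handed to JsonConverter contained a bare token (aa8c0c3a49e14b0c8bcaf1e2f082830b) where a JSON value was expected. Since the same converter class is configured for both keys and values, a record key that is a plain unquoted id could plausibly trigger this as well as a malformed value. The following is a minimal Python sketch for checking raw payloads outside of Connect. It is not part of Kafka Connect; `is_valid_json` is a hypothetical helper and the sample payloads are made up for illustration.

```python
import json


def is_valid_json(raw: bytes) -> bool:
    """Return True if raw decodes as UTF-8 and parses as one JSON document."""
    try:
        json.loads(raw.decode("utf-8"))
        return True
    except (UnicodeDecodeError, json.JSONDecodeError):
        return False


# Illustrative payloads (assumed, not read from the actual topics).
bare_token = b"aa8c0c3a49e14b0c8bcaf1e2f082830b"       # unquoted id: not JSON
quoted_token = b'"aa8c0c3a49e14b0c8bcaf1e2f082830b"'   # a JSON string
json_object = b'{"pdid": "aa8c0c3a49e14b0c8bcaf1e2f082830b"}'

for name, payload in [("bare", bare_token),
                      ("quoted", quoted_token),
                      ("object", json_object)]:
    print(name, is_valid_json(payload))
```

Running the same check against the real keys and values read from a topic would show which of the two the JSON parser rejects.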
If I try the example from the Confluent website (start the Avro console producer and type {"f1": "value1"} into the console), the data is written to HDFS without any issues.
I also tried the following change in the connect-avro-standalone.properties file, but got various errors:
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
======================================================================================================================
Error1:
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('-' (code 45)): Expected space separating root-level values
at [Source: [B@3c0b4f6; line: 1, column: 10]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('-' (code 45)): Expected space separating root-level values
at [Source: [B@3c0b4f6; line: 1, column: 10]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)
at com.fasterxml.jackson.core
=======================================================================================================================
Error2:
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('-' (code 45)): Expected space separating root-level values
at [Source: [B@3c0b4f6; line: 1, column: 10]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('-' (code 45)): Expected space separating root-level values
at [Source: [B@3c0b4f6; line: 1, column: 10]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)
at com.fasterxml.jackson.core
===========================================================================================================================
Error3:
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('7fa1b2e9-70ce-4ded-9842-f3b9115e0932' (code 45)): Expected "true","false",null separating root-level values
at [Source: [B@3c0b4f6; line: 1, column: 10]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('-' (code 45)): Expected space separating root-level values
at [Source: [B@3c0b4f6; line: 1, column: 10]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)
at com.fasterxml.jackson.core
==================================================================================================================================
What am I missing here?
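One likely reason the website example works while this data does not: the Avro console producer writes records in the Schema Registry wire format (a zero "magic" byte, a 4-byte big-endian schema id, then the Avro-encoded body), whereas the source here writes plain JSON text. AvroConverter expects the former, so it would not be expected to read the latter. The sketch below only inspects that header. It is an illustration under those assumptions; `parse_confluent_header` is a hypothetical helper, and the schema id 21 and both payloads are made up.

```python
import struct

MAGIC_BYTE = 0  # first byte of the Schema Registry wire format


def parse_confluent_header(raw: bytes):
    """Return the schema id if raw starts with the wire-format header, else None."""
    if len(raw) < 5 or raw[0] != MAGIC_BYTE:
        return None
    (schema_id,) = struct.unpack(">I", raw[1:5])  # 4-byte big-endian id
    return schema_id


# Plain JSON text, as written by the source in this thread.
plain_json = b'{"f1": "value1"}'

# A framed record: magic byte + schema id 21 (made up) + Avro body for "value1".
framed = bytes([MAGIC_BYTE]) + struct.pack(">I", 21) + b"\x0cvalue1"

print(parse_confluent_header(plain_json))
print(parse_confluent_header(framed))
```

A payload for which this returns None was not produced through the Avro serializer, which points towards a JSON or byte-array converter rather than AvroConverter.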
--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/ae331a9b-3847-4a78-9870-2496c2ac051e%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
bootstrap.servers=localhost:9092
key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
================================================================================================================
- My quickstart-hdfs.properties file looks like this:
================================================================================================================
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
format.class=com.qubole.streamx.SourceFormat
tasks.max=1
topics=Prd_IN_Processed_Alert,Prd_IN_TripAnalysis,Prd_IN_Notification
hdfs.url=hdfs://ip-10-16-37-124:9000
flush.size=3
==============================================================================================================
I started the schema registry and then started Connect.
At the moment, Kafka holds a single JSON record, and only in the Prd_IN_TripAnalysis topic.
This time no error appeared in the Connect console. The output looked like this:
==========================================================================================================
***********topic****************Prd_IN_TripAnalysis
##########value#########{"id":"18a446fc-df77-4dbb-a909-c665c533866a","pdid":"f74a625e979a4112952ee8dacaadd0e5","summary":{"tripStartTimestamp":1477326589264,"tripEndTimestamp":0,"status":30,"totalGPSDistanceMetres":0.0,"avgGPSSpeed":0.0,"maxGPSSpeed":0.0,"avgInstMileage":0.0,"totalHaltTimeSeconds":0,"totalIdlingTimeSeconds":0,"totalRunningTimeMins":0,"startLocation":{"latitude":12.9748625,"longitude":77.6203403,"speed":0.0,"ts":1477326589132,"direction":-1},"endLocation":null,"driverBehaviorSummary":[{"driver":null,"noOfRapidAcceleration":0,"noOfRapidDeceleration":0,"noOfOverSpeed":0,"noOfHarshBreak":0}]},"latLongs":[{"latitude":12.9748623,"longitude":77.6203425,"speed":0.0,"ts":1477326591307,"direction":-1}],"halts":[],"idlings":[{"location":{"latitude":12.9748623,"longitude":77.6203425,"speed":0.0,"ts":1477326591307,"direction":-1},"startTimestamp":1477326591307,"endTimestamp":0,"finalLocation":{"latitude":12.9748623,"longitude":77.6203425,"speed":0.0,"ts":1477326591307,"direction":-1},"duration":0}]}
[2017-01-19 13:27:46,838] INFO Reflections took 8282 ms to scan 265 urls, producing 12225 keys and 81227 values (org.reflections.Reflections:229)
[2017-01-19 13:28:39,287] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-01-19 13:29:39,291] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-01-19 13:30:39,289] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-01-19 13:31:39,288] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-01-19 13:32:39,287] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-01-19 13:33:39,289] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-01-19 13:34:39,287] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-01-19 13:35:39,289] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-01-19 13:36:39,288] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
==========================================================================================================
In HDFS, I could see the following under +tmp:
======================================================================================================================
[platformteam@ip-10-16-37-124 ~]$ hadoop fs -ls /topics/+tmp/Prd_IN_TripAnalysis
drwxr-xr-x - root supergroup 0 2017-01-19 07:57 /topics/+tmp/Prd_IN_TripAnalysis/partition=323
[platformteam@ip-10-16-37-124 ~]$ hadoop fs -ls /topics/+tmp/Prd_IN_TripAnalysis/partition=323
Found 1 items
-rw-r--r-- 3 root supergroup 995 2017-01-19 08:08 /topics/+tmp/Prd_IN_TripAnalysis/partition=323/83e04862-3c2a-43ec-912d-49d424e344d4_tmp
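A possible explanation for the file staying under +tmp: the sink is configured with flush.size=3 while the topic currently holds a single record, so the connector may simply be waiting for more records before it commits the temporary file to its final location. The sketch below is a simplified Python model of that count-based rule, not the connector's actual code; `PartitionBuffer` and its fields are hypothetical names used only for illustration.

```python
class PartitionBuffer:
    """Toy model of one topic partition's temp file with count-based commits."""

    def __init__(self, flush_size: int):
        self.flush_size = flush_size
        self.pending = []          # records sitting in the temp file
        self.committed_files = []  # files moved to the final directory

    def write(self, record: str) -> None:
        self.pending.append(record)
        # Commit only once flush_size records have accumulated.
        if len(self.pending) >= self.flush_size:
            self.committed_files.append(list(self.pending))
            self.pending.clear()


buf = PartitionBuffer(flush_size=3)
buf.write("record-1")
print(len(buf.pending), len(buf.committed_files))  # one record still pending

buf.write("record-2")
buf.write("record-3")
print(len(buf.pending), len(buf.committed_files))  # third record triggers a commit
```

If this is what is happening, producing at least three records to the same partition, or lowering flush.size to 1 for a test, should make a committed file appear.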