I am using Camus to pull simple JSON logs from a Kafka cluster and write them to HDFS with the Camus MapReduce job.
Everything is working: I can fetch logs from the Kafka cluster, decode the JSON with JsonStringMessageDecoder, and use the value of the '@timestamp' key as the Camus timestamp. The value of the 'message' key is taken as the message and stored in HDFS.
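To make the decoding step concrete, here is a minimal Python sketch of the per-record logic described above. It is not Camus code: the function name `decode_record`, the sample record, and the ISO 8601 timestamp format with a trailing `Z` are all assumptions for illustration, and the real format depends on how the decoder is configured.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def decode_record(raw, timestamp_field="@timestamp", payload_field="message"):
    """Return (epoch_millis, payload) for one JSON log record.

    Hypothetical helper: mimics the idea of taking the timestamp from one
    JSON key and the stored payload from another.
    """
    record = json.loads(raw.decode("utf-8"))
    # Assumed format, e.g. 2015-07-07T14:53:13.705Z (always treated as UTC).
    ts = datetime.strptime(record[timestamp_field], "%Y-%m-%dT%H:%M:%S.%fZ")
    ts = ts.replace(tzinfo=timezone.utc)
    # Integer division by a timedelta avoids float rounding on the millis.
    epoch_millis = (ts - EPOCH) // timedelta(milliseconds=1)
    return epoch_millis, record[payload_field]

if __name__ == "__main__":
    sample = b'{"@timestamp": "2015-07-07T14:53:13.705Z", "message": "hello"}'
    print(decode_record(sample))
```

A record without the timestamp key raises `KeyError` here; a real decoder would need to decide whether to skip such records or fall back to the current time.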
Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/home/deep.shah/tmp
2015-07-07 14:53:13,705 INFO [main] kafka.CamusJob - Dir Destination set to: /camus/topics
2015-07-07 14:53:13,777 INFO [main] kafka.CamusJob - Previous execution: hdfs://test-local-EMPTY/camus/exec/history/2015-07-07-20-10-09
2015-07-07 14:53:13,797 INFO [main] kafka.CamusJob - New execution temp location: /camus/exec/2015-07-07-21-53-13
2015-07-07 14:53:14,612 INFO [main] kafka.CamusJob - Fetching metadata from broker 172.16.1.96:6667 with client id my_client_name for 0 topic(s) []
2015-07-07 14:53:14,706 INFO [main] zlib.ZlibFactory - Successfully loaded & initialized native-zlib library
2015-07-07 14:53:14,707 INFO [main] compress.CodecPool - Got brand-new compressor [.deflate]
2015-07-07 14:53:14,773 INFO [main] kafka.CamusJob - previous offset file:hdfs://test-local-EMPTY/camus/exec/history/2015-07-07-20-10-09/offsets-previous
2015-07-07 14:53:14,823 INFO [main] compress.CodecPool - Got brand-new decompressor [.deflate]
2015-07-07 14:53:14,911 INFO [main] Configuration.deprecation - mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
2015-07-07 14:53:15,094 INFO [main] mapreduce.JobSubmitter - number of splits:0
2015-07-07 14:53:15,126 INFO [main] Configuration.deprecation - mapred.map.max.attempts is deprecated. Instead, use mapreduce.map.maxattempts
2015-07-07 14:53:15,126 INFO [main] Configuration.deprecation - mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
2015-07-07 14:53:15,244 INFO [main] mapreduce.JobSubmitter - Submitting tokens for job: job_1436297498215_0004
2015-07-07 14:53:15,361 INFO [main] impl.YarnClientImpl - Submitted application application_1436297498215_0004
2015-07-07 14:53:15,385 INFO [main] mapreduce.Job - Running job: job_1436297498215_0004
2015-07-07 14:53:20,471 INFO [main] mapreduce.Job - Job job_1436297498215_0004 running in uber mode : false
2015-07-07 14:53:20,472 INFO [main] mapreduce.Job - map 0% reduce 0%
2015-07-07 14:53:21,499 INFO [main] mapreduce.Job - Job job_1436297498215_0004 completed successfully
2015-07-07 14:53:21,561 INFO [main] mapreduce.Job - Counters: 2
Job Counters
Total time spent by all maps in occupied slots (ms)=0
Total time spent by all reduces in occupied slots (ms)=0
2015-07-07 14:53:21,565 INFO [main] kafka.CamusJob - Group: Job Counters
2015-07-07 14:53:21,565 INFO [main] kafka.CamusJob - Total time spent by all maps in occupied slots (ms): 0
2015-07-07 14:53:21,565 INFO [main] kafka.CamusJob - Total time spent by all reduces in occupied slots (ms): 0
2015-07-07 14:53:21,565 INFO [main] kafka.CamusJob - Group: Job Counters
2015-07-07 14:53:21,565 INFO [main] kafka.CamusJob - Total time spent by all maps in occupied slots (ms): 0
2015-07-07 14:53:21,565 INFO [main] kafka.CamusJob - Total time spent by all reduces in occupied slots (ms): 0
2015-07-07 14:53:21,567 INFO [main] kafka.CamusJob - Moving execution to history : /camus/exec/history/2015-07-07-21-53-13
2015-07-07 14:53:21,583 INFO [main] kafka.CamusJob - Job finished
2015-07-07 14:53:21,616 INFO [main] mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2015-07-07 14:53:21,695 INFO [main] reporter.BaseReporter - ***********Timing Report*************
Job time (seconds):
pre setup 0.0 (0%)
get splits 0.0 (0%)
hadoop job 6.0 (75%)
commit 0.0 (0%)
Total: 0 minutes 8 seconds
Hadoop job task times (seconds):
min 9223372036854776.0
mean NaN
max 0.0
skew NaN/0.0 = NaN
Task wait time (seconds):
min 9223372036854776.0
mean NaN
max 0.0
Hadoop task breakdown:
kafka NaN
decode NaN
map output NaN
other NaN
Total MB read: 0