JSONStringMessageDecoder


Andrew Otto

Oct 21, 2013, 3:20:19 PM
to camu...@googlegroups.com
Hi all!

It was mentioned in another thread that this might be useful to others as well.


Thanks!
-Ao


Zhu Wayne

Mar 19, 2014, 4:59:44 PM
to camu...@googlegroups.com
Andrew,
Could you shed some light on how to use JsonStringMessageDecoder? I can't get anything into HDFS even though the Camus job runs fine without exceptions. Records read is always zero. I don't think JsonStringMessageDecoder actually gets hit.

2014-03-19 15:29:17,313 INFO com.linkedin.camus.etl.kafka.mapred.EtlRecordReader: 
topic:price_raw partition:0 beginOffset:0 estimatedLastOffset:4
2014-03-19 15:29:17,317 INFO com.linkedin.camus.etl.kafka.common.KafkaReader: bufferSize=128
2014-03-19 15:29:17,317 INFO com.linkedin.camus.etl.kafka.common.KafkaReader: timeout=6000
2014-03-19 15:29:17,526 INFO com.linkedin.camus.etl.kafka.common.KafkaReader: Connected to leader tcp://KafkaBroker:9092 beginning reading at offset 0 latest offset=4
2014-03-19 15:29:18,637 INFO com.linkedin.camus.etl.kafka.mapred.EtlRecordReader: 
Decoder: com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder  created for topic:price_raw
2014-03-19 15:29:19,639 INFO com.linkedin.camus.etl.kafka.mapred.EtlRecordReader: Records read : 0

# Concrete implementation of the Decoder class to use
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder
# Our timestamps look like 2013-09-20T15:40:17, (my timestamp is long already)
camus.message.timestamp.format=long
# use the dt field
camus.message.timestamp.field=_tmstmp
# RawRecordWriterProvider does no reformatting of the records as they come in.
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider
etl.output.record.delimiter=\n

best,
Wayne 

Zhu Wayne

Mar 19, 2014, 5:04:19 PM
to camu...@googlegroups.com
Andrew,
I am on the camus-kafka-0.8 branch and kafka_2.8.0-0.8.0. I was able to put Avro messages into HDFS. I used the Kafka console producer to send JSON messages one at a time without compression.


With Avro, I did not need to set etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider. Is this correct?

Andrew Otto

Mar 19, 2014, 5:10:58 PM
to Zhu Wayne, camu...@googlegroups.com
Hi Wayne,

Hm, it is hard for me to tell what’s going on in your sample output.  What are you trying to accomplish with  camus.message.timestamp.format=long?

Are you trying to use DateFormat.LONG?  Can you give an example of one of your timestamps?
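
A minimal sketch of why the value "long" is suspicious, assuming the decoder hands camus.message.timestamp.format to java.text.SimpleDateFormat (the question above hints at this, but nothing in this thread confirms it). TimestampFormatCheck and its methods are hypothetical names used only for illustration; they are not part of Camus.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

/** Hypothetical helper, not part of Camus. */
final class TimestampFormatCheck {

    /** Returns true if SimpleDateFormat accepts the string as a pattern. */
    static boolean isValidPattern(String pattern) {
        try {
            new SimpleDateFormat(pattern);
            return true;
        } catch (IllegalArgumentException e) {
            // Thrown for letters that are not pattern characters, such as 'l'.
            return false;
        }
    }

    /** Parses value with the given pattern, interpreted as UTC, into epoch milliseconds. */
    static long parseMillis(String pattern, String value) {
        SimpleDateFormat format = new SimpleDateFormat(pattern);
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        try {
            return format.parse(value).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("cannot parse " + value, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isValidPattern("long"));
        System.out.println(isValidPattern("yyyy-MM-dd'T'HH:mm:ss"));
        System.out.println(parseMillis("yyyy-MM-dd'T'HH:mm:ss", "2013-09-20T15:40:17"));
    }
}
```

Under that assumption, a timestamp that is already a number of milliseconds would need its own handling in the decoder rather than a date pattern.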

-Andrew




Zhu Wayne

Mar 20, 2014, 2:18:44 PM
to camu...@googlegroups.com, Zhu Wayne
Andrew,
Thanks for the reply. I fixed the issue by increasing kafka.fetch.buffer.size to the value defined on the Kafka server. I noticed that on the master branch it is OK to leave kafka.fetch.buffer.size unset. However, the 0.8 branch drops records if the buffer is too small.

I have another question on JSON. I have a JSON array as a string in Kafka, such as [{msg1}, {msg2}, ....], but I want to store one JSON document per line in HDFS. Could you shed some light on this? Maybe I can change the current method signature from:
public CamusWrapper<String> decode(byte[] payload)
to
public CamusWrapper<String[]> decode(byte[] payload).

Best,

Wayne

Andrew Otto

Mar 20, 2014, 3:52:32 PM
to Zhu Wayne, camu...@googlegroups.com
Just so I understand correctly, you have:

  kafka_offset 1: [{obj1}, {obj2}]
  kafka_offset 2: [{obj3}, {obj4}]

And you want in HDFS:
  {obj1}
  {obj2}
  {obj3}
  {obj4}
?

Is each JSON object in the Kafka message expected to have the same timestamp?  Or at least a timestamp within the same (configured) bucket interval?  If so, I suppose you could make a CamusWrapper that knows how to extract the timestamp out of the first object in the array.  Camus uses the timestamp field on each CamusWrapper instance to know what timestamp a particular Kafka message is associated with.  You could make CamusWrapper<String[]> instances, but each string in that instance would have the same timestamp.

If you did that, you’d also have to make an ArrayStringRecordWriterProvider, or something like that, so that the code writing the records to HDFS would know how to join your array on newline.

Oh!  Or, if you want to keep your life simpler, you could just join the array into a string in your CamusWrapper’s decode() method.  Maybe add a new property to JsonStringMessageDecoder to indicate that you want JSON arrays joined together with newlines.
  
  // Possible property name?
  camus.message.decoder.json.array.join=true

  // pseudo code, dunno how this would actually work
  // (jsonObject.isArray() and jsonObject.join() are placeholders, not real methods)
  String recordDelimiter = context.getConfiguration().get(ETL_OUTPUT_RECORD_DELIMITER, DEFAULT_RECORD_DELIMITER);
  boolean joinArrays = context.getConfiguration().getBoolean("camus.message.decoder.json.array.join", false);
  if (joinArrays && jsonObject.isArray()) {
    payloadString = jsonObject.join(recordDelimiter);
  }
  else {
    payloadString = new String(payload);
  }
  

Dunno, just a thought.  I’m not sure if this would be better to do in the Decoder or the RecordWriter.
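
A minimal, dependency-free sketch of the join step, assuming each Kafka payload is a JSON array of compact documents with no raw newlines inside them. JsonArraySplitter and its methods are hypothetical names, not part of Camus. A real decoder would more likely use a JSON library; this version only tracks nesting depth and string literals to find the top-level commas.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Camus. */
final class JsonArraySplitter {

    /** Returns the raw text of each top-level element of a JSON array. */
    static List<String> splitTopLevel(String json) {
        String s = json.trim();
        if (!s.startsWith("[") || !s.endsWith("]")) {
            throw new IllegalArgumentException("not a JSON array");
        }
        List<String> elements = new ArrayList<>();
        int depth = 0;            // nesting depth of {} and [] inside the outer array
        boolean inString = false; // true while inside a JSON string literal
        boolean escaped = false;  // true right after a backslash inside a string
        int start = 1;            // index where the current element begins
        for (int i = 1; i < s.length() - 1; i++) {
            char c = s.charAt(i);
            if (inString) {
                if (escaped) escaped = false;
                else if (c == '\\') escaped = true;
                else if (c == '"') inString = false;
            } else if (c == '"') {
                inString = true;
            } else if (c == '{' || c == '[') {
                depth++;
            } else if (c == '}' || c == ']') {
                depth--;
            } else if (c == ',' && depth == 0) {
                addIfNotBlank(elements, s.substring(start, i));
                start = i + 1;
            }
        }
        addIfNotBlank(elements, s.substring(start, s.length() - 1));
        return elements;
    }

    private static void addIfNotBlank(List<String> out, String element) {
        String trimmed = element.trim();
        if (!trimmed.isEmpty()) out.add(trimmed);
    }

    /** Joins the array elements with the record delimiter, one document per record. */
    static String joinArray(String json, String recordDelimiter) {
        return String.join(recordDelimiter, splitTopLevel(json));
    }

    public static void main(String[] args) {
        System.out.println(joinArray("[{\"a\":1}, {\"b\":\"x,y\"}]", "\n"));
    }
}
```

With this approach every document from one Kafka message still shares that message's single timestamp, as described above.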


-Ao

zhuw.chicago

Mar 20, 2014, 10:21:59 PM
to Andrew Otto, camu...@googlegroups.com
Thanks so much, Andrew. I wrote a separate JSON array decoder and a string array writer, and I ended up with one JSON document per line in HDFS.

Could you elaborate on how Camus uses the timestamp (TS)? I just saw one file per Kafka partition generated on HDFS, and the message order is also preserved. I am not sure how Camus uses the TS. Thanks.

Andrew Otto

Mar 21, 2014, 10:21:42 AM
to zhuw.chicago, camu...@googlegroups.com
One of the whole reasons someone would want to use Camus vs. the default kafka hadoop consumer that ships with Kafka is its timestamp bucketing feature.  Camus can be configured to read timestamps out of your data and save that data into a directory hierarchy (hourly) bucketed by that timestamp.  So you can be sure that your records are bucketed by content timestamp rather than consumption timestamp.

For example, Wikimedia is using Kafka to ship webrequest access logs to HDFS.  Say we want to count the number of requests per hour.  Since Camus creates hourly import directories based on the request timestamp, we can just count the number of records in a directory, rather than having to scan ALL records ever imported and filter on the timestamp field.
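
A minimal sketch of the bucketing idea, assuming a directory layout of topic/hourly/yyyy/MM/dd/HH and UTC hours. Both the layout and the HourlyBucket class are illustrative assumptions; the actual path is determined by the Camus configuration and code.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper, not part of Camus. */
final class HourlyBucket {

    // Formats an instant as a year/month/day/hour directory suffix in UTC.
    private static final DateTimeFormatter HOUR_PATH =
            DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC);

    /** Maps a record's content timestamp to the directory that would hold it. */
    static String pathFor(String topic, long epochMillis) {
        return topic + "/hourly/" + HOUR_PATH.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2014-03-21T15:54:41Z").toEpochMilli();
        System.out.println(pathFor("webrequest", ts));
    }
}
```

Two records whose timestamps fall in the same hour map to the same directory regardless of when they were consumed, which is what makes the per-hour count a simple directory scan.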

Zhu Wayne

Mar 21, 2014, 12:25:58 PM
to Andrew Otto, camu...@googlegroups.com
Andrew,
Thanks for your clear explanation. We are moving from once-a-day reporting to near real time, so having the data partitioned this way will definitely save us lots of headaches.

Best.

Wayne Zhu

Zhu Wayne

Mar 21, 2014, 3:26:42 PM
to camu...@googlegroups.com, Zhu Wayne
Andrew,
The Kafka messages contain multiple JSON documents per message to optimize performance on Kafka.  My second thought is to parse the TS out of each JSON document coming in as part of a JSON array within a single Kafka message, so that each JSON document could be written to a separate directory/file in HDFS. For example, a Kafka message may have one JSON TS set to hour 8 and another set to hour 9.

I looked at the EtlRecordReader code, and the TS is set at the Kafka message level. EtlMultiOutputFormat has a method called write(EtlKey key, Object val). The key has a TS, which in turn determines workingFileName. It seems that a lot of code would need to change. Please let me know if you have any advice on this. Otherwise, I will take your advice and pick one TS for all JSON documents within a single Kafka message. Maybe this is good enough.

Andrew Otto

Mar 21, 2014, 3:54:41 PM
to Zhu Wayne, camu...@googlegroups.com
Yeah, it would take a lot of changes to be able to parse multiple timestamps from a single Kafka message.

> For example, a Kafka message may have a JSON TS set to hour 8 and another set to hour 9.

Did you mean to say ‘a JSON document’ instead of 'a Kafka message' here?  If not, you *might* be able to use a timestamp as the Kafka message key when the message is produced.  I guess that depends on your partitioner.  If you are using a Kafka partitioner that does not depend on the Kafka message key (e.g. random, round robin, something like that), then you could possibly encode the timestamp of any given Kafka message in the Kafka message key, and somehow extract that and use it in Camus.

But, if you really need to check the timestamp in each JSON document, then that will be a lot of code changes, indeed.  Maybe you could add another abstraction to Camus?  CamusListWrapper or something? Dunno.

Félix GV

Mar 21, 2014, 4:02:30 PM
to Andrew Otto, Zhu Wayne, camu...@googlegroups.com
I don't see why you would need to send several distinct JSON messages as part of the same Kafka message. Kafka already has batching capabilities built-in, so it would be easier to rely on Kafka's batching mechanism in order to enhance throughput while still sending just one JSON message per Kafka message on the producer side.

Of course, if you already have that setup running in production with legacy data you want to persist, that might be trickier to change. But if you can still change the producer side of things, that would probably be the cleanest approach.
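
A minimal sketch of producer-side batching settings, assuming the Kafka 0.8 Scala producer. The property keys are that producer's configuration names; the values passed in and the BatchingProducerConfig class are illustrative assumptions, not tuned recommendations.

```java
import java.util.Properties;

/** Hypothetical helper that builds Kafka 0.8 producer settings for async batching. */
final class BatchingProducerConfig {

    static Properties asyncBatching(String brokerList, int batchSize, int maxBufferMs) {
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", brokerList);
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        // Async mode lets the producer group individual messages into batches.
        props.setProperty("producer.type", "async");
        // Maximum number of messages sent in one batch.
        props.setProperty("batch.num.messages", Integer.toString(batchSize));
        // Maximum time to buffer messages before sending a batch.
        props.setProperty("queue.buffering.max.ms", Integer.toString(maxBufferMs));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(asyncBatching("KafkaBroker:9092", 200, 5000));
    }
}
```

Each JSON document is then sent as its own Kafka message, so a consumer such as Camus can read one timestamp per message while the producer still batches on the wire.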

--
Félix

Zhu Wayne

Mar 21, 2014, 4:45:25 PM
to Félix GV, Andrew Otto, camu...@googlegroups.com
Thanks Félix and Andrew. I will push for one JSON per Kafka message. 
--
Wayne Zhu
