Is each JSON object in the Kafka message expected to have the same timestamp? Or at least a timestamp within the same (configured) bucket interval? If so, I suppose you could make a CamusWrapper that knows how to extract the timestamp out of the first object in the array. Camus uses the timestamp field on each CamusWrapper instance to know what timestamp a particular Kafka message is associated with. You could make CamusWrapper<String[]> instances, but each string in that instance would have the same timestamp.
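Something like this, maybe (totally untested, I’m going from memory on the MessageDecoder/CamusWrapper signatures, and the decoder name and the "timestamp" field are just placeholders for whatever your schema actually has):

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.coders.MessageDecoder;

// Hypothetical decoder: one Kafka message = one JSON array of objects,
// all sharing the timestamp pulled from the first object in the array.
public class JsonArrayStringMessageDecoder extends MessageDecoder<byte[], String[]> {
    @Override
    public CamusWrapper<String[]> decode(byte[] payload) {
        JsonArray array = new JsonParser().parse(new String(payload)).getAsJsonArray();
        String[] records = new String[array.size()];
        long timestamp = 0;
        for (int i = 0; i < array.size(); i++) {
            JsonObject obj = array.get(i).getAsJsonObject();
            if (i == 0) {
                // "timestamp" is a placeholder; use whatever field your objects actually carry.
                timestamp = obj.get("timestamp").getAsLong();
            }
            records[i] = obj.toString();
        }
        return new CamusWrapper<String[]>(records, timestamp);
    }
}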
If you did that, you’d also have to make an ArrayStringRecordWriterProvider, or something like that, so that the code writing the records to HDFS would know how to join your array on newlines.
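The writer side would look roughly like this (again just a sketch; I’m going from memory on the RecordWriterProvider interface, so check StringRecordWriterProvider for the real signatures and path handling before copying any of this):

// Hypothetical getDataWriter override, loosely modeled on StringRecordWriterProvider:
// each String[] record gets joined on the delimiter as it's written out.
@Override
public RecordWriter<IEtlKey, CamusWrapper> getDataWriter(TaskAttemptContext context, String fileName,
        CamusWrapper data, FileOutputCommitter committer) throws IOException, InterruptedException {
    final String delimiter = context.getConfiguration().get(ETL_OUTPUT_RECORD_DELIMITER, DEFAULT_RECORD_DELIMITER);
    // Simplified path handling; the real provider builds the file name through the output format helpers.
    Path path = new Path(committer.getWorkPath(), fileName + getFilenameExtension());
    final FSDataOutputStream out = path.getFileSystem(context.getConfiguration()).create(path);
    return new RecordWriter<IEtlKey, CamusWrapper>() {
        @Override
        public void write(IEtlKey key, CamusWrapper wrapper) throws IOException {
            // Join the array on the record delimiter so each JSON object lands on its own line.
            for (String record : (String[]) wrapper.getRecord()) {
                out.write(record.getBytes());
                out.write(delimiter.getBytes());
            }
        }
        @Override
        public void close(TaskAttemptContext context) throws IOException {
            out.close();
        }
    };
}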
Oh! Or, if you want to keep your life simpler, you could just join the array into a single string in your decoder’s decode() method. Maybe add a new property to JsonStringMessageDecoder to indicate that you want JSON arrays joined together with newlines.
// Possible property name?
camus.message.decoder.json.array.join=true
// still pseudo-ish, dunno if this is exactly right; assuming the payload has already been parsed into a Gson JsonElement
String recordDelimiter = context.getConfiguration().get(ETL_OUTPUT_RECORD_DELIMITER, DEFAULT_RECORD_DELIMITER);
boolean joinArrays = context.getConfiguration().getBoolean("camus.message.decoder.json.array.join", false);
if (joinArrays && jsonElement.isJsonArray()) {
    // Join each element of the array on the record delimiter so each object becomes its own record.
    StringBuilder sb = new StringBuilder();
    for (JsonElement element : jsonElement.getAsJsonArray()) {
        if (sb.length() > 0) {
            sb.append(recordDelimiter);
        }
        sb.append(element.toString());
    }
    payloadString = sb.toString();
} else {
    payloadString = new String(payload);
}
Dunno, just a thought. I’m not sure if this would be better to do in the Decoder or the RecordWriter.
-Ao