Hi!
First of all, thanks for building these tools and sharing them, they're awesome. I am having a lot of fun "playing" with them :). Small problem I'm having though, hoping somebody can offer some guidance:
[ I am using scalding 0.9.0rc15 ]
I have LZO-compressed, Thrift-serialized events that I am storing in S3. The code that writes them to S3 is something like:

val lzoCodec = new LzopCodec()
// ... set up input and output streams ...
val lzoOutputStream = lzoCodec.createIndexedOutputStream(outputStream, new DataOutputStream(indexOutputStream))
for (record <- records) { // loop over ~10,000 records
  lzoOutputStream.write(record)
}
// Stuff this byte stream and the corresponding index into S3 under a certain object name
On the consumer side, I have a Scalding application that is trying to read these records via a Scalding FixedPathLzoThrift source, declared as such:

case class LzoThriftSource(p: String) extends FixedPathLzoThrift[SnowplowRawEvent](p)
I am trying to run the Scalding job on Elastic MapReduce, but I get the following stack trace when reading records from the above LzoThriftSource:
305_0012_m_000000_0: java.lang.RuntimeException: error rate while reading input records crossed threshold
at com.twitter.elephantbird.mapreduce.input.LzoRecordReader$InputErrorTracker.incErrors(LzoRecordReader.java:155)
at com.twitter.elephantbird.mapreduce.input.LzoBinaryB64LineRecordReader.nextKeyValue(LzoBinaryB64LineRecordReader.java:135)
at com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper$RecordReaderWrapper.&lt;init&gt;(DeprecatedInputFormatWrapper.java:234)
at com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper.getRecordReader(DeprecatedInputFormatWrapper.java:92)
at cascading.tap.hadoop.io.MultiInputFormat$1.operate(MultiInputFormat.java:253)
at cascading.tap.hadoop.io.MultiInputFormat$1.operate(MultiInputFormat.java:248)
at cascading.util.Util.retry(Util.java:762)
at cascading.tap.hadoop.io.MultiInputFormat.getRecordReader(MultiInputFormat.java:247)
at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.<init>(MapTask.java:197)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:418)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.Exception: Unknown error
at com.twitter.elephantbird.mapreduce.input.LzoRecordReader$InputErrorTracker.incErrors(LzoRecordReader.java:138)
... 15 more
From the looks of it, it's trying to use an LzoBinaryB64LineRecordReader to read the data, when I think it should be using an LzoThriftBlockRecordReader? Should I be writing B64 lines instead?
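To clarify what I mean by B64 lines: my understanding is that the line-based reader expects one Base64-encoded serialized record per newline-delimited line, rather than the raw back-to-back Thrift bytes my writer above produces. A minimal sketch of that framing, using plain JDK Base64 as a stand-in for elephant-bird's encoder, with a hypothetical byte array in place of a real Thrift-serialized event:

```java
import java.util.Base64;

public class B64LineSketch {
    public static void main(String[] args) {
        // Hypothetical stand-in for one Thrift-serialized record's raw bytes.
        byte[] recordBytes = new byte[] {0x0b, 0x00, 0x01, 0x00};

        // A B64-line writer would emit each record as one Base64 string
        // terminated by a newline, which a line-based record reader can
        // then split back into individual records.
        String line = Base64.getEncoder().encodeToString(recordBytes);
        System.out.println(line); // one record per line
    }
}
```

Is that the format the reader is assuming here, or is the block format what FixedPathLzoThrift expects?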
Any advice would be very appreciated.
Thanks,
Phil