Scalding + LZO Thrift


Phil Kallos

Mar 18, 2014, 12:10:46 AM
to cascadi...@googlegroups.com
Hi!

First of all, thanks for building these tools and sharing them; they're awesome. I am having a lot of fun "playing" with them :). I'm running into one small problem, though, and hoping somebody can offer some guidance:

[ I am using scalding 0.9.0rc15  ]

I have LZO-compressed, Thrift-serialized events that I am storing in S3. The code that writes them to S3 looks something like this:

val lzoCodec = new LzopCodec()
// ... input and output streams ...
val lzoOutputStream = lzoCodec.createIndexedOutputStream(outputStream, new DataOutputStream(indexOutputStream))

records.foreach { record => // loop over ~10,000 records
  lzoOutputStream.write(record)
}
// Stuff this bytestream and the corresponding index into S3 with a certain object name

I have pulled these objects from S3 and decompressed them, and they certainly appear to contain the "right" data. The reference code that writes this data is here: https://github.com/pkallos/snowplow/blob/4af787078efe9d29a082af24384edc913ba21e8e/4-storage/kinesis-s3-sink/src/main/scala/com.snowplowanalytics.snowplow.sinks.kinesiss3/S3Emitter.scala#L63

On the consumer side, I have a Scalding application that tries to read these records through a Scalding FixedPathLzoThrift source, declared as follows:
case class LzoThriftSource(p: String) extends FixedPathLzoThrift[SnowplowRawEvent](p)
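A trivial job that just exercises this source would look something like the following (the job and argument names here are illustrative, not from my real code):

import com.twitter.scalding._

// Hypothetical job: dumps each SnowplowRawEvent as a line of text,
// just to prove that the source deserializes the records.
class DumpEventsJob(args: Args) extends Job(args) {
  TypedPipe.from(LzoThriftSource(args("input")))
    .map { event: SnowplowRawEvent => event.toString }
    .write(TypedTsv[String](args("output")))
}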

There is a corresponding unit test here that appears to work fine locally (not sure how meaningful that is, because it doesn't actually use LZO): https://github.com/pkallos/snowplow/blob/83e44c8fbfba4ddbfbdb2380a38477f827eadfd5/3-enrich/scala-hadoop-enrich/src/test/scala/com.snowplowanalytics.snowplow.enrich.hadoop/good/LzoThriftTest.scala#L177

I am trying to run a Scalding job on Elastic Map Reduce, but I am getting the following stack trace when trying to read records from the above LzoThriftSource:
305_0012_m_000000_0: java.lang.RuntimeException: error rate while reading input records crossed threshold
        at com.twitter.elephantbird.mapreduce.input.LzoRecordReader$InputErrorTracker.incErrors(LzoRecordReader.java:155)
        at com.twitter.elephantbird.mapreduce.input.LzoBinaryB64LineRecordReader.nextKeyValue(LzoBinaryB64LineRecordReader.java:135)
        at com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper$RecordReaderWrapper.<init>(DeprecatedInputFormatWrapper.java:234)
        at com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper.getRecordReader(DeprecatedInputFormatWrapper.java:92)
        at cascading.tap.hadoop.io.MultiInputFormat$1.operate(MultiInputFormat.java:253)
        at cascading.tap.hadoop.io.MultiInputFormat$1.operate(MultiInputFormat.java:248)
        at cascading.util.Util.retry(Util.java:762)
        at cascading.tap.hadoop.io.MultiInputFormat.getRecordReader(MultiInputFormat.java:247)
        at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.<init>(MapTask.java:197)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:418)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.Exception: Unknown error
        at com.twitter.elephantbird.mapreduce.input.LzoRecordReader$InputErrorTracker.incErrors(LzoRecordReader.java:138)
        ... 15 more

From the looks of it, it's trying to use an LzoBinaryB64LineRecordReader to read the data, when I think it should be using an LzoThriftBlockRecordReader? Should I be writing B64-encoded lines instead?

Any advice would be very appreciated.

Thanks,
Phil




Phil Kallos

Mar 18, 2014, 10:24:11 PM
to cascadi...@googlegroups.com
Following up on this, I found the answer!!  Details below for those interested.

After reading more of the elephant-bird code, it turns out there is something special about the way the Thrift objects are written: they are wrapped in Protobuf "blocks". So I had to change my code to:

val lzoCodec = new LzopCodec()
// ... input and output streams ...
val lzoOutputStream = lzoCodec.createIndexedOutputStream(outputStream, new DataOutputStream(indexOutputStream))
val thriftBlockWriter = new ThriftBlockWriter[SnowplowRawEvent](lzoOutputStream, classOf[SnowplowRawEvent], config.BUFFER_BYTE_SIZE_LIMIT.toInt)

records.foreach { record => // loop over ~10,000 records
  thriftBlockWriter.write(record)
}
// This close is important: it flushes the final block if there are any straggler records
thriftBlockWriter.close()

Subsequently, the reading/decoding process using my LzoThriftSource worked nicely.
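For anyone who wants to sanity-check one of these block files outside of Hadoop, something like the following should read it back locally. This is an untested sketch: the file name is a placeholder, and I'm assuming elephant-bird's ThriftBlockReader API here (in particular, that readNext() returns null once the stream is exhausted).

import java.io.FileInputStream
import org.apache.hadoop.conf.Configuration
import com.hadoop.compression.lzo.LzopCodec
import com.twitter.elephantbird.mapreduce.io.ThriftBlockReader
import com.twitter.elephantbird.util.TypeRef

val codec = new LzopCodec()
codec.setConf(new Configuration()) // the codec pulls buffer sizes from the conf

// "events.lzo" is a placeholder for a file pulled down from S3
val in = codec.createInputStream(new FileInputStream("events.lzo"))
val reader = new ThriftBlockReader[SnowplowRawEvent](in, new TypeRef[SnowplowRawEvent] {})

var event = reader.readNext() // assumed to return null at end of stream
while (event != null) {
  println(event)
  event = reader.readNext()
}
reader.close()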

Oscar Boykin

Mar 18, 2014, 10:35:38 PM
to cascadi...@googlegroups.com
Sorry I didn't know the answer to this one.

I should have mentioned that opening an issue on the elephant-bird GitHub repo would be a good idea in this case.

Glad you worked it out.





--
Oscar Boykin :: @posco :: http://twitter.com/posco