Scalding 0.8.6 WritableSequenceFile Compression


Asher

Apr 16, 2015, 10:18:54 AM
to cascadi...@googlegroups.com
Using Scalding 0.8.6, and writing out records to a WritableSequenceFile like this:

.write(WritableSequenceFile[Text, TypedProtobufWritable[ReportRow.Row]](path, ('key, 'value)))

I would like to use Snappy Compression.  One would assume the way to go about it would be to override the config method:

override def config(implicit mode: Mode): Map[AnyRef, AnyRef] = {
  super.config ++ Map(
    "mapreduce.output.fileoutputformat.compress" -> "true",
    "mapreduce.output.fileoutputformat.compress.codec" -> "org.apache.hadoop.io.compress.SnappyCodec",
    "mapreduce.output.fileoutputformat.compress.type" -> "BLOCK"
  )
}

These values are set in the job configuration, but it appears the output format ignores them.
Any suggestions?  This is in production, so upgrading the Scalding version is not an option at this time.

--Asher

Gera Shegalov

Apr 16, 2015, 11:21:10 AM
to cascadi...@googlegroups.com
What is your Hadoop version?
--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/8113e1a9-b8d9-4377-87be-0f56af2addb0%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



Asher

Apr 16, 2015, 11:28:37 AM
to cascadi...@googlegroups.com
2.0.0-cdh4.0.1. I've tried both the mapreduce- and mapred-prefixed settings.
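Since both key families are in play here, one way to keep them in sync is a small helper that emits each compression setting under both its MRv1 and MRv2 name. This is a hypothetical sketch, not from the thread; the key names are the standard Hadoop ones, but `CompressionKeys` and its methods are invented for illustration:

```scala
// Hypothetical helper: emit each output-compression setting under both its
// old MRv1 ("mapred.*") and new MRv2 ("mapreduce.*") key name, so the
// override takes effect whichever name the output format actually reads.
object CompressionKeys {
  // Old-style key -> new-style key for the three output-compression settings.
  private val renames: Map[String, String] = Map(
    "mapred.output.compress" ->
      "mapreduce.output.fileoutputformat.compress",
    "mapred.output.compression.codec" ->
      "mapreduce.output.fileoutputformat.compress.codec",
    "mapred.output.compression.type" ->
      "mapreduce.output.fileoutputformat.compress.type"
  )

  // Snappy block compression expressed once, then duplicated per key family.
  def snappyBlockOutput: Map[String, String] = {
    val logical = Map(
      "mapred.output.compress" -> "true",
      "mapred.output.compression.codec" -> "org.apache.hadoop.io.compress.SnappyCodec",
      "mapred.output.compression.type" -> "BLOCK"
    )
    logical.flatMap { case (oldKey, v) => Seq(oldKey -> v, renames(oldKey) -> v) }
  }
}
```

In a Scalding job this could then be merged in with `super.config ++ CompressionKeys.snappyBlockOutput`.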


Asher

Apr 16, 2015, 1:23:03 PM
to cascadi...@googlegroups.com
I've also tried overriding the updateConf method, but it is ignored as well.

override def updateConf(c: Configuration): Unit = {
  super.updateConf(c)
  c.setBoolean("mapreduce.output.fileoutputformat.compress", true)
  c.setBoolean("mapred.output.fileoutputformat.compress", true)
  c.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec")
  c.set("mapred.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec")
  c.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK")
  c.set("mapred.output.fileoutputformat.compress.type", "BLOCK")
}

Oscar Boykin

Apr 16, 2015, 1:49:49 PM
to cascadi...@googlegroups.com
If these keys make it to the job tracker configuration, the issue is in the Tap we are using for the sink:


which uses this:

I don't see how we are using it incorrectly. Perhaps someone with a similar issue can comment (at Twitter we really only use elephant-bird for lzo-thrift output, or we use Parquet).




--
Oscar Boykin :: @posco :: http://twitter.com/posco

Asher Devuyst

Apr 16, 2015, 2:44:53 PM
to cascadi...@googlegroups.com
They are making it to the config. For example, when the job is launched I can see that they are indeed set:

mapred.output.fileoutputformat.compress = true
mapred.output.fileoutputformat.compress.type = BLOCK
mapred.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec
mapred.map.output.compression.codec = org.apache.hadoop.io.compress.SnappyCodec
mapred.output.compression.type = BLOCK
mapred.output.compression.codec = org.apache.hadoop.io.compress.DefaultCodec


The last one looked suspect, and indeed the output was not being compressed.
I made a few changes, specifically overriding the DefaultCodec above and also compressing the map output, and that finally got it working.

This was the set of overrides that got it working:

override def config(implicit mode: Mode): Map[AnyRef, AnyRef] = {
  super.config ++ Map(
    // JOB OUTPUT
    "mapred.output.fileoutputformat.compress" -> "true",
    "mapred.output.fileoutputformat.compress.codec" -> "org.apache.hadoop.io.compress.SnappyCodec",
    "mapred.output.fileoutputformat.compress.type" -> "BLOCK",
    "mapred.output.compression.type" -> "BLOCK",
    "mapred.output.compress" -> "true",
    "mapred.output.compression.codec" -> "org.apache.hadoop.io.compress.SnappyCodec",
    // MAP OUTPUT
    "mapred.map.output.compress" -> "true",
    "mapred.map.output.compress.codec" -> "org.apache.hadoop.io.compress.SnappyCodec"
  )
}

override def updateConf(c: Configuration): Unit = {
  super.updateConf(c)
  c.setBoolean("mapred.output.fileoutputformat.compress", true)
  c.setBoolean("mapred.output.compress", true)
  c.setBoolean("mapred.map.output.compress", true)

  c.set("mapred.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec")
  c.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec")
  c.set("mapred.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec")

  c.set("mapred.output.fileoutputformat.compress.type", "BLOCK")
  c.set("mapred.output.compression.type", "BLOCK")
}

Thanks for taking the time to look at this.

--Asher


Gera Shegalov

Apr 16, 2015, 5:24:59 PM
to cascading-user
Since there's a JobTracker, you're running MRv1 (branch-1); make sure these are set:

mapred.output.compress=true
mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec
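In Scalding terms, that advice corresponds to something like the following override (a sketch under the assumption that the job exposes a config method as in the earlier posts; it is a config fragment, not a complete job):

```scala
// Sketch: just the two MRv1 keys above, merged into the job configuration.
override def config(implicit mode: Mode): Map[AnyRef, AnyRef] =
  super.config ++ Map(
    "mapred.output.compress" -> "true",
    "mapred.output.compression.codec" -> "org.apache.hadoop.io.compress.SnappyCodec"
  )
```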
 





--
@gerashegalov