Splittable LZO Compression

Michael Schmitz

Apr 19, 2012, 4:37:10 PM
to scoobi...@googlegroups.com
Hi, I'm trying to use Scoobi to read and write LZO files. Scoobi
always uses TextInputFormat. Newer versions of Hadoop handle the
compression format transparently, but they don't use the LZO index
file for splittability. Thus there is one mapper per file, which is a
major loss of parallelism. Is there any way to read *and split*
LZO in Scoobi?

Also, is it possible to output compressed file formats in Scoobi?

Peace. Michael

Ben Lever

May 6, 2012, 8:45:11 PM
to scoobi...@googlegroups.com
Hi Michael,

What do you mean by "newer versions of Hadoop handle the compression format transparently"? Which version are you using?

Cheers,
Ben.

Michael Schmitz

May 6, 2012, 9:52:54 PM
to scoobi...@googlegroups.com
1.0.2--I wasn't aware that earlier versions (0.20.203) would decompress LZO when TextInputFormat was used.

Ben Lever

May 6, 2012, 10:25:53 PM
to scoobi...@googlegroups.com
Gotcha.

One way to do it would be to use the newer TextInputFormat in your own DataSource. Take a look at http://nicta.github.com/scoobi/guide/Input%20and%20Output.html#Custom+sources+and+sinks

Cheers,
Ben.

Michael Schmitz

May 7, 2012, 11:43:29 AM
to scoobi...@googlegroups.com
Ahh--super cool.  Sorry I missed this before.

When I have time (yeah right) I'll write one for LzoTextInputFormat and post a gist to this thread.

Peace.  Michael

schmmd

May 12, 2013, 2:22:39 PM
to scoobi...@googlegroups.com, mic...@schmitztech.com
Hmmm... I'm running into this issue again after never really resolving it in the first place.  It worked to copy TextInput to LzoTextInput and change the inputFormat to LzoTextInputFormat.  Cool!  It'd be nice if this were easier somehow--maybe I could specify a compression input format with fromTextFile?

Peace.  Michael

Ben Lever

May 12, 2013, 8:35:00 PM
to scoobi...@googlegroups.com, mic...@schmitztech.com
Hi Michael,

I know in 0.6 Eric Torreborre did a bunch of work around supporting compression. Eric - can you point Michael to the relevant APIs to see if that now solves his problems?

Cheers,
Ben.

Eric Torreborre

May 12, 2013, 8:56:36 PM
to scoobi...@googlegroups.com, mic...@schmitztech.com
Hi Michael,

In Scoobi 0.7.0-SNAPSHOT you can output compressed formats with the following syntax:

DList(1, 2, 3).toTextFile("path").compressWith(new Lz4Codec)

For the input, I've just given a bit of flexibility to the API so now you can write:

fromTextSource(new TextSource(Seq(paths), inputFormat = classOf[LzoTextInputFormat]))

Does that work for you?

Cheers,

Eric.

Michael Schmitz

May 12, 2013, 11:46:11 PM
to scoobi...@googlegroups.com

Cool! The latter is exactly what I was hoping for! I'll look forward to 0.7.

schmmd

May 22, 2013, 5:32:21 PM
to scoobi...@googlegroups.com, mic...@schmitztech.com
I have not tested this out yet, but to compile I need to use:

TextInput.fromTextSource(new TextSource(Seq(inputPath),  inputFormat = classOf[LzoTextInputFormat].asInstanceOf[Class[org.apache.hadoop.mapreduce.lib.input.TextInputFormat]]))

The cast is necessary because LzoTextInputFormat extends FileInputFormat<LongWritable, Text>, not TextInputFormat.

Peace.  Michael

Eric Torreborre

May 23, 2013, 9:17:14 PM
to scoobi...@googlegroups.com, mic...@schmitztech.com
Good point. I fixed this method so that it now accepts anything that is "_ <: FileInputFormat[LongWritable, Text]".

E.
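
The difference between the original signature and the widened one can be illustrated with a small self-contained Scala sketch. The classes below are stand-in stubs that only mirror the inheritance relationship described in this thread; they are not the real Hadoop or hadoop-lzo classes, and `narrow` and `widened` are hypothetical names, not Scoobi methods.

```scala
// Stand-in stubs (assumptions for illustration only). They mirror the
// inheritance relationship described in this thread, not the real classes.
class LongWritable
class Text
class FileInputFormat[K, V]
class TextInputFormat extends FileInputFormat[LongWritable, Text]
class LzoTextInputFormat extends FileInputFormat[LongWritable, Text]

object InputFormatBounds {
  // Narrow parameter type: only Class[TextInputFormat] type-checks,
  // so a sibling format has to be cast at the call site.
  def narrow(inputFormat: Class[TextInputFormat]): Class[_] = inputFormat

  // Widened parameter type, using the bound quoted above.
  def widened(inputFormat: Class[_ <: FileInputFormat[LongWritable, Text]]): Class[_] =
    inputFormat

  // Needs the cast to compile; the cast is not checked at runtime (erasure).
  val viaCast: Class[_] =
    narrow(classOf[LzoTextInputFormat].asInstanceOf[Class[TextInputFormat]])

  // Compiles without any cast.
  val viaBound: Class[_] = widened(classOf[LzoTextInputFormat])
}
```

Both values hold the same Class object at runtime; the only difference is whether the call site type-checks without a cast.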

schmmd

Jun 11, 2013, 7:08:49 PM
to scoobi...@googlegroups.com, mic...@schmitztech.com
This works great when there is a single MR job.  However, when a single Scoobi job produces multiple MR jobs, it fails on the second MR job.

java.io.EOFException: Premature EOF from inputStream
	at com.hadoop.compression.lzo.LzopInputStream.readFully(LzopInputStream.java:75)
	at com.hadoop.compression.lzo.LzopInputStream.readHeader(LzopInputStream.java:114)
	at com.hadoop.compression.lzo.LzopInputStream.<init>(LzopInputStream.java:54)
	at com.hadoop.compression.lzo.LzopCodec.createInputStream(LzopCodec.java:83)
	at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1578)
	at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1486)
	at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1475)
	at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1470)
	at org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.initialize(SequenceFileRecordReader.java:50)
	at com.nicta.scoobi.impl.mapreducer.ChannelRecordReader.initialize(ChannelsInputFormat.scala:208)
	at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:522)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
	at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:416)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093)
	at org.apache.hadoop.mapred.Child.main(Child.java:249)

I think this is because the intermediate data isn't LZO compressed but it's trying to read it as if it were LZO compressed.  Hopefully I'll have more information soon.

Peace.  Michael

Eric Torreborre

Jun 11, 2013, 10:07:45 PM
to scoobi...@googlegroups.com, mic...@schmitztech.com
This is plausible; I need to investigate it with a short test case.

schmmd

Jun 12, 2013, 1:12:16 PM
to scoobi...@googlegroups.com, mic...@schmitztech.com
I'm pretty sure this is the issue.  Here is my failing test case and a small sample file.


Peace.  Michael

schmmd

Jun 12, 2013, 2:34:20 PM
to scoobi...@googlegroups.com, mic...@schmitztech.com
This might be an issue with my particular Hadoop configuration.  I have compression set to true in my Hadoop configuration which might be interacting with multi-MR jobs in a bad way.

Peace.  Michael

schmmd

Jun 12, 2013, 5:07:21 PM
to scoobi...@googlegroups.com, mic...@schmitztech.com
Yes, I'm quite sure this is the issue: if mapred.output.compress=true is set in mapred-site.xml, then multi-MR Scoobi jobs fail with the aforementioned exception.  We have mapred.output.compress set to true so that users of the cluster use compression and save resources.

Peace.  Michael
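
For reference, a cluster-wide setting of the kind described above would look roughly like the following fragment of mapred-site.xml. This is a sketch, not the actual file from this cluster: mapred.output.compress is the property named above, mapred.output.compression.codec is its standard Hadoop 1.x companion property, and the codec value is an assumption based on the LzopCodec frames in the stack trace.

```xml
<!-- Sketch only: values are assumptions, not copied from the real cluster. -->
<property>
  <name>mapred.output.compress</name>
  <value>true</value>
</property>
<property>
  <name>mapred.output.compression.codec</name>
  <!-- Assumed from the LzopCodec frames in the stack trace above. -->
  <value>com.hadoop.compression.lzo.LzopCodec</value>
</property>
```

Because these properties live in the site configuration, they apply to every job submitted to the cluster unless a job overrides them.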

Eric Torreborre

Jun 17, 2013, 1:26:06 AM
to scoobi...@googlegroups.com, mic...@schmitztech.com
Hi Michael,

Can you please send me your configuration properties for the second job?

There is a strange interaction between the configuration parameters for the first job and the ones used for the second job. In the stack trace you provided, we try to read a sequence file with a codec that corresponds to how the data was read for the first job. I don't quite get which parameters determine this or how they are created and transferred. Also, since you mention that you have a set of user-defined parameters for compression (like mapred.output.compress), I'd like to check whether there are others as well, like 'mapred.output.compression.codec'.

In any case, I made a change that I'd like you to test (in 0.7.0-RC3-cdh4-SNAPSHOT). With this change you have to state explicitly, through the Scoobi API, that you want your output files to be compressed:

list.toTextFile(path(resultDir)).compressWith(new GzipCodec)

With this API we can make sure that all the configuration properties for compression are set (compress, codec, compression.type) and that they are only set where expected.

Michael Schmitz

Jun 17, 2013, 12:27:50 PM
to scoobi...@googlegroups.com
Hi Eric, I don't have the configuration properties for the second job anymore.  I'll have to re-run this experiment.  mapred.output.compression.codec is definitely set.  I attached my mapred-site.xml in case it gives you what you need.  It's pretty vanilla except for the compression settings.

I'd test your change, but I don't use the cdh4 code.

Peace.  Michael
mapred-site.xml

Eric Torreborre

Jun 17, 2013, 7:35:38 PM
to scoobi...@googlegroups.com, mic...@schmitztech.com
The change should have been republished for CDH3 as well.

Actually, I'd be glad if you could test it: I changed the way we create the CDH3 version from the CDH4 one to simplify our build, but I couldn't test those changes on our cluster.

Thanks.

Eric Torreborre

Jun 17, 2013, 7:38:42 PM
to scoobi...@googlegroups.com, mic...@schmitztech.com
Yes, I think your configuration is what was causing the issue with multiple map-reduce jobs, since those settings were used for all output files.