Scalding and LZO Compression


Kian Wilcox

Jul 11, 2012, 9:46:45 PM
to cascadi...@googlegroups.com
I'm new to Scala and to Scalding, and I wanted to start by saying what a pleasure it's been to use so far. For simple TSVs, creating processing pipelines has been a breeze. However, I've been running into some trouble with .lzo files in HDFS. The Getting Started section of the Scalding wiki mentions that you should be able to create source taps from LZO files. Using Ning Lang's work as a template, along with Elephant Bird, I've got a local and an HDFS scheme. I've created a new LzoDelimitedScheme trait that extends Source, and then a case class for an LzoTsv. The local scheme, plus the extensions and case class, are at the bottom. This compiles, and a job very similar to the one below starts up, but it immediately bails with a parse error in both local and HDFS mode. Any recommendations or pointers to documentation would be most appreciated.

-Kian

class SignupFlowJob(args : Args) extends Job(args) {
  val input = LzoTsv(args("input"), ('user, 'event, 'url, 'origin, 'timestamp))
  // ... irrelevant computations here
}

The error output is as follows:

12/07/11 18:42:57 ERROR stream.TrapHandler: caught Throwable, no trap available, rethrowing
cascading.tuple.TupleException: unable to read from input identifier: ../scalding-test/0905.dat-1341835200652.lzo
at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:127)
at cascading.flow.stream.SourceStage.map(SourceStage.java:76)
at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:680)
Caused by: cascading.tap.TapException: did not parse correct number of values from input data, expected: 5, got: 1:?LZO
at cascading.scheme.util.DelimitedParser.parseLine(DelimitedParser.java:264)
at cascading.scheme.local.TextDelimited.source(TextDelimited.java:461)
at cascading.tuple.TupleEntrySchemeIterator.getNext(TupleEntrySchemeIterator.java:140)
at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:120)
... 8 more
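The `?LZO` at the end of the error hints at what the local scheme actually received. Files written by `lzop` begin with a 9-byte magic header (0x89, then `LZO`, a NUL byte, and a few line-ending guard bytes); the non-printable 0x89 would plausibly be rendered as `?`. That is consistent with the delimited parser being handed compressed bytes rather than decoded text. The sketch below is a plain Scala diagnostic with no Hadoop or Cascading dependency; `LzopMagic` and `looksLikeLzop` are hypothetical names, not part of any library.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical diagnostic helper: detects the lzop file magic header.
object LzopMagic {
  // The 9-byte header that lzop writes at the start of every .lzo file.
  val Magic: Array[Byte] =
    Array(0x89, 0x4c, 0x5a, 0x4f, 0x00, 0x0d, 0x0a, 0x1a, 0x0a).map(_.toByte)

  // True if `bytes` begins with the lzop magic header.
  def looksLikeLzop(bytes: Array[Byte]): Boolean =
    bytes.length >= Magic.length &&
      java.util.Arrays.equals(bytes.take(Magic.length), Magic)

  def main(args: Array[String]): Unit = {
    val compressed = Magic ++ "rest of file".getBytes(StandardCharsets.UTF_8)
    val plainTsv = "user\tevent\turl\torigin\t1341835200".getBytes(StandardCharsets.UTF_8)
    println(looksLikeLzop(compressed)) // true
    println(looksLikeLzop(plainTsv))   // false
  }
}
```

Reading the first few bytes of the input file (for example with `hadoop fs -cat path | head -c 9`) and running them through a check like this would show whether a scheme is seeing raw LZO data.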

-------------------------------------------------


trait LzoDelimitedScheme extends Source {
  //override these as needed:
  val fields = Fields.ALL
  //This is passed directly to cascading where null is interpreted as string
  val types : Array[Class[_]] = null
  val separator = "\u0001"
  val skipHeader = false
  val writeHeader = false
  //These should not be changed:

  override def localScheme = new CLLzoTextDelimited(fields, skipHeader, writeHeader, separator, types)

  override def hdfsScheme = {
    new CHLzoTextDelimited(fields, skipHeader, separator, types).asInstanceOf[Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]]
  }
}

/**
* Tab-separated value source, with LZO compression
*/
case class LzoTsv(p: String, f: Fields = Fields.ALL, sh : Boolean = false, wh: Boolean = false, sep : String = "\t")
  extends FixedPathSource(p) with LzoDelimitedScheme {
  override val fields = f
  override val skipHeader = sh
  override val writeHeader = wh
  override val separator = sep
}
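The parse error reports 1 value where 5 were expected. A simplified, hypothetical model of that kind of field-count check (an illustration, not Cascading's actual `DelimitedParser`) shows why the separator passed to the scheme matters: splitting a tab-separated line on the trait's default `\u0001` yields a single field.

```scala
import java.util.regex.Pattern

// Simplified, hypothetical model of a delimited-line field-count check.
object FieldCountCheck {
  // Splits `line` on the literal `separator`, keeping trailing empty fields,
  // and reports an error if the field count differs from `expected`.
  def parse(line: String, separator: String, expected: Int): Either[String, Vector[String]] = {
    val values = line.split(Pattern.quote(separator), -1).toVector
    if (values.length == expected) Right(values)
    else Left(s"expected: $expected, got: ${values.length}")
  }

  def main(args: Array[String]): Unit = {
    val line = "u1\tsignup\t/home\tweb\t1341835200"
    println(parse(line, "\t", 5))     // Right(Vector(u1, signup, /home, web, 1341835200))
    println(parse(line, "\u0001", 5)) // Left(expected: 5, got: 1)
  }
}
```

`Pattern.quote` is used because `String.split` takes a regular expression, so a separator such as `|` would otherwise be misinterpreted.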

----------------------------------------------------------
package com.stumbleupon.cascading2.scheme;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;

import com.twitter.elephantbird.mapred.input.DeprecatedLzoTextInputFormat;
import com.twitter.elephantbird.mapred.output.DeprecatedLzoTextOutputFormat;

import cascading.flow.FlowProcess;
import cascading.scheme.local.TextDelimited;
import cascading.tap.Tap;
import cascading.tuple.Fields;

/**
 * Copied from Ning Lang's LzoTextDelimited extending hadoop's TextDelimited
 * Scheme for LZO encoded TSV files.
 *
 */
public class LzoTextDelimited extends TextDelimited {

  public LzoTextDelimited(Fields fields, String delimiter) {
    super(fields, delimiter);
  }

  public LzoTextDelimited(Fields fields, boolean skipHeader, String delimiter) {
    super(fields, skipHeader, delimiter);
  }

  public LzoTextDelimited(Fields fields, String delimiter, Class[] types) {
    super(fields, delimiter, types);
  }

  public LzoTextDelimited(Fields fields, boolean skipHeader, String delimiter, Class[] types) {
    super(fields, skipHeader, delimiter, types);
  }
  
  public LzoTextDelimited(Fields fields, boolean skipHeader, boolean skipWriteHeader, String delimiter, Class[] types) {
    super(fields, skipHeader, skipWriteHeader, delimiter, types);
  }

  public LzoTextDelimited(Fields fields, String delimiter, String quote, Class[] types) {
    super(fields, delimiter, quote, types);
  }

  public LzoTextDelimited(Fields fields, boolean skipHeader, String delimiter,
    String quote, Class[] types) {
    super(fields, skipHeader, delimiter, quote, types);
  }

  public LzoTextDelimited(Fields fields, String delimiter,
    String quote, Class[] types, boolean safe) {
    super(fields, delimiter, quote, types, safe);
  }

  public LzoTextDelimited(Fields fields, boolean skipHeader, String delimiter,
    String quote, Class[] types, boolean safe) {
    super(fields, skipHeader, delimiter, quote, types, safe);
  }

  public LzoTextDelimited(Fields fields, String delimiter, String quote) {
    super(fields, delimiter, quote);
  }

  public LzoTextDelimited(Fields fields, boolean skipHeader, String delimiter, String quote) {
    super(fields, skipHeader, delimiter, quote);
  }

  public void sourceConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf ) {
    conf.setInputFormat(DeprecatedLzoTextInputFormat.class);
  }

  public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf ) {
    conf.setOutputFormat(DeprecatedLzoTextOutputFormat.class);
  }
}

Oscar Boykin

Jul 16, 2012, 2:40:58 PM
to cascadi...@googlegroups.com
Sorry for the delay on this.

We use Ning's code internally at Twitter:

That is exactly what we are using for the scheme.

Then we have some LZO code that we've been meaning to release but haven't yet. Here is our LZO stuff:

object HadoopSchemeInstance {
  def apply(scheme : Scheme[_,_,_,_,_]) =
    scheme.asInstanceOf[Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]]
}

trait LzoTsv extends DelimitedScheme {
  override def localScheme = { println("This does not work yet"); new CLTextDelimited(fields, separator, types) }
  override def hdfsScheme = HadoopSchemeInstance(new LzoTextDelimited(fields, separator, types))
}

abstract class DailySuffixSource(prefixTemplate : String, dateRange : DateRange) extends
  TimePathedSource(prefixTemplate + TimePathedSource.YEAR_MONTH_DAY + "/*", dateRange, DateOps.UTC)

case class DailySuffixLzoTsv(prefix : String, fs : Fields = Fields.ALL)(implicit dateRange : DateRange)
  extends DailySuffixSource(prefix, dateRange) with LzoTsv {
  override val fields = fs
}
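`DailySuffixSource` appends `TimePathedSource.YEAR_MONTH_DAY` and `/*` to the prefix, so a `DailySuffixLzoTsv` reads one directory per day in the implicit `DateRange`. Assuming that pattern expands to `/YYYY/MM/DD` (check this against your Scalding version), the globs such a source would cover can be sketched in plain Scala with `java.time`. This only illustrates the path layout and is not Scalding's implementation; `DailyPaths` and `dailyGlobs` are hypothetical names.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical sketch of the per-day glob layout, assuming a /YYYY/MM/DD suffix.
object DailyPaths {
  private val dayFormat = DateTimeFormatter.ofPattern("yyyy/MM/dd")

  // One glob per day from `start` through `endInclusive`.
  def dailyGlobs(prefix: String, start: LocalDate, endInclusive: LocalDate): List[String] =
    Iterator.iterate(start)(_.plusDays(1))
      .takeWhile(day => !day.isAfter(endInclusive))
      .map(day => s"$prefix/${day.format(dayFormat)}/*")
      .toList

  def main(args: Array[String]): Unit =
    dailyGlobs("/logs/signups", LocalDate.of(2012, 7, 30), LocalDate.of(2012, 8, 1))
      .foreach(println)
}
```

If the job fails to find input, listing the globs like this and checking them with `hadoop fs -ls` is a quick way to test the "are you sure you have the right path" question.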

////////////////////////////////////

And that just works for us. I didn't delve too deeply into your error, but I wonder if the problem is simpler (are you sure you have the right path, etc.?).

--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To view this discussion on the web visit https://groups.google.com/d/msg/cascading-user/-/euwC3xLimfQJ.
To post to this group, send email to cascadi...@googlegroups.com.
To unsubscribe from this group, send email to cascading-use...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/cascading-user?hl=en.



--
Oscar Boykin :: @posco :: https://twitter.com/intent/user?screen_name=posco

Kian Wilcox

Jul 18, 2012, 6:30:03 PM
to cascadi...@googlegroups.com
Thanks for taking the time to reply. I believe you are right that the problem is actually simpler. When I manually modify core-site.xml in the Hadoop configuration to include the following, I am able to read the LZO-compressed files using Scalding. However, I'm having trouble figuring out how to do this on a job-by-job basis rather than modifying the configuration for all jobs running on our cluster. I've tried modifying the config object in Job to include the appropriate keys and values, but it seems to ignore them. I've also tried passing them as parameters to the hadoop call itself, but again, they seem to be ignored or overridden somehow. Is there a simple way to modify this configuration from within Scalding or Cascading?

Thanks again,
-Kian

<property>
  <name>io.compression.codecs</name>
  <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec</value>
</property>

<property>
  <name>io.compression.codec.lzo.class</name>
  <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

Oscar Boykin

Jul 18, 2012, 7:47:50 PM
to cascadi...@googlegroups.com
Here is what we do internally (sorry, I forgot):

==============
In a subclass of Job:

  def buildCodecs(original : String) : String = {
    val neededCodecs = Set(
      "org.apache.hadoop.io.compress.GzipCodec",
      "org.apache.hadoop.io.compress.DefaultCodec",
      "org.apache.hadoop.io.compress.BZip2Codec",
      "com.hadoop.compression.lzo.LzoCodec",
      "com.hadoop.compression.lzo.LzopCodec")

    val providedCodecs = if (original != null) {
      original.split(",").toSet
    } else {
      Set[String]()
    }

    providedCodecs.union(neededCodecs).toList.sortWith{_<_}.mkString(",")
  }

  override def buildFlow(implicit mode : Mode) = {
    // Makes sure that the output lzo files are indexed
    mode match {
      case Hdfs(_, conf) => updateJobConf(conf)
      case HadoopTest(conf, _) => updateJobConf(conf)
      case _ => ()
    }
    super.buildFlow(mode)
  }

  private def updateJobConf(conf : Configuration) {
    conf.setBoolean("elephantbird.lzo.output.index", true)
    // Set up meat-locker to handle TBase:
    val kryoFact = new KryoFactory(conf)
    // Set up custom kryo serializers:
    kryoFact.setHierarchyRegistrations {
      List(new KryoFactory.ClassPair(classOf[TBase[_,_]],
        classOf[TBaseSerializer])).asJava
    }
    // if we run in hadoop local mode, then we still need LZO and other compression
    // codecs to be set. In hadoop mode (on the cluster) it happens automatically.
    val ioCodecsKey = "io.compression.codecs"
    conf.set(ioCodecsKey, buildCodecs(conf.get(ioCodecsKey)))

.....

Much of this could be moved into scalding.Job, but there are only so many hours in the day.

In particular, it would be useful to add updateJobConf to Job as a protected method that does nothing by default. Then users could override it in a subclass to control keys easily.
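A minimal sketch of the hook Oscar describes: a base class exposes a protected, no-op `updateJobConf`, and a job subclass overrides it to set per-job keys. Everything here is hypothetical (`BaseJob`, `prepare`, `LzoJob` are not Scalding API), and a mutable `Map` stands in for Hadoop's `Configuration` so the example runs without Hadoop on the classpath.

```scala
import scala.collection.mutable

// Hypothetical base class illustrating the proposed hook.
abstract class BaseJob {
  // No-op by default; subclasses override it to adjust per-job settings.
  protected def updateJobConf(conf: mutable.Map[String, String]): Unit = ()

  // Stand-in for the point in buildFlow where the mode's conf is available.
  final def prepare(conf: mutable.Map[String, String]): mutable.Map[String, String] = {
    updateJobConf(conf)
    conf
  }
}

// Hypothetical job that enables LZO output indexing and the LZO codecs.
class LzoJob extends BaseJob {
  private val lzoCodecs = Seq(
    "com.hadoop.compression.lzo.LzoCodec",
    "com.hadoop.compression.lzo.LzopCodec")

  override protected def updateJobConf(conf: mutable.Map[String, String]): Unit = {
    conf("elephantbird.lzo.output.index") = "true"
    // Merge the LZO codecs into whatever codecs are already configured.
    val existing = conf.get("io.compression.codecs").toSeq
      .flatMap(_.split(",").toSeq)
      .filter(_.nonEmpty)
    conf("io.compression.codecs") = (existing ++ lzoCodecs).distinct.sorted.mkString(",")
  }
}
```

The design keeps the configuration change inside the job that needs it, which is what the original question asked for, instead of editing core-site.xml for every job on the cluster.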

Sounds like it could be an excellent candidate for a pull request from you!  :)
