Scalding and LZO Compression
From: Kian Wilcox <kianwil...@gmail.com>
Date: Wed, 11 Jul 2012 18:46:45 -0700 (PDT)
Local: Wed, Jul 11 2012 9:46 pm
Subject: Scalding and LZO Compression

I'm new to Scala and to Scalding, and I wanted to start by saying what a
pleasure it's been to use so far. For simple TSVs, creating processing
pipelines has been a breeze. However, I've been running into some trouble
with .lzo files in HDFS. The Getting Started section of the Scalding wiki
mentions that you should be able to create source taps from lzo files.
Using Ning Lang's work as a template, and Elephant Bird, I've got a local
and an hdfs scheme. I've extended Source to generate a new
LzoDelimitedScheme trait, and then created a case class for an LzoTsv. The
local scheme, plus the extensions and case class, are at the bottom. While
this compiles and a job very similar to the one below starts up, it
immediately bails with a parse error in both local and hdfs mode. Any
recommendations or pointers to documentation would be most appreciated.

-Kian

class SignupFlowJob(args : Args) extends Job(args) {
  val input = LzoTsv(args("input"), ('user, 'event, 'url, 'origin, 'timestamp))
  // ... irrelevant computations here

}

The error output is as follows:

12/07/11 18:42:57 ERROR stream.TrapHandler: caught Throwable, no trap available, rethrowing
cascading.tuple.TupleException: unable to read from input identifier: ../scalding-test/0905.dat-1341835200652.lzo
    at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:127)
    at cascading.flow.stream.SourceStage.map(SourceStage.java:76)
    at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
    at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:680)
Caused by: cascading.tap.TapException: did not parse correct number of values from input data, expected: 5, got: 1:?LZO
    at cascading.scheme.util.DelimitedParser.parseLine(DelimitedParser.java:264)
    at cascading.scheme.local.TextDelimited.source(TextDelimited.java:461)
    at cascading.tuple.TupleEntrySchemeIterator.getNext(TupleEntrySchemeIterator.java:140)
    at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:120)
    ... 8 more
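
A note on the error above: the trailing "1:?LZO" suggests the delimited parser received the file's raw bytes rather than decompressed text. An lzop container starts with a fixed 9-byte magic number whose second to fourth bytes spell "LZO", so a single unsplit field beginning "?LZO" is what one would expect if no decompression took place. Below is a minimal sketch for checking whether a local file carries that header, using only the JVM standard library. The LzopSniff object and its method names are made up for illustration and are not part of Scalding, Cascading, or Elephant Bird.

```scala
import java.io.{BufferedInputStream, FileInputStream}

object LzopSniff {
  // The 9-byte magic number at the start of an lzop container file:
  // 0x89 'L' 'Z' 'O' 0x00 '\r' '\n' 0x1a '\n'
  val LzopMagic: Array[Byte] =
    Array(0x89, 0x4c, 0x5a, 0x4f, 0x00, 0x0d, 0x0a, 0x1a, 0x0a).map(_.toByte)

  /** True if `bytes` begins with the lzop magic number. */
  def looksLikeLzop(bytes: Array[Byte]): Boolean =
    bytes.length >= LzopMagic.length &&
      java.util.Arrays.equals(bytes.take(LzopMagic.length), LzopMagic)

  /** Reads the first 9 bytes of a local file and checks them against the magic number. */
  def fileLooksLikeLzop(path: String): Boolean = {
    val in = new BufferedInputStream(new FileInputStream(path))
    try {
      val head = new Array[Byte](LzopMagic.length)
      var filled = 0
      var eof = false
      // read() may return fewer bytes than requested, so loop until full or EOF
      while (!eof && filled < head.length) {
        val n = in.read(head, filled, head.length - filled)
        if (n < 0) eof = true else filled += n
      }
      filled == head.length && looksLikeLzop(head)
    } finally in.close()
  }
}
```

If fileLooksLikeLzop returns true for the input while the parser still reports "?LZO", the scheme doing the reading is most likely treating the file as plain text.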

-------------------------------------------------

trait LzoDelimitedScheme extends Source {
  // Override these as needed:
  val fields = Fields.ALL
  // This is passed directly to Cascading, where null is interpreted as String
  val types : Array[Class[_]] = null
  val separator = "\u0001"
  val skipHeader = false
  val writeHeader = false
  // These should not be changed:

  override def localScheme =
    new CLLzoTextDelimited(fields, skipHeader, writeHeader, separator, types)

  override def hdfsScheme = {
    new CHLzoTextDelimited(fields, skipHeader, separator, types)
      .asInstanceOf[Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]]
  }

}

/**
 * Tab separated value source, with LZO compression
 */
case class LzoTsv(p: String, f: Fields = Fields.ALL, sh : Boolean = false, wh: Boolean = false)
  extends FixedPathSource(p) with LzoDelimitedScheme {
  override val fields = f
  override val skipHeader = sh
  override val writeHeader = wh
  // Tab-delimited, per the doc comment above
  override val separator = "\t"

}

----------------------------------------------------------
package com.stumbleupon.cascading2.scheme;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;

import com.twitter.elephantbird.mapred.input.DeprecatedLzoTextInputFormat;
import com.twitter.elephantbird.mapred.output.DeprecatedLzoTextOutputFormat;

import cascading.flow.FlowProcess;
import cascading.scheme.local.TextDelimited;
import cascading.tap.Tap;
import cascading.tuple.Fields;

/**
 * Copied from Ning Lang's LzoTextDelimited extending hadoop's TextDelimited
 * Scheme for LZO encoded TSV files.
 *
 */
public class LzoTextDelimited extends TextDelimited {

  public LzoTextDelimited(Fields fields, String delimiter) {
    super(fields, delimiter);
  }

  public LzoTextDelimited(Fields fields, boolean skipHeader, String delimiter) {
    super(fields, skipHeader, delimiter);
  }

  public LzoTextDelimited(Fields fields, String delimiter, Class[] types) {
    super(fields, delimiter, types);
  }

  public LzoTextDelimited(Fields fields, boolean skipHeader, String delimiter, Class[] types) {
    super(fields, skipHeader, delimiter, types);
  }

  public LzoTextDelimited(Fields fields, boolean skipHeader, boolean skipWriteHeader,
      String delimiter, Class[] types) {
    super(fields, skipHeader, skipWriteHeader, delimiter, types);
  }

  public LzoTextDelimited(Fields fields, String delimiter, String quote, Class[] types) {
    super(fields, delimiter, quote, types);
  }

  public LzoTextDelimited(Fields fields, boolean skipHeader, String delimiter,
      String quote, Class[] types) {
    super(fields, skipHeader, delimiter, quote, types);
  }

  public LzoTextDelimited(Fields fields, String delimiter,
      String quote, Class[] types, boolean safe) {
    super(fields, delimiter, quote, types, safe);
  }

  public LzoTextDelimited(Fields fields, boolean skipHeader, String delimiter,
      String quote, Class[] types, boolean safe) {
    super(fields, skipHeader, delimiter, quote, types, safe);
  }

  public LzoTextDelimited(Fields fields, String delimiter, String quote) {
    super(fields, delimiter, quote);
  }

  public LzoTextDelimited(Fields fields, boolean skipHeader, String delimiter, String quote) {
    super(fields, skipHeader, delimiter, quote);
  }

  public void sourceConfInit(FlowProcess<JobConf> flowProcess,
      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
    conf.setInputFormat(DeprecatedLzoTextInputFormat.class);
  }

  public void sinkConfInit(FlowProcess<JobConf> flowProcess,
      Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
    conf.setOutputFormat(DeprecatedLzoTextOutputFormat.class);
  }

}

 
From: Oscar Boykin <os...@twitter.com>
Date: Mon, 16 Jul 2012 11:40:58 -0700
Local: Mon, Jul 16 2012 2:40 pm
Subject: Re: Scalding and LZO Compression

Sorry for the delay on this.

We use Ning's code internally at Twitter:
https://github.com/kevinweil/elephant-bird/blob/master/cascading2/src...

That is exactly what we are using for the scheme.

Then we have some Lzo code which we've been meaning to release, but haven't
yet.  Here is our Lzo stuff:

object HadoopSchemeInstance {
  def apply(scheme : Scheme[_,_,_,_,_]) =
    scheme.asInstanceOf[Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]]
}

trait LzoTsv extends DelimitedScheme {
  override def localScheme = {
    println("This does not work yet")
    new CLTextDelimited(fields, separator, types)
  }
  override def hdfsScheme =
    HadoopSchemeInstance(new LzoTextDelimited(fields, separator, types))
}

abstract class DailySuffixSource(prefixTemplate : String, dateRange : DateRange)
  extends TimePathedSource(prefixTemplate + TimePathedSource.YEAR_MONTH_DAY + "/*", dateRange, DateOps.UTC)

case class DailySuffixLzoTsv(prefix : String, fs : Fields = Fields.ALL)(implicit dateRange : DateRange)
  extends DailySuffixSource(prefix, dateRange) with LzoTsv {
  override val fields = fs
}

////////////////////////////////////

And that just works for us. I didn't delve too deeply into your error, but
I wonder if the problem is simpler (are you sure you have the right path,
etc.?).

--
Oscar Boykin :: @posco :: https://twitter.com/intent/user?screen_name=posco

 
From: Kian Wilcox <kianwil...@gmail.com>
Date: Wed, 18 Jul 2012 15:30:03 -0700 (PDT)
Local: Wed, Jul 18 2012 6:30 pm
Subject: Re: Scalding and LZO Compression

Thanks for taking the time to reply. I do believe you are right, and that
the problem is actually simpler. When I manually modify conf-site.xml in
the Hadoop configuration to include the properties below, I am able to read
the LZO-compressed files using Scalding. However, I'm having trouble
figuring out how to do this on a job-by-job basis, rather than modifying
the configuration for all jobs running on our cluster. I've tried modifying
the config object in Job to include the appropriate keys and values, but it
seems to ignore them. I've also tried passing them as parameters to the
hadoop call itself, but again, they seem to be ignored or overridden
somehow. Is there a simple way to modify this configuration from within
Scalding or Cascading?

Thanks again,
-Kian

<property>
  <name>io.compression.codecs</name>
  <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec</value>
</property>

<property>
  <name>io.compression.codec.lzo.class</name>
  <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

From: Oscar Boykin <os...@twitter.com>
Date: Wed, 18 Jul 2012 16:47:50 -0700
Local: Wed, Jul 18 2012 7:47 pm
Subject: Re: Scalding and LZO Compression

Here is what we do internally (sorry I forgot):

==============
In a subclass of Job:

  def buildCodecs(original : String) : String = {
    val neededCodecs = Set(
      "org.apache.hadoop.io.compress.GzipCodec",
      "org.apache.hadoop.io.compress.DefaultCodec",
      "org.apache.hadoop.io.compress.BZip2Codec",
      "com.hadoop.compression.lzo.LzoCodec",
      "com.hadoop.compression.lzo.LzopCodec")

    val providedCodecs = if (original != null) {
      original.split(",").toSet
    } else {
      Set[String]()
    }

    providedCodecs.union(neededCodecs).toList.sortWith{_<_}.mkString(",")
  }

  override def buildFlow(implicit mode : Mode) = {
    // Makes sure that the output lzo files are indexed
    mode match {
      case Hdfs(_, conf) => updateJobConf(conf)
      case HadoopTest(conf, _) => updateJobConf(conf)
      case _ => ()
    }
    super.buildFlow(mode)
  }

  private def updateJobConf(conf : Configuration) {
    conf.setBoolean("elephantbird.lzo.output.index", true)
    // Set up meat-locker to handle TBase:
    val kryoFact = new KryoFactory(conf)
    // Set up custom kryo serializers:
    kryoFact.setHierarchyRegistrations {
      List(new KryoFactory.ClassPair(classOf[TBase[_,_]],
        classOf[TBaseSerializer])).asJava
    }
    // If we run in Hadoop local mode, then we still need LZO and other
    // compression codecs to be set. In Hadoop mode (on the cluster) it
    // happens automatically.
    val ioCodecsKey = "io.compression.codecs"
    conf.set(ioCodecsKey, buildCodecs(conf.get(ioCodecsKey)))

.....

Much of this could be moved into scalding.Job, but there are only so many
hours in the day.

In particular, it would be useful to make updateJobConf a protected method
in Job whose default implementation does nothing. Then users could subclass
Job and override it to control configuration keys easily.
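
The hook described above might look roughly like the following. This is only a sketch of the pattern, not Scalding's actual API: BaseJob, LzoJob, buildConf, and the Conf alias are hypothetical stand-ins, and a mutable Map takes the place of Hadoop's Configuration so the example runs without Hadoop on the classpath. buildCodecs follows the version earlier in this message, with an Option instead of a null check.

```scala
import scala.collection.mutable

object CodecConfSketch {
  // Assumption: a mutable string map stands in for org.apache.hadoop.conf.Configuration.
  type Conf = mutable.Map[String, String]

  val IoCodecsKey = "io.compression.codecs"

  val NeededCodecs: Set[String] = Set(
    "org.apache.hadoop.io.compress.GzipCodec",
    "org.apache.hadoop.io.compress.DefaultCodec",
    "org.apache.hadoop.io.compress.BZip2Codec",
    "com.hadoop.compression.lzo.LzoCodec",
    "com.hadoop.compression.lzo.LzopCodec")

  /** Union of the already-configured codecs and the needed ones, sorted and comma-joined. */
  def buildCodecs(original: Option[String]): String = {
    val provided: Set[String] = original match {
      case Some(s) => s.split(",").map(_.trim).filter(_.nonEmpty).toSet
      case None    => Set.empty
    }
    (provided ++ NeededCodecs).toList.sorted.mkString(",")
  }

  // Hypothetical base class: the hook is a no-op unless a subclass overrides it.
  abstract class BaseJob {
    protected def updateJobConf(conf: Conf): Unit = ()

    /** Runs the hook on the given configuration and returns it. */
    def buildConf(initial: Conf): Conf = {
      updateJobConf(initial)
      initial
    }
  }

  // A job that needs LZO adds the codecs without discarding what was already set.
  class LzoJob extends BaseJob {
    override protected def updateJobConf(conf: Conf): Unit = {
      conf(IoCodecsKey) = buildCodecs(conf.get(IoCodecsKey))
    }
  }
}
```

With this shape, a job that needs LZO only overrides updateJobConf, and jobs that do not override it get an unchanged configuration.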

Sounds like it could be an excellent candidate for a pull request from you!
 :)
