I'm new to Scala and to Scalding, and I wanted to start by saying what a pleasure it's been to use so far. For simple TSVs, creating processing pipelines has been a breeze. However, I've been running into some trouble with .lzo files in HDFS. The Getting Started section of the Scalding wiki mentions that you should be able to create source taps from lzo files. Using Ning Lang's work as a template, plus Elephant Bird, I've got a local and an HDFS scheme. I've extended Source with a new LzoDelimitedScheme trait, and then created a case class for an LzoTsv. The local scheme, plus the extensions and case class, are at the bottom. While this compiles, and jobs very similar to the one below start up, they immediately fail with a parse error in both local and HDFS mode. Any recommendations or pointers to documentation would be most appreciated.
-Kian
class SignupFlowJob(args : Args) extends Job(args) {
  val input = LzoTsv(args("input"), ('user, 'event, 'url, 'origin, 'timestamp))
  // ... irrelevant computations here
}
The error output is as follows:
12/07/11 18:42:57 ERROR stream.TrapHandler: caught Throwable, no trap available, rethrowing
cascading.tuple.TupleException: unable to read from input identifier: ../scalding-test/0905.dat-1341835200652.lzo
    at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:127)
    at cascading.flow.stream.SourceStage.map(SourceStage.java:76)
    at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
    at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:680)
Caused by: cascading.tap.TapException: did not parse correct number of values from input data, expected: 5, got: 1:?LZO
    at cascading.scheme.util.DelimitedParser.parseLine(DelimitedParser.java:264)
    at cascading.scheme.local.TextDelimited.source(TextDelimited.java:461)
    at cascading.tuple.TupleEntrySchemeIterator.getNext(TupleEntrySchemeIterator.java:140)
    at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:120)
    ... 8 more
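The `got: 1:?LZO` at the end of the cause suggests that the plain `cascading.scheme.local.TextDelimited` in the trace was handed the raw, still-compressed bytes. Files in lzop format begin with a fixed 9-byte magic number (`0x89 'L' 'Z' 'O' 0x00 '\r' '\n' 0x1A '\n'`), so the first "line" a text scheme reads is essentially that header: it contains no tab, and the non-printable `0x89` shows up as `?`. A minimal sketch for checking whether an input file is still an undecompressed lzop stream, written in Java like the LzoTextDelimited scheme in this thread; the class `LzopMagicCheck` is hypothetical and not part of Cascading, Scalding, or Elephant Bird.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;

// Hypothetical diagnostic helper: reports whether a file still starts with
// the lzop magic header, i.e. whether a text reader would see compressed bytes.
class LzopMagicCheck {
    // The 9-byte magic number at the start of every lzop-format file.
    static final byte[] LZOP_MAGIC = {
        (byte) 0x89, 'L', 'Z', 'O', 0x00, '\r', '\n', 0x1A, '\n'
    };

    // True if the given leading bytes begin with the lzop magic number.
    static boolean hasLzopMagic(byte[] prefix) {
        if (prefix.length < LZOP_MAGIC.length) {
            return false;
        }
        return Arrays.equals(Arrays.copyOf(prefix, LZOP_MAGIC.length), LZOP_MAGIC);
    }

    // Reads at most the first 9 bytes of a local file and checks them.
    static boolean fileHasLzopMagic(String path) throws IOException {
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
            byte[] head = new byte[LZOP_MAGIC.length];
            int n = in.readNBytes(head, 0, head.length);
            return hasLzopMagic(Arrays.copyOf(head, n));
        }
    }

    public static void main(String[] args) throws IOException {
        for (String path : args) {
            String kind = fileHasLzopMagic(path) ? "lzop-compressed" : "not lzop";
            System.out.println(path + ": " + kind);
        }
    }
}
```

Running it as `java LzopMagicCheck ../scalding-test/0905.dat-1341835200652.lzo` should report `lzop-compressed` if the file is a normal lzop file, which would point at the scheme or codec configuration rather than the data itself.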
-------------------------------------------------
trait LzoDelimitedScheme extends Source {
  // Override these as needed:
  val fields = Fields.ALL
  // This is passed directly to Cascading, where null is interpreted as String
  val types : Array[Class[_]] = null
  val separator = "\u0001"
  val skipHeader = false
  val writeHeader = false

  // These should not be changed:
  override def localScheme = new CLLzoTextDelimited(fields, skipHeader, writeHeader, separator, types)
}

/**
 * Tab-separated value source, with LZO compression
 */
case class LzoTsv(p: String, f: Fields = Fields.ALL, sh : Boolean = false, wh: Boolean = false)
    extends FixedPathSource(p) with LzoDelimitedScheme {
  override val fields = f
  override val skipHeader = sh
  override val writeHeader = wh
  override val separator = "\t"
}
/**
 * Copied from Ning Lang's LzoTextDelimited, which extends Hadoop's TextDelimited.
 * Scheme for LZO-encoded TSV files.
 */
public class LzoTextDelimited extends TextDelimited {

  public LzoTextDelimited(Fields fields, String delimiter) {
    super(fields, delimiter);
  }

  // ...
}
case class DailySuffixLzoTsv(prefix : String, fs : Fields = Fields.ALL)(implicit dateRange : DateRange)
  extends DailySuffixSource(prefix, dateRange) with LzoTsv {
  override val fields = fs
}
////////////////////////////////////
And that just works for us. I didn't delve too deeply into your error, but I
wonder if the problem is simpler (are you sure you have the right path,
etc.?)
On Wed, Jul 11, 2012 at 6:46 PM, Kian Wilcox <kianwil...@gmail.com> wrote:
> --
> You received this message because you are subscribed to the Google Groups
> "cascading-user" group.
> To view this discussion on the web visit
> https://groups.google.com/d/msg/cascading-user/-/euwC3xLimfQJ.
> To post to this group, send email to cascading-user@googlegroups.com.
> To unsubscribe from this group, send email to
> cascading-user+unsubscribe@googlegroups.com.
> For more options, visit this group at
> http://groups.google.com/group/cascading-user?hl=en.
Thanks for taking the time to reply. I do believe you are right, and that the problem is actually simpler. When I manually modify core-site.xml in the Hadoop configuration to include the following, I am able to read the lzo-compressed files using Scalding. However, I'm having trouble figuring out how to do this on a job-by-job basis, rather than needing to modify the configuration for all jobs running on our cluster. I've tried modifying the config object in Job to include the appropriate keys and values, but it seems to ignore them. I've also tried passing them as parameters to the hadoop call itself, but again, they seem to be ignored or overridden somehow. Is there a simple way to modify this configuration from within Scalding or Cascading?
override def buildFlow(implicit mode : Mode) = {
  // Makes sure that the output lzo files are indexed
  mode match {
    case Hdfs(_, conf) => updateJobConf(conf)
    case HadoopTest(conf, _) => updateJobConf(conf)
    case _ => ()
  }
  super.buildFlow(mode)
}

private def updateJobConf(conf : Configuration) {
  conf.setBoolean("elephantbird.lzo.output.index", true)
  // Set up meat-locker to handle TBase:
  val kryoFact = new KryoFactory(conf)
  // Set up custom kryo serializers:
  kryoFact.setHierarchyRegistrations {
    List(new KryoFactory.ClassPair(classOf[TBase[_,_]],
      classOf[TBaseSerializer])).asJava
  }
  // If we run in hadoop local mode, then we still need LZO and other compression
  // codecs to be set. In hadoop mode (on the cluster) it happens automatically.
  val ioCodecsKey = "io.compression.codecs"
  conf.set(ioCodecsKey, buildCodecs(conf.get(ioCodecsKey)))
  .....
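The snippet calls a `buildCodecs` helper that isn't shown. One plausible shape, assuming it simply merges the hadoop-lzo codec classes (`com.hadoop.compression.lzo.LzoCodec` and `com.hadoop.compression.lzo.LzopCodec`) into whatever comma-separated list `io.compression.codecs` already holds, is sketched below in Java. The class `CodecList` is hypothetical and the real helper may well differ; the point is only that the key's value is a comma-separated list of codec class names, so it should be extended rather than overwritten.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the elided buildCodecs helper: merges the
// hadoop-lzo codecs into an existing io.compression.codecs value.
class CodecList {
    static final List<String> LZO_CODECS = Arrays.asList(
        "com.hadoop.compression.lzo.LzoCodec",
        "com.hadoop.compression.lzo.LzopCodec");

    // `existing` may be null when the key is unset in the Configuration.
    static String buildCodecs(String existing) {
        // LinkedHashSet keeps the existing order and drops duplicates.
        Set<String> codecs = new LinkedHashSet<>();
        if (existing != null) {
            for (String c : existing.split(",")) {
                String trimmed = c.trim();
                if (!trimmed.isEmpty()) {
                    codecs.add(trimmed);
                }
            }
        }
        codecs.addAll(LZO_CODECS);
        return String.join(",", codecs);
    }
}
```

Keeping the codecs that are already configured (for example the default Gzip codec) and only appending the LZO ones means a job that reads several compression formats keeps working in hadoop local mode.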
Much of this could be moved into scalding.Job, but there are only so many
hours in the day.
In particular, it would be useful to add updateJobConf to Job as a protected
method that does nothing by default. Users could then override it to control
keys easily.
Sounds like it could be an excellent candidate for a pull request from you! :)
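The hook proposed here is a template method: the base Job calls a protected method that does nothing by default, and only the jobs that need extra keys override it, so nothing has to change in the cluster-wide configuration. A minimal sketch of that shape in Java, with a plain `Map<String, String>` standing in for Hadoop's `Configuration` and hypothetical class names (`BaseJob`, `LzoJob`, `PlainJob`); this is not Scalding's actual `Job` API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: Map<String, String> stands in for Hadoop's Configuration.
abstract class BaseJob {
    // No-op by default; subclasses override it to set per-job keys.
    protected void updateJobConf(Map<String, String> conf) {
    }

    // Mirrors the buildFlow override earlier in the thread: adjust the conf,
    // then build. Returns the conf a real flow would be built with.
    final Map<String, String> buildFlow(Map<String, String> clusterConf) {
        // This sketch copies the conf so per-job changes never touch the shared one.
        Map<String, String> jobConf = new HashMap<>(clusterConf);
        updateJobConf(jobConf);
        return jobConf;
    }
}

// Only this job asks for indexed LZO output.
class LzoJob extends BaseJob {
    @Override
    protected void updateJobConf(Map<String, String> conf) {
        conf.put("elephantbird.lzo.output.index", "true");
    }
}

// Keeps the default no-op hook.
class PlainJob extends BaseJob {
}
```

Copying the configuration is a choice of this sketch; in the Scala snippet the hook mutates the Configuration held by the Mode directly.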
On Wed, Jul 18, 2012 at 3:30 PM, Kian Wilcox <kianwil...@gmail.com> wrote: