SeqSchema for anything with WireFormat

95 views
Skip to first unread message

Elliot Chow

unread,
Apr 22, 2013, 3:56:01 PM4/22/13
to scoob...@googlegroups.com
Hi,

I was hoping to write a implicit that makes a SeqSchema for anything with a W ireFormat.  I submitted my code here, but there now seems to be some issues.  It was working before the refactoring of the sequence file API, but I have not dug into it further to see why it fails now.  Any help or alternatives would be greatly appreciated.

Thanks.

Eric Springer

unread,
Apr 22, 2013, 5:30:28 PM4/22/13
to scoob...@googlegroups.com
Got a link to the branch you're working on? (The PR seems to be deaded)



--
You received this message because you are subscribed to the Google Groups "scoobi-dev" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scoobi-dev+...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

Elliot Chow

unread,
Apr 22, 2013, 11:09:22 PM4/22/13
to scoob...@googlegroups.com
link to pull request (again):
https://github.com/NICTA/scoobi/pull/236

In my fork (This hasn't been brought up to the current snapshot, so I believe it should still work).
https://github.com/ellchow/scoobi/tree/anyWFSeqSchema


Just in case you still cannot see my pull request, here are some notes about the errors I am seeing now:

I'm not sure what changed, but this seems to have stopped working since the API for sequence IO was modified.
For this code

    import com.nicta.scoobi.Scoobi._

    object HelloWorld extends ScoobiApp{
      def run() = {
        val x = DList((1L, 2L), (2L, 3L),(3L, 4L)).toSequenceFile("checkpoint").checkpoint
        val y = x.map(_._2)
        persist(y.toTextFile("second"))
      }
    }

[WARN] LocalJobRunner - job_local_0001 <java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to java.lang.Long>java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to java.lang.Long
    at scala.runtime.BoxesRunTime.unboxToLong(Unknown Source)
    at scala.Tuple2._2$mcJ$sp(Tuple2.scala:19)
    at HelloWorld$$anonfun$2.apply(HelloWorld.scala:37)
    at HelloWorld$$anonfun$2.apply(HelloWorld.scala:37)
    at com.nicta.scoobi.core.DList$$anonfun$map$1.apply(DList.scala:105)
    at com.nicta.scoobi.core.DList$$anonfun$map$1.apply(DList.scala:105)
    at com.nicta.scoobi.core.BasicDoFn$$anon$1.process(EnvDoFn.scala:79)
    at com.nicta.scoobi.core.DoFn$class.process(EnvDoFn.scala:55)
    at com.nicta.scoobi.core.BasicDoFn$$anon$1.process(EnvDoFn.scala:77)
    at com.nicta.scoobi.core.BasicDoFn$$anon$1.process(EnvDoFn.scala:77)
    at com.nicta.scoobi.core.EnvDoFn$class.processFunction(EnvDoFn.scala:37)
    at com.nicta.scoobi.core.BasicDoFn$$anon$1.processFunction(EnvDoFn.scala:77)
    at com.nicta.scoobi.impl.plan.comp.ParallelDo.map(ProcessNode.scala:94)
    at com.nicta.scoobi.impl.mapreducer.VectorEmitterWriter$$anonfun$map$1.apply(VectorEmitterWriter.scala:36)
    at com.nicta.scoobi.impl.mapreducer.VectorEmitterWriter$$anonfun$map$1.apply(VectorEmitterWriter.scala:36)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at com.nicta.scoobi.impl.mapreducer.VectorEmitterWriter.map(VectorEmitterWriter.scala:36)
    at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$class.computeMapper$1(InputChannel.scala:194)
    at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$$anonfun$computeNext$1$1.apply(InputChannel.scala:182)
    at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$$anonfun$computeNext$1$1.apply(InputChannel.scala:181)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$class.computeNext$1(InputChannel.scala:181)
    at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$class.map(InputChannel.scala:178)
    at com.nicta.scoobi.impl.plan.mscr.FloatingInputChannel.map(InputChannel.scala:250)
    at com.nicta.scoobi.impl.mapreducer.MscrMapper$$anonfun$map$1.apply(MscrMapper.scala:62)
    at com.nicta.scoobi.impl.mapreducer.MscrMapper$$anonfun$map$1.apply(MscrMapper.scala:62)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at com.nicta.scoobi.impl.mapreducer.MscrMapper.map(MscrMapper.scala:62)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)

Also, this fails

    import com.nicta.scoobi.Scoobi._

    import java.io._
    import collection.mutable
    import com.nicta.scoobi.core.WireFormat
    import org.apache.hadoop.io.BytesWritable

    object HelloWorld extends ScoobiApp{

     implicit def anyWFSeqSchema[A : WireFormat]: SeqSchema[A] = new SeqSchema[A] {
        type SeqType = BytesWritable

        val b = mutable.ArrayBuffer[Byte]().mapResult(_.toArray)

        def toWritable(a: A) = {
          val bs = new ByteArrayOutputStream
          implicitly[WireFormat[A]].toWire(a, new DataOutputStream(bs))
          new BytesWritable(bs.toByteArray)
        }
        def fromWritable(xs: BytesWritable): A = {
          b.clear()
          xs.getBytes.take(xs.getLength).foreach { x => b += x }
          val bArr = b.result()

          val bais = new ByteArrayInputStream(bArr)
          implicitly[WireFormat[A]].fromWire(new DataInputStream(bais))
        }
        val mf: Manifest[SeqType] = implicitly
      }

      def run() = {
        case class Foo(val value: Int)
        implicit val FooFmt = mkCaseWireFormat(Foo, Foo unapply _)
        val x = DList(Foo(1), Foo(2)).valueToSequenceFile("checkpoint").checkpoint
        val y = x.map(e => Foo(e.value + 1))
        persist(y.toTextFile("plusone"))
      }
    }

[WARN] LocalJobRunner - job_local_0001 <java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.BytesWritable>java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.BytesWritable
    at HelloWorld$$anon$1.fromWritable(HelloWorld.scala:10)
    at com.nicta.scoobi.io.sequence.SequenceInput$$anon$6.fromKeyValue(SequenceInput.scala:111)
    at com.nicta.scoobi.io.sequence.SequenceInput$$anon$6.fromKeyValue(SequenceInput.scala:110)
    at com.nicta.scoobi.core.InputConverter$class.asValue(DataSource.scala:51)
    at com.nicta.scoobi.io.sequence.SequenceInput$$anon$6.asValue(SequenceInput.scala:110)
    at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$class.map(InputChannel.scala:177)
    at com.nicta.scoobi.impl.plan.mscr.FloatingInputChannel.map(InputChannel.scala:250)
    at com.nicta.scoobi.impl.mapreducer.MscrMapper$$anonfun$map$1.apply(MscrMapper.scala:62)
    at com.nicta.scoobi.impl.mapreducer.MscrMapper$$anonfun$map$1.apply(MscrMapper.scala:62)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at com.nicta.scoobi.impl.mapreducer.MscrMapper.map(MscrMapper.scala:62)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)

Eric Torreborre

unread,
Apr 23, 2013, 2:13:26 AM4/23/13
to scoob...@googlegroups.com
Hi Elliot,

I fixed the issue you found (it was the remains of the implementation of the checkpoint feature) and I incorporated your code.

Thanks for trying it on your side,

E.

Elliot Chow

unread,
Apr 23, 2013, 9:34:00 PM4/23/13
to scoob...@googlegroups.com
Thanks. 

There just one weird thing that still seems to fail, but maybe I am not understanding what each function is supposed to do.  valueToSequenceFile and keyToSequenceFile seem to fail if I have a DList of Tuple2's.

This works fine:


import com.nicta.scoobi.Scoobi._
object HelloWorld extends ScoobiApp{
  def run() = {
    val x = DList((1L, 2L), (2L, 3L),(3L, 4L)).toSequenceFile("checkpoint").checkpoint
    val y = x.map(_._2)
    persist(y.toTextFile("second"))
  }
}

And this too:


import com.nicta.scoobi.Scoobi._
object HelloWorld extends ScoobiApp{
  def run() = {

    val x = DList((1L, 2L, 3L), (2L, 3L, 4L),(3L, 4L, 5L)).valueToSequenceFile("checkpoint").checkpoint


    val y = x.map(_._2)
    persist(y.toTextFile("second"))

  }
}



But not this (keyToSequenceFile also fails). 

import com.nicta.scoobi.Scoobi._
object HelloWorld extends ScoobiApp{
  def run() = {
    val x = DList((1L, 2L), (2L, 3L),(3L, 4L)).valueToSequenceFile("checkpoint").checkpoint

    val y = x.map(_._2)
    persist(y.toTextFile("second"))
  }
}

This is the error:

[WARN] LocalJobRunner - job_local_0001 <java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.NullWritable>java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.NullWritable
    at com.nicta.scoobi.io.sequence.SequenceOutput$$anon$1.fromKeyValue(SequenceOutput.scala:52)
    at com.nicta.scoobi.core.InputConverter$class.asValue(InputOutputConverter.scala:27)
    at com.nicta.scoobi.io.sequence.SequenceOutput$$anon$1.asValue(SequenceOutput.scala:52)

    at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$class.map(InputChannel.scala:177)
    at com.nicta.scoobi.impl.plan.mscr.FloatingInputChannel.map(InputChannel.scala:250)
    at com.nicta.scoobi.impl.mapreducer.MscrMapper$$anonfun$map$1.apply(MscrMapper.scala:62)
    at com.nicta.scoobi.impl.mapreducer.MscrMapper$$anonfun$map$1.apply(MscrMapper.scala:62)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at com.nicta.scoobi.impl.mapreducer.MscrMapper.map(MscrMapper.scala:62)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)



Eric Torreborre

unread,
Apr 24, 2013, 8:00:28 PM4/24/13
to scoob...@googlegroups.com
Hi Elliot,

I fixed those 2 issues. I never realised that those 2 methods didn't even have a test for them :-(.

Thanks,

Eric.
Message has been deleted

Elliot Chow

unread,
Apr 27, 2013, 10:36:20 PM4/27/13
to scoob...@googlegroups.com
Hi Eric,

Thanks for fixing those.  There's just one thing that was a bit unintuitive for me.  When I use valueToSequenceFile on a DList[A], it works as I expected and just persists the A's.  However, if I use, valueToSequenceFile on DList[(K,V)], only the V's get persisted.  I got stuck for a while not realizing this was happening.  I had started out checkpointing data that was not a Tuple2 and everything was fine.  Then, I decided to change the representation of the data to a Tuple2, not knowing that the checkpointing would have a different meaning and then I got a runtime error.

I can understand why we may want to persist the V's as the final output of a job, but this can be very deceiving for checkpointing as the type of the DList before and after the checkpoint are not required to be the same (i.e.  we checkpoint DList[(K,V)], but actually DList[V] gets persisted and then loaded the next time the job is run even though the type at compile time is still DList[(K,V)]).

Eric Torreborre

unread,
Apr 28, 2013, 7:47:00 AM4/28/13
to scoob...@googlegroups.com
Hi Elliot,

If I understand you correctly, I should find a way to disallow checkpointing in all cases where saved files can't be loaded with same type again. This is already the case for Text files (because we can go from A => String, but we don't know how to go from String => A), but this is also the case for Sequence files where only key or values have been saved.

I'll look in to this tomorrow, there should be a way to make that checkable at runtime.

Cheers,

Eric..

Elliot Chow

unread,
Apr 28, 2013, 1:39:57 PM4/28/13
to scoob...@googlegroups.com
Yes, that sounds good.  Thanks.

Eric Torreborre

unread,
Apr 28, 2013, 10:45:54 PM4/28/13
to scoob...@googlegroups.com
Hi Elliot,

I did some changes and there are collateral damages in the way of API breaks.

Now you need to write: 

    // instead of DList(1, 2, 3).toAvroFile(path = "xxx").checkpoint
    DList(1, 2, 3).toAvroFile(path = "xxx", checkpoint = true) 

If all the tests pass on our cluster in the next 2 hours, there should be some newly published CDH3 jars, I'll let you know when it's done.

Cheers,

Eric.
Reply all
Reply to author
Forward
0 new messages