link to pull request (again):
https://github.com/NICTA/scoobi/pull/236
The fix is in my fork (it hasn't been brought up to the current snapshot, but I believe it should still work):
https://github.com/ellchow/scoobi/tree/anyWFSeqSchema
Just in case you still cannot see my pull request, here are some notes about the errors I am seeing now:
I'm not sure what changed, but this seems to have stopped working since the API for sequence IO was modified.
For the following code:
import com.nicta.scoobi.Scoobi._
// Minimal reproducer: a (Long, Long) DList is written to a sequence file,
// marked as a checkpoint, and then read back by the downstream map.
object HelloWorld extends ScoobiApp{
def run() = {
// Write the pairs to a sequence file and mark the result as a checkpoint.
val x = DList((1L, 2L), (2L, 3L),(3L, 4L)).toSequenceFile("checkpoint").checkpoint
// Fails at runtime when reading the checkpoint back: the stack trace below
// shows a ClassCastException (LongWritable cannot be cast to Long) raised
// inside this map's tuple access — presumably the checkpointed values are
// re-read as raw writables rather than decoded Longs.
val y = x.map(_._2)
persist(y.toTextFile("second"))
}
}
[WARN] LocalJobRunner - job_local_0001 <java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to java.lang.Long>java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(Unknown Source)
at scala.Tuple2._2$mcJ$sp(Tuple2.scala:19)
at HelloWorld$$anonfun$2.apply(HelloWorld.scala:37)
at HelloWorld$$anonfun$2.apply(HelloWorld.scala:37)
at com.nicta.scoobi.core.DList$$anonfun$map$1.apply(DList.scala:105)
at com.nicta.scoobi.core.DList$$anonfun$map$1.apply(DList.scala:105)
at com.nicta.scoobi.core.BasicDoFn$$anon$1.process(EnvDoFn.scala:79)
at com.nicta.scoobi.core.DoFn$class.process(EnvDoFn.scala:55)
at com.nicta.scoobi.core.BasicDoFn$$anon$1.process(EnvDoFn.scala:77)
at com.nicta.scoobi.core.BasicDoFn$$anon$1.process(EnvDoFn.scala:77)
at com.nicta.scoobi.core.EnvDoFn$class.processFunction(EnvDoFn.scala:37)
at com.nicta.scoobi.core.BasicDoFn$$anon$1.processFunction(EnvDoFn.scala:77)
at com.nicta.scoobi.impl.plan.comp.ParallelDo.map(ProcessNode.scala:94)
at com.nicta.scoobi.impl.mapreducer.VectorEmitterWriter$$anonfun$map$1.apply(VectorEmitterWriter.scala:36)
at com.nicta.scoobi.impl.mapreducer.VectorEmitterWriter$$anonfun$map$1.apply(VectorEmitterWriter.scala:36)
at scala.collection.immutable.List.foreach(List.scala:318)
at com.nicta.scoobi.impl.mapreducer.VectorEmitterWriter.map(VectorEmitterWriter.scala:36)
at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$class.computeMapper$1(InputChannel.scala:194)
at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$$anonfun$computeNext$1$1.apply(InputChannel.scala:182)
at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$$anonfun$computeNext$1$1.apply(InputChannel.scala:181)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$class.computeNext$1(InputChannel.scala:181)
at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$class.map(InputChannel.scala:178)
at com.nicta.scoobi.impl.plan.mscr.FloatingInputChannel.map(InputChannel.scala:250)
at com.nicta.scoobi.impl.mapreducer.MscrMapper$$anonfun$map$1.apply(MscrMapper.scala:62)
at com.nicta.scoobi.impl.mapreducer.MscrMapper$$anonfun$map$1.apply(MscrMapper.scala:62)
at scala.collection.immutable.List.foreach(List.scala:318)
at com.nicta.scoobi.impl.mapreducer.MscrMapper.map(MscrMapper.scala:62)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)
Also, the following code (using a custom generic SeqSchema) fails in the same way:
import com.nicta.scoobi.Scoobi._
import java.io._
import collection.mutable
import com.nicta.scoobi.core.WireFormat
import org.apache.hadoop.io.BytesWritable
// Second reproducer: a generic SeqSchema backed by WireFormat serialisation,
// used with a checkpointed value-only sequence file.
object HelloWorld extends ScoobiApp{
// SeqSchema for any A with a WireFormat: values are serialised through the
// WireFormat into a BytesWritable.
implicit def anyWFSeqSchema[A : WireFormat]: SeqSchema[A] = new SeqSchema[A] {
type SeqType = BytesWritable
// Reusable byte buffer for deserialisation; mapResult produces Array[Byte].
val b = mutable.ArrayBuffer[Byte]().mapResult(_.toArray)
// Serialise the value with its WireFormat and wrap the bytes in a BytesWritable.
def toWritable(a: A) = {
val bs = new ByteArrayOutputStream
implicitly[WireFormat[A]].toWire(a, new DataOutputStream(bs))
new BytesWritable(bs.toByteArray)
}
// Decode a value: copy only the valid bytes (getLength — the backing array
// returned by getBytes may be padded) and feed them to the WireFormat.
// The trace below shows this method receiving a LongWritable instead of the
// expected BytesWritable when reading the checkpoint back.
def fromWritable(xs: BytesWritable): A = {
b.clear()
xs.getBytes.take(xs.getLength).foreach { x => b += x }
val bArr = b.result()
val bais = new ByteArrayInputStream(bArr)
implicitly[WireFormat[A]].fromWire(new DataInputStream(bais))
}
val mf: Manifest[SeqType] = implicitly
}
def run() = {
case class Foo(val value: Int)
implicit val FooFmt = mkCaseWireFormat(Foo, Foo unapply _)
// Checkpointed value-only sequence file of Foo records.
val x = DList(Foo(1), Foo(2)).valueToSequenceFile("checkpoint").checkpoint
// Fails on read-back with the ClassCastException shown in the trace below.
val y = x.map(e => Foo(e.value + 1))
persist(y.toTextFile("plusone"))
}
}
[WARN] LocalJobRunner - job_local_0001 <java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.BytesWritable>java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.BytesWritable
at HelloWorld$$anon$1.fromWritable(HelloWorld.scala:10)
at com.nicta.scoobi.io.sequence.SequenceInput$$anon$6.fromKeyValue(SequenceInput.scala:111)
at com.nicta.scoobi.io.sequence.SequenceInput$$anon$6.fromKeyValue(SequenceInput.scala:110)
at com.nicta.scoobi.core.InputConverter$class.asValue(DataSource.scala:51)
at com.nicta.scoobi.io.sequence.SequenceInput$$anon$6.asValue(SequenceInput.scala:110)
at com.nicta.scoobi.impl.plan.mscr.MscrInputChannel$class.map(InputChannel.scala:177)
at com.nicta.scoobi.impl.plan.mscr.FloatingInputChannel.map(InputChannel.scala:250)
at com.nicta.scoobi.impl.mapreducer.MscrMapper$$anonfun$map$1.apply(MscrMapper.scala:62)
at com.nicta.scoobi.impl.mapreducer.MscrMapper$$anonfun$map$1.apply(MscrMapper.scala:62)
at scala.collection.immutable.List.foreach(List.scala:318)
at com.nicta.scoobi.impl.mapreducer.MscrMapper.map(MscrMapper.scala:62)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)