Avro GenericRecordBuilder weirdness


James Boorn

Feb 12, 2014, 8:12:53 PM
to cascadi...@googlegroups.com
Here is a strange one.  It is a modified version of the WordCount example: the sink is a PackedAvroSource with a GenericRecord that is built with a GenericRecordBuilder.

The code works fine in local mode, but fails when run on our Cloudera 4.4 cluster.  I found a workaround that makes the example run, which may point to a bug in Scalding.

Also note that I am using the following code to run the jobs:

val wordCountSchema = """{
  "type" : "record",
  "name" : "WordCount",
  "namespace" : "com.example",
  "fields" : [ 
  {
    "name" : "word",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "count",
    "type" : [ "null", "long" ],
    "default" : null
  }]
}"""

  val mode = new Local(strictSources = true)
  val inputURL = getClass.getResource("/transDetail.avsc")
  val arguments = Mode.putMode(mode, Args(""))
  val dest = demoTmpDir + "/wordcount.avro"
  val job: WCtoGRJob =
    new WCtoGRJob(arguments, inputURL.getPath, dest, new Schema.Parser().parse(wordCountSchema))
  val flow = job.buildFlow
  flow.writeDOT(demoTmpDir + "/WordCountFlow.dot")
  flow.complete()

To run on Hadoop, replace val mode = new Local(strictSources = true) with val mode = new Hdfs(true, conf).

Job Code that works in local mode, but not Hadoop:

import com.twitter.scalding._
import com.twitter.scalding.avro._
import org.apache.avro.generic.GenericRecord
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.avro.Schema
import org.apache.avro.util.{ Utf8 => AvroUtf8 }


class WCtoGRJob(args: Args, inputPath: String, outputPath: String, outputSchema: Schema) extends Job(args) {
  val outputS = Symbol(outputSchema.getName)

  // Phantom subtype of GenericRecord, used only to carry the implicit schema
  // that PackedAvroSource needs
  trait OutputGenericRecord extends GenericRecord

  implicit def OutputGenericRecordSchema = new AvroSchemaType[OutputGenericRecord] {
    def schema = outputSchema
  }

  TextLine(inputPath)
    .flatMap('line -> 'word) { line: String => tokenize(line) }
    .groupBy('word) { _.size('count) }
    .map(('word, 'count) -> outputS) { input: (String, Long) =>
      println(s"input 1 is a ${input._1.getClass().getName()}, '${input._1}'")
      println(s"input 2 is a ${input._2.getClass().getName()}, '${input._2}'")
      println(s"schema is $outputSchema")
      println(s"OutputGenericRecordSchema.schema is ${OutputGenericRecordSchema.schema}")
      // Pack each (word, count) pair into a GenericRecord for the Avro sink
      val builder = new GenericRecordBuilder(outputSchema)
      builder.set("word", new AvroUtf8(input._1))
      builder.set("count", new java.lang.Long(input._2))
      builder.build
    }
    .project(outputS)
    .write(PackedAvroSource[OutputGenericRecord](outputPath))

  // Split a piece of text into individual words.
  def tokenize(text: String): Array[String] = {
    // Lowercase each word and remove punctuation.
    text.toLowerCase.replaceAll("[^a-zA-Z\\s]", "").split("\\s+").filter(_.length > 0)
  }
}

Below is the output from the failing Hadoop run.  I can get it to work on Hadoop if I rename the constructor arg outputSchema to outputSchemaBase and add this code to the class:
   val outputSchemaStr = outputSchemaBase.toString
   def outputSchema = new Schema.Parser().parse(outputSchemaStr)
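
My guess at why the re-parse helps: Avro's Schema is not java.io.Serializable, so the Schema object captured in the job closure may not survive the trip to the cluster tasks intact, whereas its canonical JSON string does.  A generalized sketch of the same pattern (the class name is mine, not anything from Scalding):

import org.apache.avro.Schema

// Hedged sketch: ship the schema as its JSON string (a plain Serializable
// value) and re-parse it lazily on each task, rather than capturing the
// Schema object itself in the job closure.
class SerializableSchema(schemaJson: String) extends Serializable {
  @transient private lazy val parsed: Schema = new Schema.Parser().parse(schemaJson)
  def schema: Schema = parsed
}

// Usage: pass new SerializableSchema(outputSchemaBase.toString) into the job
// and call .schema wherever a Schema is needed.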


On Hadoop I get this output in the logs for the non-working version:
stdout logs
input 1 is a java.lang.String, 'a'
input 2 is a long, '2'
schema is {"type":"record","name":"WordCount","namespace":"com.example","fields":[{"name":"word","type":["null","string"],"default":null},{"name":"count","type":["null","long"],"default":null}]}
OutputGenericRecordSchema.schema is {"type":"record","name":"WordCount","namespace":"com.example","fields":[{"name":"word","type":["null","string"],"default":null},{"name":"count","type":["null","long"],"default":null}]}


syslog logs
....
2014-02-12 17:02:03,141 INFO cascading.flow.hadoop.FlowReducer: sinking to: Hfs["AvroScheme{schema={"type":"record","name":"WordCount","namespace":"com.example","fields":[{"name":"word","type":["null","string"],"default":null},{"name":"count","type":["null","long"],"default":null}]}}"]["/tmp/rc.pivotlinkqa.com/scaldingExample/3/out"]
2014-02-12 17:02:03,314 ERROR cascading.flow.stream.TrapHandler: caught Throwable, no trap available, rethrowing
cascading.tuple.TupleException: unable to sink into output identifier: /tmp/rc.pivotlinkqa.com/scaldingExample/3/out
	at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:160)
	at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:119)
	at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
	at cascading.tuple.TupleEntrySchemeCollector.add(TupleEntrySchemeCollector.java:134)
	at cascading.flow.stream.SinkStage.receive(SinkStage.java:90)
	at cascading.flow.stream.SinkStage.receive(SinkStage.java:37)
	at cascading.flow.stream.FunctionEachStage$1.collect(FunctionEachStage.java:80)
	at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:119)
	at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:107)
	at cascading.operation.Identity$2.operate(Identity.java:137)
	at cascading.operation.Identity.operate(Identity.java:150)
	at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
	at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
	at cascading.flow.stream.FunctionEachStage$1.collect(FunctionEachStage.java:80)
	at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:119)
	at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:107)
	at com.twitter.scalding.MapFunction.operate(Operations.scala:52)
	at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
	at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
	at cascading.flow.stream.CloseReducingDuct.completeGroup(CloseReducingDuct.java:47)
	at cascading.flow.stream.AggregatorEveryStage$1.collect(AggregatorEveryStage.java:67)
	at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:119)
	at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:107)
	at com.twitter.scalding.MRMAggregator.complete(Operations.scala:305)
	at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:151)
	at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:39)
	at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:51)
	at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28)
	at cascading.flow.hadoop.stream.HadoopGroupGate.run(HadoopGroupGate.java:90)
	at cascading.flow.hadoop.FlowReducer.reduce(FlowReducer.java:133)
	at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:506)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:447)
	at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
	at org.apache.hadoop.mapred.Child.main(Child.java:262)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: org.apache.avro.UnresolvedUnionException: Not in union ["null","string"]: 2
	at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:263)
	at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:122)
	at org.apache.avro.mapred.AvroOutputFormat$1.write(AvroOutputFormat.java:119)
	at org.apache.hadoop.mapred.ReduceTask$3.collect(ReduceTask.java:483)
	at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69)
	at cascading.avro.PackedAvroScheme.sink(PackedAvroScheme.java:55)
	at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:153)
	... 36 more
Caused by: org.apache.avro.UnresolvedUnionException: Not in union ["null","string"]: 2
	at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:561)
	at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:144)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
	at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:104)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:106)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
	at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:104)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
	at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:257)
	... 42 more
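
Reading the trace: the UnresolvedUnionException says the Long value 2 (the count) is being validated against ["null","string"], which is the word field's union, so the writer appears to be matching values against the wrong field positions.  Conceptually, Avro resolves a union by testing the runtime value against each branch, something like this sketch (not Avro's actual code):

// Conceptual sketch of union resolution, not Avro's real implementation.
// A Long can never satisfy the branches ["null", "string"], producing
// exactly the "Not in union" failure seen above.
def resolveUnion(branches: Seq[String], value: Any): String =
  branches.find {
    case "null"   => value == null
    case "string" => value.isInstanceOf[CharSequence]
    case "long"   => value.isInstanceOf[java.lang.Long]
    case _        => false
  }.getOrElse(throw new RuntimeException(
    s"""Not in union ${branches.mkString("[\"", "\",\"", "\"]")}: $value"""))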
