Write to Avro format in Scalding


Chu-Cheng Hsieh

Jul 28, 2014, 2:01:08 PM
to cascadi...@googlegroups.com
Hello,

I'm trying to figure out how to use Avro in Scalding, but I've been stuck on it for hours.


It's a simple word count example, starting by reading a text file:

  val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("input")))

  val records: UnsortedGrouped[String, Int] = lines
    .flatMap { line => line.split("""\s+""") }
    .map{v => (v, 1)}
    .group
    .sum

Then I try to write the result out. The first attempt was a success (after googling for quite a while):

  val schemaTxt =
    """
      {
          "namespace": "com.foo",
          "type": "record",
          "name": "WordCount",
          "fields": [
              {"name": "word", "type": "string"},
              {"name": "size", "type": "int"}
          ]
      }
    """.stripMargin
  implicit def avroSchema = new Schema.Parser().parse(schemaTxt)

  val attempt1 = records.toTypedPipe
      .toPipe[(String, Int)]('word, 'size)    // Convert to the Fields API
      .write(UnpackedAvroSource(args("output"), avroSchema))


I don't like this solution because:

(1) it requires a conversion to the Fields API before output, and
(2) when I try to use PackedAvroSource instead, I get an error.

I also checked some examples [HERE], but still no luck.



The first issue (trying to output to a PackedAvroSource):

  val attempt2 = records.toTypedPipe
      .toPipe[(String, Int)]('word, 'size)    // Convert to the Fields API
      .write(PackedAvroSource(args("output"), avroSchema))

I got an error like

[ERROR] /Users/chsieh/gitebay/NewbieScalding/src/main/scala/wordcount.scala:62: error: overloaded method value apply with alternatives:

[ERROR]   [T](paths: Seq[String])(implicit mf: Manifest[T], implicit conv: com.twitter.scalding.TupleConverter[T], implicit tset: com.twitter.scalding.TupleSetter[T], implicit avroType: com.twitter.scalding.avro.AvroSchemaType[T])com.twitter.scalding.avro.PackedAvroSource[T] <and>

[ERROR]   [T](path: String)(implicit evidence$1: com.twitter.scalding.avro.AvroSchemaType[T], implicit evidence$2: Manifest[T], implicit evidence$3: com.twitter.scalding.TupleConverter[T])com.twitter.scalding.avro.PackedAvroSource[T]

[ERROR]  cannot be applied to (String, org.apache.avro.Schema)

[ERROR]   o.write(PackedAvroSource(args("output"), avroSchema))

[ERROR]           ^

[ERROR] one error found




The second issue (skipping the conversion to the Fields API) also brings an error:
  val attempt3 = records.toTypedPipe
      //.toPipe[(String, Int)]('word, 'size)    
      .map{p => WordCount(p._1, p._2)}
      .write(UnpackedAvroSource(args("output"), avroSchema))  

Caused by: java.lang.AssertionError: assertion failed: Arity of (class com.twitter.scalding.LowPriorityTupleSetters$$anon$1) is 1, which doesn't match: + ('word', 'size')

at scala.Predef$.assert(Predef.scala:179)

at com.twitter.scalding.TupleArity$class.assertArityMatches(TupleArity.scala:42)

at com.twitter.scalding.LowPriorityTupleSetters$$anon$1.assertArityMatches(TupleSetter.scala:37)

at com.twitter.scalding.RichPipe.flatMapTo(RichPipe.scala:430)

at com.twitter.scalding.typed.TypedPipeInst.toPipe(TypedPipe.scala:585)

at com.twitter.scalding.typed.TypedPipe$class.write(TypedPipe.scala:306)

at com.twitter.scalding.typed.TypedPipeInst.write(TypedPipe.scala:512)

at WordCountJob.<init>(wordcount.scala:62)

... 16 more



Any help is highly appreciated.

Chucheng

Christopher Severs

Jul 28, 2014, 2:26:45 PM
to cascadi...@googlegroups.com
Hi Chu-Cheng,

Are you using the newer scalding-avro module, which is part of the main scalding project, or the older scalding.avro one? Apologies in advance for the confusing naming there.

The easiest way to write out an Avro object is to use the writePackedAvro method from the package object:

def writePackedAvro[T](pipe: TypedPipe[T], path: String)(implicit mf: Manifest[T],
    st: AvroSchemaType[T],
    conv: TupleConverter[T],
    set: TupleSetter[T],
    flow: FlowDef,
    mode: Mode): Unit = {
  val sink = PackedAvroSource[T](path)
  pipe.write(sink)
}

You can see from what the method does that if you want to write out an Avro object you need to provide a type to the PackedAvroSource, not a schema.
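
For example, a minimal sketch (assuming WordCount is the SpecificRecord class generated from the schema earlier in the thread, and that this runs inside a Job so FlowDef and Mode are in implicit scope):

  import com.twitter.scalding.avro._  // brings writePackedAvro into scope

  // Build Avro records from the (word, count) tuples; setWord/setSize are the
  // setters the Avro codegen typically generates for this schema.
  val counts: TypedPipe[WordCount] = records.toTypedPipe
    .map { case (w, n) => WordCount.newBuilder().setWord(w).setSize(n).build() }

  // No schema argument: AvroSchemaType[WordCount] comes from the SpecificRecord type.
  writePackedAvro[WordCount](counts, args("output"))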

There is a similar method,
 def writeUnpackedAvro[T <: Product](pipe: TypedPipe[T], path: String, schema: Schema)

which does the same for an incoming tuple given a schema. Ideally I would like to have a version that doesn't require a schema but I haven't gotten around to that yet.
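
For the original tuple pipe, a minimal sketch (reusing the records pipe and the avroSchema parsed in the first message) would be:

  import com.twitter.scalding.avro._

  // (String, Int) is a Product, so it satisfies the T <: Product bound;
  // the schema supplies the field names and types for the Avro output.
  writeUnpackedAvro[(String, Int)](records.toTypedPipe, args("output"), avroSchema)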

Does this help? If so I'll try and update the wiki with this info.

Thanks,
Chris

Chu-Cheng Hsieh

Jul 28, 2014, 2:48:01 PM
to cascadi...@googlegroups.com
Thanks so much Chris,

I was using the old scalding.avro, but after reading your response I tried the scalding-avro one.


  case class WordCount(word: String, size: Int)  

  val lines : TypedPipe[String] = TypedPipe.from(TextLine(args("input")))

  val records: UnsortedGrouped[String, Int] = lines
    .flatMap { line => line.split("""\s+""") }
    .map{v => (v, 1)}
    .group
    .sum

  val o: TypedPipe[WordCount] = records.toTypedPipe
    .map{p => WordCount(p._1, p._2)}

  avro.writePackedAvro[WordCount](o, args("output)"))

I then got an error that says:

[ERROR] /Users/chsieh/gitebay/NewbieScalding/src/main/scala/wordcount.scala:60: error: could not find implicit value for parameter st: com.twitter.scalding.avro.AvroSchemaType[WordCount]

[ERROR]   avro.writePackedAvro[WordCount](o, args("output)"))


Any idea?

Christopher Severs

Jul 28, 2014, 2:56:25 PM
to cascadi...@googlegroups.com
WordCount needs to be an Avro SpecificRecord (or there needs to be an implicit in scope which gives your type this trait):

  trait AvroSchemaType[T] extends Serializable {
    def schema: Schema
  }
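
If you wanted to keep WordCount as a plain case class, a sketch of supplying that implicit by hand (reusing schemaTxt from the first message; the instance name here is illustrative, and the generated-SpecificRecord route the thread takes next is the safer one):

  import com.twitter.scalding.avro.AvroSchemaType
  import org.apache.avro.Schema

  // Hand-written schema evidence for the plain case class.
  implicit val wordCountSchemaType: AvroSchemaType[WordCount] =
    new AvroSchemaType[WordCount] {
      def schema: Schema = new Schema.Parser().parse(schemaTxt)
    }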

Chu-Cheng Hsieh

Jul 28, 2014, 5:16:05 PM
to cascadi...@googlegroups.com
Thanks so much, Chris.

After generating a WordCount (SpecificRecord) class using the avro-maven-plugin, everything works fine.

As a follow-up question:



I notice that when I switch 
    avro.writePackedAvro[WordCount](data, args("output") + "_packed.avro")
to
    avro.writeUnpackedAvro[WordCount](data, args("output") + "_unpacked.avro")

I get a compile error:

[ERROR] /Users/chsieh//NewbieScalding/src/main/scala/wordcount.scala:38: error: type arguments [com.fun.WordCount] do not conform to method writeUnpackedAvro's type parameter bounds [T <: Product]

[ERROR]   avro.writeUnpackedAvro[WordCount](data, args("output") + "_unpacked.avro")

What's the difference between a packed Avro source and an unpacked Avro source? Can we exchange them freely?

Christopher Severs

Jul 28, 2014, 5:58:30 PM
to cascadi...@googlegroups.com
UnpackedAvroSource is for when you have a tuple or case class and want to spit out an Avro record. The main problem is that you need to provide a schema in order for this to work. PackedAvroSource is for when you have the Avro record in hand and just want to write it out to disk.
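
The [T <: Product] bound behind the compile error above is the practical consequence: the generated SpecificRecord is not a Product, so to use the unpacked path you would go back to tuples first. A minimal sketch (assuming data is the TypedPipe[WordCount] from the earlier message, the usual Avro-generated getters, and the avroSchema from the first message):

  import com.twitter.scalding.avro._

  // Back to plain tuples so the T <: Product bound is satisfied; getWord may
  // return a CharSequence (Utf8) depending on codegen settings, hence toString.
  val asTuples: TypedPipe[(String, Int)] =
    data.map { wc => (wc.getWord.toString, wc.getSize.intValue) }

  writeUnpackedAvro[(String, Int)](asTuples, args("output") + "_unpacked.avro", avroSchema)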

Serega Sheypak

Jul 29, 2015, 1:17:13 PM
to cascading-user, chris....@gmail.com
What is the right way to mock a PackedAvroSource as a source in a unit test?
I'm using the JobTest utility.

On Monday, July 28, 2014 at 23:58:30 UTC+2, Christopher Severs wrote: