How to implement this use case in Scoobi ..

Debasish Ghosh

Mar 27, 2013, 6:40:36 AM
to scoobi...@googlegroups.com
Hi -

What we do with native map/reduce in Scala is the following ..

  1. We read data from external sources and generate Avro files
  2. We read these Avro files as GenericRecord
  3. We get the schema from the data using data.getSchema where data is of type GenericRecord
  4. We then reflect on the schema using schema.getFields.map { //.. }, do the work in the mapper, and pass the results to the Collector for the reduction

What would be the ideal way to do this using Scoobi?

Thanks.
- Debasish
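
For reference, steps 2 to 4 above can be sketched with the plain Avro API, independent of Scoobi or Hadoop. This is only a minimal sketch: it assumes Avro 1.7.5 or later (for SchemaBuilder), and the describe helper, the Weather schema and its field names are hypothetical, not taken from the original job.

```scala
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericData, GenericRecord}
import scala.collection.JavaConverters._

object ReflectOnRecord {
  // Hypothetical helper: walk the record's own schema and pair each
  // field name with its Avro type and the value stored at that position.
  def describe(data: GenericRecord): Seq[(String, Schema.Type, AnyRef)] =
    data.getSchema.getFields.asScala.toSeq.map { field =>
      (field.name, field.schema.getType, data.get(field.pos))
    }

  // Hypothetical schema and record, used only to exercise describe.
  def sample(): GenericRecord = {
    val schema = SchemaBuilder.record("Weather").fields()
      .requiredString("station")
      .requiredInt("temp")
      .endRecord()
    val rec = new GenericData.Record(schema)
    rec.put("station", "KSFO")
    rec.put("temp", 17)
    rec
  }

  def main(args: Array[String]): Unit =
    describe(sample()).foreach { case (name, tpe, value) =>
      println(s"$name: $tpe = $value")
    }
}
```

The point of the sketch is that the mapper never needs a compile-time record class; everything it needs is reachable from data.getSchema.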

Ben Lever

Mar 27, 2013, 8:17:00 AM
to scoobi...@googlegroups.com
Hi Debasish,

I've given this only 5 mins of thought, but off the top of my head I think it would be possible with the following:

 - Implement a com.nicta.scoobi.core.WireFormat[GenericRecord]: this will make it possible to have objects of type DList[GenericRecord]
 - Implement a com.nicta.scoobi.io.avro.AvroSchema[GenericRecord]: this will make it possible to use fromAvroFile to read in an Avro file as a DList[GenericRecord], as well as toAvroFile to write a DList[GenericRecord] out as an Avro file

Having said all that, I think that was one of the goals of https://github.com/NICTA/scoobi/pull/150. We'll have to investigate why #150 isn't on master ...

In summary, if we have the above you should be able to achieve your goals pretty simply:

// step 1
fromTextFile("hdfs://foo").map(line => new GenericData.Record(.....)).toAvroFile("hdfs://bar").run()

// steps 2 - 4
val x = fromAvroFile("hdfs://bar") map { (rec: GenericRecord) =>
  val schema = rec.getSchema
  val x = rec.get(0).asInstanceOf[Something]
  val y = rec.get(1).asInstanceOf[SomethingElse]
  ... etc ...
}

Does that make sense?

Cheers,
Ben.

Debasish Ghosh

Mar 27, 2013, 10:40:11 AM
to scoobi...@googlegroups.com
Hi Ben -

Thanks for the response.

I was thinking along the same lines. The problem is that org.apache.avro.generic.GenericRecord is an interface. If I implement the WireFormat typeclass for GenericRecord, instantiation fails in persist since it cannot construct an object of type GenericRecord. Hence, to make it work, I need to implement WireFormat[T <: GenericRecord], which means that in the client code I need to specify T, which I don't know. For example, using the current version of Scoobi, I can do the following ..

class Gen[T <: SpecificRecordBase : Manifest : AvroSchema] {
  def gen() {
    val w3: DList[T] = fromAvroFile[T]("test-output")
    persist(w3.map(_.toString).toTextFile("test-xxx", overwrite = true))
    persist(w3.map(_.get(1).toString).toTextFile("test-yyy", overwrite = true))
  }
}

and use it as 

val g = new Gen[Weather]
g.gen()

But in my use case I don't know the class Weather.

Am I missing something ?

Thanks.






--
Debasish Ghosh
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Christopher Severs

Mar 27, 2013, 2:27:40 PM
to scoobi...@googlegroups.com, dgh...@acm.org
Can you read them in as GenericData.Record and do the reflection from there? It's the most generic concrete implementation of GenericRecord.

More generally, is there a reason you can't have the schema in hand before the job runs? I assume it's something generic where the job should run across all different sources?

-----
Chris

Debasish Ghosh

Mar 28, 2013, 1:05:09 AM
to scoobi...@googlegroups.com, dgh...@acm.org
Thanks .. I will try the GenericData.Record path and let you know.

Regarding the schema, it's not possible to have it in hand beforehand, since we need to read arbitrary database tables across multiple sources and then generate Avro files from there.

Thanks.
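
To illustrate why the schema is only known at run time in this situation, here is a minimal sketch that derives an Avro record schema from a table name and a list of column names. It assumes Avro 1.7.5 or later (for SchemaBuilder), and it treats every column as an optional string purely for illustration; the schemaFor and toRecord helpers are hypothetical, and real code would map database column types to Avro types. Table and column names must also be valid Avro names.

```scala
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericData, GenericRecord}

object RuntimeSchema {
  // Hypothetical helper: one nullable string field per column,
  // built by threading the field assembler through the column list.
  def schemaFor(table: String, columns: Seq[String]): Schema =
    columns.foldLeft(SchemaBuilder.record(table).fields()) { (fields, col) =>
      fields.optionalString(col)
    }.endRecord()

  // Hypothetical helper: GenericData.Record has no no-arg constructor,
  // so the schema must be passed in when the record is created.
  def toRecord(schema: Schema, row: Map[String, String]): GenericRecord = {
    val rec = new GenericData.Record(schema)
    row.foreach { case (col, value) => rec.put(col, value) }
    rec
  }
}
```

Usage would look like RuntimeSchema.toRecord(RuntimeSchema.schemaFor("orders", Seq("id", "status")), Map("id" -> "42")); columns missing from the row simply stay null.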

Debasish Ghosh

Mar 28, 2013, 7:40:06 AM
to scoobi...@googlegroups.com, dgh...@acm.org
The problem with GenericData.Record is also that it doesn't have a no-arg constructor, which Scoobi needs.

Thanks.

Christopher Severs

Mar 28, 2013, 2:58:50 PM
to scoobi...@googlegroups.com, dgh...@acm.org
I'm sure I've read things in as GenericData.Record before, let me try and dig up some of my old examples and get back to you.

Debasish Ghosh

Mar 28, 2013, 2:59:50 PM
to scoobi...@googlegroups.com
That indeed would be very helpful .. Thanks ..

Eric Torreborre

Apr 15, 2013, 1:06:39 AM
to scoobi...@googlegroups.com, dgh...@acm.org
Hi all, 

We've created a GitHub issue to address this question: https://github.com/NICTA/scoobi/issues/235.

Cheers,

Eric.

Debasish Ghosh

Apr 15, 2013, 3:31:35 AM
to scoobi...@googlegroups.com
Thanks a lot .. 

Eric Torreborre

Apr 30, 2013, 9:11:51 PM
to scoobi...@googlegroups.com, dgh...@acm.org
Hi Debasish,

Please try out the latest SNAPSHOT version, I hope it covers your use case. Basically you can do this:

    // create a list of generic records
    val list: DList[GenericRecord] = DList(1).map { t =>
      val record = new Record(AvroSchema.mkRecordSchema(Seq(implicitly[AvroSchema[Int]])))
      record.put("v0", 1)
      record
    }

    // persist the list
    list.toAvroFile("avro", overwrite = true)

    // load the list as generic records and access fields
    fromAvroFile[GenericRecord]("avro").map { record =>
      record.get(0).asInstanceOf[Int]
    }

It was a bit tricky to get this to work because we have to fetch the schema of the records at run-time, based on the first element read/written, in order to know how to load/persist them properly.

Cheers,

Eric.
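
The run-time schema lookup described above relies on the fact that an Avro container file stores its writer schema in the header. A minimal sketch of that idea with the plain Avro file API (not Scoobi's implementation) follows. It assumes Avro 1.7.5 or later; the SchemaFromFile object, its helpers and the one-field "sample" schema are hypothetical.

```scala
import java.io.File
import org.apache.avro.SchemaBuilder
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import scala.collection.JavaConverters._

object SchemaFromFile {
  // Write a single record with a hypothetical one-field schema.
  def writeSample(file: File): Unit = {
    val schema = SchemaBuilder.record("sample").fields().requiredInt("v0").endRecord()
    val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
    writer.create(schema, file)
    try {
      val rec = new GenericData.Record(schema)
      rec.put("v0", 1)
      writer.append(rec)
    } finally writer.close()
  }

  // Read the file back without supplying a schema: the reader picks up
  // the writer schema from the file header. Returns the schema's full
  // name together with all records.
  def readAll(file: File): (String, List[GenericRecord]) = {
    val reader = new DataFileReader[GenericRecord](file, new GenericDatumReader[GenericRecord]())
    try {
      (reader.getSchema.getFullName, reader.iterator.asScala.toList)
    } finally reader.close()
  }
}
```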

Debasish Ghosh

May 1, 2013, 4:35:53 AM
to scoobi...@googlegroups.com
Thanks a lot .. we will give it a spin ..

Vignesh S

Dec 6, 2013, 12:31:36 PM
to scoobi...@googlegroups.com, dgh...@acm.org
Hi

I have a similar use case, and I tried the code below:

def run() {

    configuration.jobNameIs(this.getClass.getName)

    val guid = new Schema.Field("guid", Schema.create(Schema.Type.STRING), "", JsonNodeFactory.instance.textNode(""))
    val data = new Schema.Field("data", Schema.create(Schema.Type.STRING), "", JsonNodeFactory.instance.textNode(""))
    val schema = Schema.createRecord("sample", "", "", false)
    schema.setFields(List(guid, data))

    val record1 = new GenericData.Record(schema)
    record1.put("guid", "g1")
    record1.put("data", "aaa")

    val record2 = new GenericData.Record(schema)
    record2.put("guid", "g1")
    record2.put("data", "aaa")

    // create a list of generic records
    val list: DList[GenericRecord] = DList(record1, record2)

    val toGroup = list.map {
      r =>
        println(r)
        //(r.get("guid").toString, r)
        r
    }

    toGroup.run
  }

The println statement prints {"v0": 2}, and I am not able to retrieve the data from the Avro record. Am I missing something? Could you please shed some light on this?

thanks
Vignesh

Vignesh S

Dec 6, 2013, 4:56:31 PM
to scoobi...@googlegroups.com, dgh...@acm.org
I solved this by creating a WireFormat that uses the schema I generate.

thanks
Vignesh
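
The thread does not show the WireFormat itself. One way such a schema-aware serializer could look is sketched below using only the Avro binary encoder and java.io. This is an assumption-laden sketch, not the poster's code: GenericRecordCodec is a hypothetical class, and the toWire/fromWire method names are chosen on the assumption that Scoobi's WireFormat exposes methods of that shape over DataOutput/DataInput. The schema is carried as a JSON string so that the codec itself stays serializable.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

// Hypothetical codec bound to one known schema.
class GenericRecordCodec(schemaJson: String) extends Serializable {
  // Parsed lazily on each JVM; the Schema object itself is not shipped.
  @transient private lazy val schema: Schema = new Schema.Parser().parse(schemaJson)

  // Write the record as length-prefixed Avro binary.
  def toWire(rec: GenericRecord, out: DataOutput): Unit = {
    val bytes = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(bytes, null)
    new GenericDatumWriter[GenericRecord](schema).write(rec, encoder)
    encoder.flush()
    out.writeInt(bytes.size)
    out.write(bytes.toByteArray)
  }

  // Read the length prefix, then decode with the same schema.
  def fromWire(in: DataInput): GenericRecord = {
    val bytes = new Array[Byte](in.readInt())
    in.readFully(bytes)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    new GenericDatumReader[GenericRecord](schema).read(null, decoder)
  }
}

object GenericRecordCodecDemo {
  // Round-trip a record shaped like the guid/data example above.
  def roundTrip(): GenericRecord = {
    val schema = SchemaBuilder.record("sample").fields()
      .requiredString("guid").requiredString("data").endRecord()
    val rec = new GenericData.Record(schema)
    rec.put("guid", "g1")
    rec.put("data", "aaa")
    val codec = new GenericRecordCodec(schema.toString)
    val buf = new ByteArrayOutputStream()
    codec.toWire(rec, new DataOutputStream(buf))
    codec.fromWire(new DataInputStream(new ByteArrayInputStream(buf.toByteArray)))
  }
}
```

Note that decoded string fields come back as Avro Utf8 values rather than java.lang.String, so callers typically call toString on them, as in the commented-out r.get("guid").toString line above.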