Guidance on how to construct and write complex Avro objects in Cascading

3,797 views
Skip to first unread message

Michael Peterson

unread,
Apr 29, 2014, 5:03:51 PM4/29/14
to cascadi...@googlegroups.com
Today I started playing with cascading.avro and got the gist of simple use cases, but it's not clear to me from reading the unit tests how to handle my use case.

Currently, this is what I do with JSON formatted data.  I'd like to test doing this with Avro.  Please advise how.

My current flow:

I have a POJO, let's call it "TopPojo" that has 50 fields:
- 44 strings or primitives, like ID, timestamps and lots of other things
- 1 embedded POJO
- 5 arrays of other POJOs

The Cascading flow is to take in data from multiple input Taps, each having the data for the 7 POJOs.  I do various deduplications, joins, filtering and manipulations until I have a single joined record and I then construct "TopPojo" in an aggregator, serialize that to JSON in memory and write that JSON string to the OutputCollector in my Aggregator.  The output written as a TextLine: one TopPojo record in JSON per line.

With Avro, I've created schemas for TopPojo with all its subfields/pojos and used the Avro code generator to generate Avro versions of these POJOs.  But now how do I use that with cascading.avro?

I can't find much documentation on cascading.avro and the few unit tests that do an actual Cascading Flow just let the AvroScheme in the output Tap serialize to Avro format.  I can see how that works for simple data structures, but I don't know how to do that with a heavily nested object like "TopPojo".

Thanks for your help,
Michael



Michael Peterson

unread,
Apr 30, 2014, 11:48:16 AM4/30/14
to cascadi...@googlegroups.com
I've gotten something working, but I don't like it and would still like pointers on improvement.

I've built a highly simplified example, using this Avro schema:

    {
      "namespace": "quux.avro",
      "type": "record",
      "name": "TopPojo",
      "fields": [
        {"name": "Id",         "type": ["string", "null"], "default": null},
        {"name": "State",      "type": ["string", "null"], "default": null},
        {
          "name": "Thing",
          "type": [
            "null",
            {
              "namespace": "quux.avro",
              "type" : "record",
              "name" : "EmbeddedPojo",
              "fields" : [
                {"name": "Age",      "type": ["string", "null"], "default": null},
                {"name": "City",     "type": ["string", "null"], "default": null},
                {"name": "LastName", "type": ["string", "null"], "default": null}
              ]
            }
          ]
        }
      ]
    }

I used the Avro code generator to build code for:

    quux.avro.TopPojo
    quux.avro.EmbeddedPojo

Here's the Cascading code:

    String topPath = "toppojo.input.txt";
    String embPath = "embeddedpojo.input.txt";
    String avroOutPath = "toppojo.avro";
    File encSchemaFile = new File("TopPojo.avsc");
   
    Properties properties = new Properties();
    AppProps.setApplicationJarClass(properties, QuuxAvroCascader.class);
    LocalFlowConnector flowConnector = new LocalFlowConnector(properties);
   
    Schema payerSchema = new Schema.Parser().parse(encSchemaFile);
   
    // 'Key' is the join key between these two inputs
    Fields topFields = new Fields("KeyA", "Id", "State");
    Fields embeddedFields = new Fields("KeyB", "Age", "City", "LastName");
    Fields joinFields = new Fields("Id", "State",  "Thing",  "Age", "City", "LastName");

    Tap<?,?,?> topTap = new FileTap(new TextDelimited(topFields, false, false, ","), topPath);
    Tap<?,?,?> embTap = new FileTap(new TextDelimited(embeddedFields, false, false, ","), embPath);

    Pipe encPipe = new Pipe("top");
    Pipe patPipe = new Pipe("embedded");
   
    Pipe joinPipe = new CoGroup(encPipe, new Fields("KeyA"), patPipe, new Fields("KeyB"));
       
    joinPipe = new Every(joinPipe, Fields.ALL, new TopPojoAggregator(joinFields), Fields.RESULTS);
   
    Tap<?,?,?> avroOutTap =
      new FileTap(new cascading.avro.local.AvroScheme(payerSchema), avroOutPath);
   
    FlowDef flowDef = FlowDef.flowDef().
        setName("transform to Avro").
        addSource(encPipe, topTap).
        addSource(patPipe, embTap).
        addTailSink(joinPipe, avroOutTap);
   
    Flow<?> wcFlow = flowConnector.connect(flowDef);
    wcFlow.complete();



The "complete" method of the TopPojoAggregator has:

    Context ctx = aggregatorCall.getContext();
    TopPojo top = ctx.top;
   
    Fields embFields = new Fields("Age", "City", "LastName");
    Tuple embTuple = Tuple.size(3);
    embTuple.set(0, top.getThing().getAge());
    embTuple.set(1, top.getThing().getCity());
    embTuple.set(2, top.getThing().getLastName());
    TupleEntry embTupleEntry = new TupleEntry(embFields, embTuple);
   
    // why do I have to create a flattened listing with fields from sub objects?
    Fields allFields = new Fields("Id", "State", "Thing", "Age", "City", "LastName");

    Tuple allTuple = Tuple.size(allFields.size());  // have to specify the full size
    allTuple.set(0, top.getId());
    allTuple.set(1, top.getState());
    allTuple.set(2, embTupleEntry);
    // leave fields 3,4,5 "blank", but they have to be in the "allTuple" Tuple => WHY?
   
    TupleEntry allTupleEntry = new TupleEntry(allFields, allTuple);
   
    aggregatorCall.getOutputCollector().add(allTupleEntry);


This "works", it generates this output for a simple data set:

    $ java -jar ~/java/lib/avro-tools-1.7.6.jar tojson toppojo.avro
    {"Id":{"string":"id1"},"State":{"string":"state1"},"Thing":{"quux.avro.EmbeddedPojo":{"Age":{"string":"age1"},"City":{"string":"city1"},"LastName":{"string":"name1"}}}}
    {"Id":{"string":"id2"},"State":{"string":"state2"},"Thing":{"quux.avro.EmbeddedPojo":{"Age":{"string":"age2"},"City":{"string":"city2"},"LastName":{"string":"name2"}}}}



What doesn't make sense to me is that I have to build a Tuple in the Aggregator#complete with all the fields, including those for the embedded pojos - in this case, I have to add the "Thing" field and the fields of "Thing/EmbeddedPojo" ("Age", "City", "LastName") to a single top level Tuple, but I don't have to fill in the Tuple values for the embedded fields.

Hopefully that makes sense.  I've annotated the code above.  I feel that I'm probably doing this wrong.

In reality I'm going to have arrays of "EmbeddedPojos", so I don't think the above model will work.

Christopher Severs

unread,
Apr 30, 2014, 1:45:09 PM4/30/14
to cascadi...@googlegroups.com
Hi Michael,

There is actually a scheme called PackedAvroScheme (https://github.com/ScaleUnlimited/cascading.avro/blob/master/scheme/src/main/java/cascading/avro/PackedAvroScheme.java) which does exactly what you're after. You can create the Avro object yourself and pass them to PackedAvroScheme and it will serialize the objects.

------
Chris

Michael Peterson

unread,
Apr 30, 2014, 2:45:53 PM4/30/14
to cascadi...@googlegroups.com
Hi Chris,

Thanks very much!  That's exactly what I wanted.  This is great, but is pretty much undocumented.  The documentation for avro.cascading needs a lot of love.  Having an example would have saved me a day of experimentation.

For reference, here's what I did that works:

The TopPojoAggregator "complete" method is now just:

    Context ctx = aggregatorCall.getContext();
    TopPojo toppojo = ctx.top;  // This is an Avro generated POJO
    Tuple t = Tuple.size(1);
    t.set(0, toppojo);
    TupleEntry entry = new TupleEntry(new Fields("toppojo"), t);
   
    aggregatorCall.getOutputCollector().add(entry);



And the Cascading code is:

    String topPath = "toppojo.input.txt";
    String embPath = "embeddedpojo.input.txt";
    String avroOutPath = "toppojo.avro";
    File encSchemaFile = new File("TopPojo.avsc");
   
    Properties properties = new Properties();
    AppProps.setApplicationJarClass(properties, QuuxAvroCascader.class);
    LocalFlowConnector flowConnector = new LocalFlowConnector(properties);
   
    Schema payerSchema = new Schema.Parser().parse(encSchemaFile);
   
    // 'Key' is the join key between these two inputs
    Fields topFields = new Fields("KeyA", "Id", "State");
    Fields embeddedFields = new Fields("KeyB", "Age", "City", "LastName");

    Tap<?,?,?> topTap = new FileTap(new TextDelimited(topFields, false, false, ","), topPath);
    Tap<?,?,?> embTap = new FileTap(new TextDelimited(embeddedFields, false, false, ","), embPath);

    Pipe encPipe = new Pipe("top");
    Pipe patPipe = new Pipe("embedded");
   
    Pipe joinPipe = new CoGroup(encPipe, new Fields("KeyA"), patPipe, new Fields("KeyB"));
       
    joinPipe = new Every(joinPipe, Fields.ALL, new TopPojoAggregator(new Fields("toppojo")), Fields.RESULTS);
   
    PackedAvroScheme<TopPojo> packedAvroScheme = new PackedAvroScheme<TopPojo>(payerSchema);
    Tap<?,?,?> avroOutTap = new FileTap(packedAvroScheme, avroOutPath, SinkMode.REPLACE);  
   
    FlowDef flowDef = FlowDef.flowDef().
        setName("transform to Avro").
        addSource(encPipe, topTap).
        addSource(patPipe, embTap).
        addTailSink(joinPipe, avroOutTap);
   
    Flow<?> wcFlow = flowConnector.connect(flowDef);
    wcFlow.complete();



-Michael

Christopher Severs

unread,
Apr 30, 2014, 4:21:25 PM4/30/14
to cascadi...@googlegroups.com
Can you make a pull request with the example?

Michael Peterson

unread,
Jun 13, 2014, 12:30:50 PM6/13/14
to cascadi...@googlegroups.com
Sorry for the delay on this.  I have done a pull request with my PackedAvroSchemeCascadingTest examples:  https://github.com/ScaleUnlimited/cascading.avro/pull/29

-Michael
Reply all
Reply to author
Forward
0 new messages