I've gotten something working, but I don't like it and would still like pointers on how to improve it.
I've built a highly simplified example, using this Avro schema:
{
  "namespace": "quux.avro",
  "type": "record",
  "name": "TopPojo",
  "fields": [
    {"name": "Id", "type": ["string", "null"], "default": null},
    {"name": "State", "type": ["string", "null"], "default": null},
    {
      "name": "Thing",
      "type": [
        "null",
        {
          "namespace": "quux.avro",
          "type": "record",
          "name": "EmbeddedPojo",
          "fields": [
            {"name": "Age", "type": ["string", "null"], "default": null},
            {"name": "City", "type": ["string", "null"], "default": null},
            {"name": "LastName", "type": ["string", "null"], "default": null}
          ]
        }
      ]
    }
  ]
}
I used the Avro code generator to build code for:
quux.avro.TopPojo
quux.avro.EmbeddedPojo
Here's the Cascading code:
String topPath = "toppojo.input.txt";
String embPath = "embeddedpojo.input.txt";
String avroOutPath = "toppojo.avro";
File encSchemaFile = new File("TopPojo.avsc");
Properties properties = new Properties();
AppProps.setApplicationJarClass(properties, QuuxAvroCascader.class);
LocalFlowConnector flowConnector = new LocalFlowConnector(properties);
Schema payerSchema = new Schema.Parser().parse(encSchemaFile);
// 'KeyA' and 'KeyB' are the join keys between these two inputs
Fields topFields = new Fields("KeyA", "Id", "State");
Fields embeddedFields = new Fields("KeyB", "Age", "City", "LastName");
Fields joinFields = new Fields("Id", "State", "Thing", "Age", "City", "LastName");
Tap<?,?,?> topTap = new FileTap(new TextDelimited(topFields, false, false, ","), topPath);
Tap<?,?,?> embTap = new FileTap(new TextDelimited(embeddedFields, false, false, ","), embPath);
Pipe encPipe = new Pipe("top");
Pipe patPipe = new Pipe("embedded");
Pipe joinPipe = new CoGroup(encPipe, new Fields("KeyA"), patPipe, new Fields("KeyB"));
joinPipe = new Every(joinPipe, Fields.ALL, new TopPojoAggregator(joinFields), Fields.RESULTS);
Tap<?,?,?> avroOutTap =
    new FileTap(new cascading.avro.local.AvroScheme(payerSchema), avroOutPath);
FlowDef flowDef = FlowDef.flowDef().
    setName("transform to Avro").
    addSource(encPipe, topTap).
    addSource(patPipe, embTap).
    addTailSink(joinPipe, avroOutTap);
Flow<?> wcFlow = flowConnector.connect(flowDef);
wcFlow.complete();
The "complete" method of the TopPojoAggregator has:
Context ctx = aggregatorCall.getContext();
TopPojo top = ctx.top;
Fields embFields = new Fields("Age", "City", "LastName");
Tuple embTuple = Tuple.size(3);
embTuple.set(0, top.getThing().getAge());
embTuple.set(1, top.getThing().getCity());
embTuple.set(2, top.getThing().getLastName());
TupleEntry embTupleEntry = new TupleEntry(embFields, embTuple);
// why do I have to create a flattened listing with fields from sub objects?
Fields allFields = new Fields("Id", "State", "Thing", "Age", "City", "LastName");
Tuple allTuple = Tuple.size(allFields.size()); // have to specify the full size
allTuple.set(0, top.getId());
allTuple.set(1, top.getState());
allTuple.set(2, embTupleEntry);
// leave fields 3,4,5 "blank", but they have to be in the "allTuple" Tuple => WHY?
TupleEntry allTupleEntry = new TupleEntry(allFields, allTuple);
aggregatorCall.getOutputCollector().add(allTupleEntry);
This "works": it generates the following output for a simple data set:
$ java -jar ~/java/lib/avro-tools-1.7.6.jar tojson toppojo.avro
{"Id":{"string":"id1"},"State":{"string":"state1"},"Thing":{"quux.avro.EmbeddedPojo":{"Age":{"string":"age1"},"City":{"string":"city1"},"LastName":{"string":"name1"}}}}
{"Id":{"string":"id2"},"State":{"string":"state2"},"Thing":{"quux.avro.EmbeddedPojo":{"Age":{"string":"age2"},"City":{"string":"city2"},"LastName":{"string":"name2"}}}}
What doesn't make sense to me is that, in Aggregator#complete, I have to build a single top-level Tuple that declares all the fields, including those of the embedded pojo. In this case I have to add both the "Thing" field and the fields of "Thing"/EmbeddedPojo ("Age", "City", "LastName") to that Tuple, yet I don't have to fill in values for the embedded fields.
Hopefully that makes sense. I've annotated the code above. I feel that I'm probably doing this wrong.
In reality I'm going to have arrays of "EmbeddedPojos", so I don't think the above model will work.
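To make the array case concrete, here is a minimal plain-Java sketch of the shape I'm after: one top record per join key, holding a list of embedded records. It deliberately uses no Cascading or Avro API. The classes Top and Embedded, the method group, and the row layout (key first, then the data columns) are hypothetical stand-ins for illustration only, not the generated quux.avro classes and not a claim about how the Aggregator or AvroScheme should be written.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EmbeddedGroupingSketch {

    // Hypothetical stand-in for the generated quux.avro.EmbeddedPojo.
    static final class Embedded {
        final String age;
        final String city;
        final String lastName;

        Embedded(String age, String city, String lastName) {
            this.age = age;
            this.city = city;
            this.lastName = lastName;
        }
    }

    // Hypothetical stand-in for TopPojo, with "Thing" turned into a list.
    static final class Top {
        final String id;
        final String state;
        final List<Embedded> things = new ArrayList<>();

        Top(String id, String state) {
            this.id = id;
            this.state = state;
        }
    }

    /**
     * Groups embedded rows under the top row that shares their key.
     * topRows: {key, id, state}; embRows: {key, age, city, lastName}.
     */
    static Map<String, Top> group(List<String[]> topRows, List<String[]> embRows) {
        Map<String, Top> byKey = new LinkedHashMap<>();
        for (String[] row : topRows) {
            byKey.put(row[0], new Top(row[1], row[2]));
        }
        for (String[] row : embRows) {
            Top top = byKey.get(row[0]);
            // Embedded rows whose key matches no top row are ignored in this sketch.
            if (top != null) {
                top.things.add(new Embedded(row[1], row[2], row[3]));
            }
        }
        return byKey;
    }

    public static void main(String[] args) {
        List<String[]> topRows = new ArrayList<>();
        topRows.add(new String[] {"k1", "id1", "state1"});

        List<String[]> embRows = new ArrayList<>();
        embRows.add(new String[] {"k1", "age1", "city1", "name1"});
        embRows.add(new String[] {"k1", "age2", "city2", "name2"});

        for (Top top : group(topRows, embRows).values()) {
            System.out.println(top.id + " has " + top.things.size() + " embedded record(s)");
        }
    }
}
```

With this shape the embedded records stay nested under their parent, which is what the flattened "Age", "City", "LastName" listing above cannot express once there is more than one EmbeddedPojo per TopPojo.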