Hi Lakshmi,
You've got a field in the Tuple called "address", which is itself a record containing sub-fields named "street", etc. - correct?
Unfortunately, I don't believe Cascading has support for resolving fields inside a nested record.
The easiest work-around I know of is to use a function to "flatten" the record. Since the nested record is written as a TupleEntry, you could do something like…
private static class FlattenAvro extends BaseOperation<Void> implements Function<Void> {

    private Comparable _parentFieldname;
    private Fields _subFields;

    private transient TupleEntry _result;

    public FlattenAvro(Fields parentField, Fields subFields) {
        super(1, makeSubfields(parentField, subFields));

        _parentFieldname = parentField.get(0);
        _subFields = subFields;
    }

    private static Fields makeSubfields(Fields parentField, Fields subFields) {
        if (!parentField.isDefined() || (parentField.size() != 1)) {
            throw new IllegalArgumentException("Parent field must be a single defined field");
        }

        if (!subFields.isDefined() || (subFields.size() == 0)) {
            throw new IllegalArgumentException("subFields must be one or more defined fields");
        }

        String parentFieldname = parentField.get(0).toString();
        String[] fieldnames = new String[subFields.size()];
        for (int i = 0; i < subFields.size(); i++) {
            fieldnames[i] = String.format("%s.%s", parentFieldname, subFields.get(i).toString());
        }

        return new Fields(fieldnames);
    }

    @Override
    public void prepare(FlowProcess flowProcess, OperationCall<Void> operationCall) {
        super.prepare(flowProcess, operationCall);

        _result = new TupleEntry(getFieldDeclaration(), Tuple.size(getFieldDeclaration().size()));
    }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall<Void> functionCall) {
        // We get called with one field, and that value should be a TupleEntry
        TupleEntry te = functionCall.getArguments();
        TupleEntry nested = (TupleEntry)te.getObject(_parentFieldname);
        if (!nested.getFields().equals(_subFields)) {
            throw new IllegalArgumentException("Fields in record don't match fields used to define function output");
        }

        for (Comparable subFieldname : _subFields) {
            Object value = nested.getObject(subFieldname);
            String fieldname = String.format("%s.%s", _parentFieldname, subFieldname.toString());
            _result.setObject(fieldname, value);
        }

        functionCall.getOutputCollector().add(_result);
    }
}
and then you'd use it like:
Fields addressField = new Fields("address");
p = new Each(p, addressField, new FlattenAvro(addressField, new Fields("street", "city")), Fields.SWAP);
It's kind of kludgy that you have to pass the target field (addressField above) both to Each() as the argument selector and to FlattenAvro()'s constructor, but the function needs to call super() with a Fields containing the resulting flattened field names (e.g. "address.street", "address.city").
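As a plain-Java aside (no Cascading dependency needed; the class and method names here are just for illustration), the "parent.sub" naming scheme that makeSubfields() builds amounts to this:

```java
import java.util.Arrays;

// Sketch of the "parent.sub" naming scheme used above; FlattenNamesDemo
// and makeSubfieldNames are illustrative names, not part of Cascading.
public class FlattenNamesDemo {

    static String[] makeSubfieldNames(String parentName, String... subNames) {
        String[] result = new String[subNames.length];
        for (int i = 0; i < subNames.length; i++) {
            // Prefix each sub-field with the parent field name, e.g. "address.street"
            result[i] = String.format("%s.%s", parentName, subNames[i]);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] flattened = makeSubfieldNames("address", "street", "city");
        System.out.println(Arrays.toString(flattened));
        // prints [address.street, address.city]
    }
}
```

Those prefixed names are what FlattenAvro passes to super(), which is why the parent field has to show up in both places.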
-- Ken
PS - here's the test code, which relies on some classes from cascading.utils…
@Test
public void testFlattenAvro() throws Exception {
    BasePlatform platform = new LocalPlatform(FlattenAvroTest.class);

    Tap tap = new InMemoryTap(new Fields("ID", "address"));
    TupleEntryCollector writer = tap.openForWrite(platform.makeFlowProcess());
    writer.add(new Tuple(1, new TupleEntry(new Fields("street", "city"), new Tuple("123 Main", "Anytown"))));
    writer.add(new Tuple(2, new TupleEntry(new Fields("street", "city"), new Tuple("456 Elm", "Midtown"))));
    writer.close();

    Pipe p = new Pipe("pipe");
    p = new Each(p, new Debug("structured", true));

    Fields addressField = new Fields("address");
    p = new Each(p, addressField, new FlattenAvro(addressField, new Fields("street", "city")), Fields.SWAP);
    p = new Each(p, new Debug("flattened", true));

    Tap sinkTap = new com.scaleunlimited.cascading.NullSinkTap();

    FlowDef flowdef = new FlowDef()
        .addSource(p, tap)
        .addTailSink(p, sinkTap);
    platform.makeFlowConnector().connect(flowdef).complete();
}
and the flattened output looks like:
flattened: ['ID', 'address.street', 'address.city']
flattened: ['1', '123 Main', 'Anytown']
flattened: ['2', '456 Elm', 'Midtown']
From: Lakshmi reddy
Sent: August 28, 2015 6:09:46am PDT
To: cascading-user
Subject: Re: Splitting an Avro file based on Record type using cascading