Splitting an Avro file based on Record type using Cascading


Lakshmi reddy

Aug 28, 2015, 2:10:31 AM
to cascading-user
Hi,

  We would like to split an Avro file that contains all datatypes (primitives, Record, Union, Array, Map, etc.). Each RECORD's data should be written to a new file. But while doing this, we were unable to extract the fields of the inner records.

Please find the code example.

Tap docTap = new FileTap(new AvroScheme(schema), docPath);

Pipe main = new Pipe("main");

Pipe empPipe = new Pipe("emp", main);
Pipe addrPipe = new Pipe("addr", main);

Fields empFields = new Fields("username", "age", "phone");
Fields addrFields = new Fields("housenum", "street", "state", "country");

empPipe = new Retain(empPipe, empFields);
addrPipe = new Retain(addrPipe, addrFields);


But we are getting an error about the address fields. Please help us.

Andre Kelpe

Aug 28, 2015, 4:14:10 AM
to cascading-user
Hi,

I don't know the answer to this, but it is always a good idea to include stack traces. Otherwise it is very hard for people to help you.

- André


Lakshmi reddy

Aug 28, 2015, 9:09:47 AM
to cascading-user
Hi, 
 Please find below the stack trace we are getting:


flow.planner.PlannerException: could not build flow from assembly: [[addr][cascading.pipe.assembly.Retain.<init>(Retain.java:48)] unable to resolve argument selector: [{5}:'street', 'city', 'state_prov', 'country', 'zip'], with incoming: [{1}:'address']]
at cascading.flow.planner.FlowPlanner.handleExceptionDuringPlanning(FlowPlanner.java:577)
at cascading.flow.local.planner.LocalPlanner.buildFlow(LocalPlanner.java:108)
at cascading.flow.local.planner.LocalPlanner.buildFlow(LocalPlanner.java:40)
at cascading.flow.FlowConnector.connect(FlowConnector.java:459)
at com.virtusa.casc.CascAvroDiv.main(CascAvroDiv.java:93)
Caused by: cascading.pipe.OperatorException: [addr][cascading.pipe.assembly.Retain.<init>(Retain.java:48)] unable to resolve argument selector: [{5}:'street', 'city', 'state_prov', 'country', 'zip'], with incoming: [{1}:'address']
at cascading.pipe.Operator.resolveArgumentSelector(Operator.java:345)
at cascading.pipe.Each.outgoingScopeFor(Each.java:368)
at cascading.flow.planner.ElementGraph.resolveFields(ElementGraph.java:628)
at cascading.flow.planner.ElementGraph.resolveFields(ElementGraph.java:610)
at cascading.flow.local.planner.LocalPlanner.buildFlow(LocalPlanner.java:95)
... 3 more
Caused by: cascading.tuple.FieldsResolverException: could not select fields: [{1}:'street'], from: [{1}:'address']
at cascading.tuple.Fields.indexOf(Fields.java:1016)
at cascading.tuple.Fields.select(Fields.java:1072)
at cascading.pipe.Operator.resolveArgumentSelector(Operator.java:341)

Ken Krugler

Aug 28, 2015, 6:05:05 PM
to cascadi...@googlegroups.com
Hi Lakshmi,

You've got a field in the Tuple called "address", which is itself a record containing sub-fields named "street", etc - correct?

Unfortunately I don't believe there's support in Cascading for resolving fields that are inside of a nested record.

The easiest work-around I know of is to use a function to "flatten" the record. Since the nested record is written as a TupleEntry, you could do something like…

    private static class FlattenAvro extends BaseOperation<Void> implements Function<Void> {

        private Comparable _parentFieldname;
        private Fields _subFields;

        private transient TupleEntry _result;

        public FlattenAvro(Fields parentField, Fields subFields) {
            super(1, makeSubfields(parentField, subFields));

            _parentFieldname = parentField.get(0);
            _subFields = subFields;
        }

        private static Fields makeSubfields(Fields parentField, Fields subFields) {
            if (!parentField.isDefined() || (parentField.size() != 1)) {
                throw new IllegalArgumentException("Parent field must be a single defined field");
            }

            if (!subFields.isDefined() || (subFields.size() == 0)) {
                throw new IllegalArgumentException("subFields must be one or more defined fields");
            }

            String parentFieldname = parentField.get(0).toString();

            String[] fieldnames = new String[subFields.size()];
            for (int i = 0; i < subFields.size(); i++) {
                fieldnames[i] = String.format("%s.%s", parentFieldname, subFields.get(i).toString());
            }

            return new Fields(fieldnames);
        }

        @Override
        public void prepare(FlowProcess flowProcess, OperationCall<Void> operationCall) {
            super.prepare(flowProcess, operationCall);

            _result = new TupleEntry(getFieldDeclaration(), Tuple.size(getFieldDeclaration().size()));
        }

        @Override
        public void operate(FlowProcess flowProcess, FunctionCall<Void> functionCall) {
            // We get called with one field, and that value should be a TupleEntry
            TupleEntry te = functionCall.getArguments();
            TupleEntry nested = (TupleEntry)te.getObject(_parentFieldname);

            if (!nested.getFields().equals(_subFields)) {
                throw new IllegalArgumentException("Fields in record don't match fields used to define function output");
            }

            for (Comparable subFieldname : _subFields) {
                Object value = nested.getObject(subFieldname);
                String fieldname = String.format("%s.%s", _parentFieldname, subFieldname.toString());
                _result.setObject(fieldname, value);
            }

            functionCall.getOutputCollector().add(_result);
        }
    }

and then you'd use it like:

        Fields addressField = new Fields("address");
        p = new Each(p, addressField, new FlattenAvro(addressField, new Fields("street", "city") ), Fields.SWAP);

It's kind of kludgy that you have to pass the target field (addressField above) to the Each() as the operator selector, and to FlattenAvro()'s constructor, but the function needs to call the super with a Fields() containing the resulting flattened field names (e.g. "address.street", "address.city").

-- Ken

PS - here's the test code, which relies on some classes from cascading.utils…

    @Test
    public void testFlattenAvro() throws Exception {
        BasePlatform platform = new LocalPlatform(FlattenAvroTest.class);

        Tap tap = new InMemoryTap(new Fields("ID", "address"));
        TupleEntryCollector writer = tap.openForWrite(platform.makeFlowProcess());
        writer.add(new Tuple(1, new TupleEntry(new Fields("street", "city"), new Tuple("123 Main", "Anytown"))));
        writer.add(new Tuple(2, new TupleEntry(new Fields("street", "city"), new Tuple("456 Elm", "Midtown"))));
        writer.close();

        Pipe p = new Pipe("pipe");
        p = new Each(p, new Debug("structured", true));

        Fields addressField = new Fields("address");
        p = new Each(p, addressField, new FlattenAvro(addressField, new Fields("street", "city")), Fields.SWAP);
        p = new Each(p, new Debug("flattened", true));

        Tap sinkTap = new com.scaleunlimited.cascading.NullSinkTap();
        FlowDef flowdef = new FlowDef()
            .addSource(p, tap)
            .addTailSink(p, sinkTap);

        platform.makeFlowConnector().connect(flowdef).complete();
    }

and the flattened output looks like:

flattened: ['ID', 'address.street', 'address.city']
flattened: ['1', '123 Main', 'Anytown']
flattened: ['2', '456 Elm', 'Midtown']


--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr





Lakshmi reddy

Aug 29, 2015, 5:02:00 AM
to cascading-user
Hi kkrugler,

   I am getting a ClassCastException in FlattenAvro.

Caused by: java.lang.ClassCastException: cascading.tuple.Tuple cannot be cast to cascading.tuple.TupleEntry
at com.virtusa.casc.FlattenAvro.operate(FlattenAvro.java:61)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
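
The cast failure above suggests the nested record arrives at operate() as a raw cascading.tuple.Tuple rather than a TupleEntry (contrary to the assumption in the original FlattenAvro). One possible patch to the body of operate() — a sketch only, which assumes the Tuple's values are laid out in the same order as _subFields, and which is not a confirmed fix from this thread — would be to accept both types:

```java
// Sketch of a tolerant version of the lookup inside FlattenAvro.operate().
// Assumption: when a bare Tuple is delivered, its values are in the same
// order as _subFields (this is NOT verified anywhere in the thread).
Object nestedValue = te.getObject(_parentFieldname);

TupleEntry nested;
if (nestedValue instanceof TupleEntry) {
    // Original expectation: the nested record is already a TupleEntry.
    nested = (TupleEntry) nestedValue;
} else if (nestedValue instanceof Tuple) {
    // What the stack trace shows actually arriving: wrap the raw Tuple
    // with the expected sub-fields so the rest of operate() works unchanged.
    nested = new TupleEntry(_subFields, (Tuple) nestedValue);
} else {
    throw new IllegalArgumentException("Unexpected nested value type: "
        + (nestedValue == null ? "null" : nestedValue.getClass().getName()));
}
```

With this in place, the subsequent fields-equality check should be skipped for the bare-Tuple case (the wrapper is constructed from _subFields, so it is trivially equal).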

We would like to write a Cascading program that can take any Avro file and generate flat files (CSV files) for each record type. Please find the attached code (still in progress) for the same.
CascAvroDiv.java
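
For the broader goal of one CSV per record type, a minimal local-mode sketch might look like the following. This assumes the field names used earlier in the thread; the sink paths and the flattened address field list are illustrative, not taken from the attached CascAvroDiv.java, and the code needs the Cascading and cascading-avro jars on the classpath.

```java
// Sketch: split one Avro source into per-record-type CSV sinks, flattening
// the nested "address" record with the FlattenAvro function from this thread.
Pipe main = new Pipe("main");

// Top-level primitive fields go straight to the employee CSV.
Pipe empPipe = new Pipe("emp", main);
empPipe = new Retain(empPipe, new Fields("username", "age", "phone"));

// Nested record fields must be flattened before they can be retained.
Fields addressField = new Fields("address");
Pipe addrPipe = new Pipe("addr", main);
addrPipe = new Each(addrPipe, addressField,
    new FlattenAvro(addressField, new Fields("street", "city", "state_prov", "country", "zip")),
    Fields.SWAP);

Tap source = new FileTap(new AvroScheme(schema), docPath);
// Local-mode TextDelimited(hasHeader, delimiter) writes header + comma-separated rows.
Tap empSink = new FileTap(new TextDelimited(true, ","), "emp.csv", SinkMode.REPLACE);
Tap addrSink = new FileTap(new TextDelimited(true, ","), "addr.csv", SinkMode.REPLACE);

FlowDef flowDef = new FlowDef()
    .addSource(main, source)
    .addTailSink(empPipe, empSink)
    .addTailSink(addrPipe, addrSink);

new LocalFlowConnector().connect(flowDef).complete();
```

Each additional record type in the schema would get its own branch off `main`, its own flatten/retain step, and its own delimited sink.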