TextDelimited With Headers, right padded null values


jd

Oct 17, 2012, 4:50:17 PM
to cascadi...@googlegroups.com
Hello all.
I'm using a TextDelimited file with headers, and I'm running into an issue when the rightmost values are null or empty strings.

In this situation Cascading does not fill in those values in the tuple, and selecting fields from the resulting tuple throws the exception below.
Is there a way to work around this?


cascading.tuple.TupleException: unable to select from: ['a', 'b', 'c'], using selector: ['a', 'b', 'c']
at cascading.tuple.TupleEntry.selectTuple(TupleEntry.java:548)
at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:53)
at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:33)
at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:127)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)
Caused by: cascading.tuple.TupleException: field declaration: ['a', 'b', 'c'], does not match tuple: []
at cascading.tuple.Tuple.get(Tuple.java:338)
at cascading.tuple.TupleEntry.selectTuple(TupleEntry.java:544)
... 8 more
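One plausible mechanism, which is an assumption and not confirmed anywhere in this thread: if each line is split with Java's `String.split` (or `Pattern.split`) without a field-count limit, trailing empty strings are discarded. Assuming null values are written to the file as empty fields, the rows `(null, null, null)` and `("", "", "")` both end up on disk as `,,`, while `(null, null, "thisworks")` becomes `,,thisworks`. The class name `TrailingEmptySplit` is made up for illustration.

```java
import java.util.Arrays;

// Illustration only: shows how String.split treats trailing empty fields.
class TrailingEmptySplit {
    // Default split (limit 0): trailing empty strings are removed from the result.
    static String[] splitDefault(String row) {
        return row.split(",");
    }

    // Negative limit: every field is kept, including trailing empty ones.
    static String[] splitKeepTrailing(String row) {
        return row.split(",", -1);
    }

    public static void main(String[] args) {
        // Rows as they might look on disk, assuming null is written as an empty field.
        String[] rows = {"val1,val2,val3", ",,", ",,thisworks"};
        for (String row : rows) {
            System.out.println("'" + row + "' default=" + Arrays.toString(splitDefault(row))
                    + " keepTrailing=" + Arrays.toString(splitKeepTrailing(row)));
        }
    }
}
```

With the default split, `,,` yields an empty array (consistent with the empty tuple `[]` in the trace above), whereas a negative limit keeps three empty strings; `,,thisworks` yields three values either way, which lines up with the row that works.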



jd

Oct 17, 2012, 6:48:52 PM
to cascadi...@googlegroups.com
Here is a test case.


import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowProcess;
import cascading.flow.FlowSession;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Filter;
import cascading.operation.FilterCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.test.HadoopPlatform;
import cascading.test.TestPlatform;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryCollector;
import org.apache.hadoop.mapred.JobConf;
import org.junit.Test;

import java.io.Serializable;
import java.util.Properties;

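/**
 * Reproduces the failure when a TextDelimited tap resolves its fields from the
 * header and the rightmost values of a row are null or empty.
 */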
public class FooDemo implements Serializable {

    private static TestPlatform platform = new HadoopPlatform();

    @SuppressWarnings ("unchecked")
    @Test
    public void testAlone() throws Exception {

        //Sink tap
        Tap sinkTap = getTextFileTapWithHeader("./build/test/tmp/sink1", ",");

        //Write out a file with tuples and headers
        Tap srcTap = getTextFileTapWithHeader("./build/test/tmp/src1", new Fields("a", "b", "c"), ",");

        //Read in using the header
        Tap allInTap = getTextFileTapWithHeader("./build/test/tmp/src1", ",");


        initTestData(
                srcTap, new Tuple("val1", "val2", "val3"),
                new Tuple(null, null, null),          //this breaks
                new Tuple("", "", ""),                //this breaks
                new Tuple(null, null, "thisworks"));  //this works


        Pipe input = new Pipe("pipe");

        Pipe assembly = new Each( input, new BaseFilter() {
            @Override
            public boolean isRemove (FlowProcess flowProcess, FilterCall filterCall) {
                return(false);
            }
        });




        //Reading with explicitly declared fields works
        FlowConnector connector = getFlowConnector();
        Flow flow = connector.connect( srcTap,  sinkTap, assembly );
        flow.start();
        flow.complete();

        //Reading with fields resolved from the header fails on the all-null and all-empty rows
        connector = getFlowConnector();
        flow = connector.connect( allInTap,  sinkTap, assembly );
        flow.start();
        flow.complete();

    }

    public static Tap getTextFileTapWithHeader ( String fullFilename, String delimiter ) {
        return(platform.getDelimitedFile(Fields.ALL,true,delimiter,null,null,fullFilename, SinkMode.REPLACE ) );
    }
    public static Tap getTextFileTapWithHeader ( String fullFilename, Fields fields, String delimiter ) {
        return(platform.getDelimitedFile(fields,false,true,delimiter,null,null,fullFilename,SinkMode.REPLACE ) );
    }

    public static abstract class BaseFilter<T> extends BaseOperation<T>
            implements Filter<T> {
        private static final long serialVersionUID = 1L;
        protected BaseFilter () {
        }

        protected BaseFilter (Fields fieldDeclaration) {
            super(fieldDeclaration);
        }

        protected BaseFilter (int numArgs) {
            super(numArgs);
        }

        protected BaseFilter (int numArgs, Fields fieldDeclaration) {
            super(numArgs, fieldDeclaration);
        }
    }

    @SuppressWarnings ("unchecked")
    public static void initTestData( Tap tap, Tuple... tuples ) {
        try
        {
            TupleEntryCollector collector = tap.openForWrite(getFlowProcess());
            for(Tuple tuple : tuples) {
                collector.add( tuple );
            }
            collector.close();
        }catch( Exception ex )
        {
            throw new RuntimeException(ex);
        }
    }
    public static FlowProcess getFlowProcess () {
        try{
            //TODO: Cascading is NPE for some reason... temp work around:
            return( platform.getFlowProcess() );
        }catch(NullPointerException npe ) {
            return( new HadoopFlowProcess(new FlowSession(), new JobConf(), true ) );
        }
    }

    public static FlowConnector getFlowConnector(){
        Properties props = new Properties();

        return( platform.getFlowConnector( props ) );
    }
}

Koert Kuipers

Oct 17, 2012, 8:05:55 PM
to cascadi...@googlegroups.com
I just added a small unit test and I see errors too when I let Cascading resolve the fields from the header. Once I turn that off and declare the fields explicitly, it interprets the input file correctly.

My test file was:
a|b|c
1|2|3
4|5|
7|8|9

It breaks on the line "4|5|" with this exception:

    [junit] cascading.pipe.OperatorException: [test][com.tresata.cascading.tap.CsvHfsTest.testTextDelimited(CsvHfsTest.java:158)] operator Each failed executing operation
    [junit]     at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:81)
    [junit]     at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:34)
    [junit]     at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
    [junit]     at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
    [junit]     at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:127)
    [junit]     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
    [junit]     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
    [junit]     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
    [junit] Caused by: java.lang.IndexOutOfBoundsException: Index: 2, Size: 2
    [junit]     at java.util.ArrayList.RangeCheck(ArrayList.java:547)
    [junit]     at java.util.ArrayList.get(ArrayList.java:322)
    [junit]     at cascading.tuple.Tuple.getObject(Tuple.java:221)
    [junit]     at cascading.tuple.util.NarrowTupleList.get(NarrowTupleList.java:72)
    [junit]     at cascading.tuple.Tuple.printTo(Tuple.java:1077)
    [junit]     at cascading.tuple.Tuple.print(Tuple.java:1066)
    [junit]     at cascading.operation.Debug.isRemove(Debug.java:222)
    [junit]     at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:70)
    [junit]     ... 7 more
    [junit] [test] task completion events i
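For the `a|b|c` file above, here is a sketch of the behavior one might expect when fields come from the header: take the field count from the header line, keep trailing empty fields, and right-pad any missing trailing values with `null`. `HeaderAwareSplit` and `parseRow` are hypothetical names for illustration, not Cascading API, and this is not how the 2.1 fix is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not Cascading API: makes every row at least as wide as the header.
class HeaderAwareSplit {
    // Splits a row, keeping trailing empty fields (limit -1), then right-pads
    // with nulls until it has `width` values. Wider rows are returned unchanged.
    static List<String> parseRow(String row, String delimiter, int width) {
        List<String> values = new ArrayList<>(Arrays.asList(row.split(Pattern.quote(delimiter), -1)));
        while (values.size() < width) {
            values.add(null);
        }
        return values;
    }

    public static void main(String[] args) {
        // The test file above, plus a last row without the trailing delimiter (added for illustration).
        String[] lines = {"a|b|c", "1|2|3", "4|5|", "7|8|9", "4|5"};
        // The header line fixes the expected number of fields.
        int width = lines[0].split(Pattern.quote("|"), -1).length;
        for (int i = 1; i < lines.length; i++) {
            System.out.println(lines[i] + " -> " + parseRow(lines[i], "|", width));
        }
    }
}
```

Here `4|5|` parses to `4`, `5` and an empty string, while `4|5` is padded to `4`, `5`, `null`, so every row matches the three header fields.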

--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To view this discussion on the web visit https://groups.google.com/d/msg/cascading-user/-/ukkrpnNuyU8J.

To post to this group, send email to cascadi...@googlegroups.com.
To unsubscribe from this group, send email to cascading-use...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/cascading-user?hl=en.

Chris K Wensel

Oct 17, 2012, 10:37:22 PM
to cascadi...@googlegroups.com
Good catch! There should be a fix in the next 2.1 wip.

ckw
