// Here is a test case.
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowProcess;
import cascading.flow.FlowSession;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Filter;
import cascading.operation.FilterCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.test.HadoopPlatform;
import cascading.test.TestPlatform;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryCollector;
import org.apache.hadoop.mapred.JobConf;
import org.junit.Test;
import java.io.Serializable;
import java.util.Properties;
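/**
 * Reproduction case: writes a delimited file with a header row through a Cascading tap, then
 * reads it back both with explicitly declared fields and with fields taken from the header.
 * Per the inline notes, rows whose values are all null or all empty strings break, while a row
 * with at least one non-empty value works.
 */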
public class FooDemo implements Serializable {
// Shared Cascading test platform (Hadoop flavour); the static helpers in this class use it to
// build delimited-file taps, the flow process, and flow connectors.
private static TestPlatform platform = new HadoopPlatform();
@SuppressWarnings({"unchecked", "rawtypes"})
@Test
public void testAlone() throws Exception {
    // Sink shared by both flows below.
    Tap sinkTap = getTextFileTapWithHeader("./build/test/tmp/sink1", ",");
    // Write out a file with tuples and a header row, declaring fields a, b, c.
    Tap srcTap = getTextFileTapWithHeader("./build/test/tmp/src1", new Fields("a", "b", "c"), ",");
    // Read the same file back, using its header row for the field names.
    Tap allInTap = getTextFileTapWithHeader("./build/test/tmp/src1", ",");

    initTestData(
        srcTap,
        new Tuple("val1", "val2", "val3"),
        new Tuple(null, null, null),         // this breaks
        new Tuple("", "", ""),               // this breaks
        new Tuple(null, null, "thisworks")); // this works

    Pipe input = new Pipe("pipe");
    // Pass-through: the filter never removes a tuple.
    Pipe assembly = new Each(input, new KeepAllFilter());

    // Flow 1: read using the explicitly declared fields (a, b, c).
    // complete() starts the flow if it has not been started and blocks until it finishes.
    FlowConnector connector = getFlowConnector();
    Flow flow = connector.connect(srcTap, sinkTap, assembly);
    flow.complete();

    // Flow 2: read using the fields resolved from the file's header row.
    connector = getFlowConnector();
    flow = connector.connect(allInTap, sinkTap, assembly);
    flow.complete();
}

/**
 * Filter that keeps every tuple. Declared {@code static} so that, unlike an anonymous class
 * created inside an instance method, it holds no hidden reference to the enclosing test
 * instance when the pipe assembly is serialized.
 */
@SuppressWarnings("rawtypes")
private static class KeepAllFilter extends BaseFilter {
    private static final long serialVersionUID = 1L;

    @Override
    public boolean isRemove(FlowProcess flowProcess, FilterCall filterCall) {
        return false;
    }
}
/**
 * Builds a delimited-file tap over {@code fullFilename} that declares {@link Fields#ALL},
 * passes {@code true} as the header flag, and uses {@link SinkMode#REPLACE}.
 * NOTE(review): presumably the flag means the header row is written when sinking and used for
 * field names when sourcing — confirm against the TestPlatform overload.
 *
 * @param fullFilename path of the delimited file
 * @param delimiter    field delimiter, e.g. ","
 * @return the tap created by the shared platform
 */
public static Tap getTextFileTapWithHeader(String fullFilename, String delimiter) {
    Tap tap = platform.getDelimitedFile(
        Fields.ALL, true, delimiter, null, null, fullFilename, SinkMode.REPLACE);
    return tap;
}
/**
 * Builds a delimited-file tap over {@code fullFilename} declaring the given {@code fields},
 * passing the header flags as {@code false, true} and using {@link SinkMode#REPLACE}.
 * NOTE(review): presumably these are skipHeader=false / writeHeader=true, i.e. the tap writes a
 * header but does not skip one when read — confirm against the TestPlatform overload.
 *
 * @param fullFilename path of the delimited file
 * @param fields       fields declared by the tap
 * @param delimiter    field delimiter, e.g. ","
 * @return the tap created by the shared platform
 */
public static Tap getTextFileTapWithHeader(String fullFilename, Fields fields, String delimiter) {
    Tap tap = platform.getDelimitedFile(
        fields, false, true, delimiter, null, null, fullFilename, SinkMode.REPLACE);
    return tap;
}
/**
 * Convenience base for Cascading filters: a {@link BaseOperation} that also implements
 * {@link Filter}, re-exposing each of the superclass constructors used here unchanged.
 *
 * @param <T> the operation context type
 */
public static abstract class BaseFilter<T> extends BaseOperation<T> implements Filter<T> {
    private static final long serialVersionUID = 1L;

    /** Delegates to {@code BaseOperation(int, Fields)}. */
    protected BaseFilter(int numArgs, Fields fieldDeclaration) {
        super(numArgs, fieldDeclaration);
    }

    /** Delegates to {@code BaseOperation(int)}. */
    protected BaseFilter(int numArgs) {
        super(numArgs);
    }

    /** Delegates to {@code BaseOperation(Fields)}. */
    protected BaseFilter(Fields fieldDeclaration) {
        super(fieldDeclaration);
    }

    /** Delegates to the no-argument {@code BaseOperation} constructor. */
    protected BaseFilter() {
        super();
    }
}
/**
 * Writes the given tuples to {@code tap}, in order, using a collector opened via
 * {@code tap.openForWrite(getFlowProcess())}. The collector is always closed, even if
 * writing a tuple fails.
 *
 * @param tap    destination tap
 * @param tuples tuples to write
 * @throws RuntimeException wrapping any failure while opening, writing, or closing
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public static void initTestData(Tap tap, Tuple... tuples) {
    try {
        TupleEntryCollector collector = tap.openForWrite(getFlowProcess());
        try {
            for (Tuple tuple : tuples) {
                collector.add(tuple);
            }
        } finally {
            // Previously skipped when add() threw, leaking the underlying output.
            collector.close();
        }
    } catch (Exception ex) {
        throw new RuntimeException("failed to write test data to " + tap, ex);
    }
}
/**
 * Returns the shared platform's flow process. If the platform call throws a
 * {@link NullPointerException}, falls back to a {@link HadoopFlowProcess} built from a fresh
 * {@link FlowSession} and an empty {@link JobConf}.
 *
 * @return a flow process usable for opening taps directly
 */
public static FlowProcess getFlowProcess() {
    FlowProcess process;
    try {
        process = platform.getFlowProcess();
    } catch (NullPointerException npe) {
        // TODO: temporary workaround for an NPE raised inside Cascading.
        // NOTE(review): platform.setUp() is not called in the visible code; the NPE may come from
        // the platform's configuration not being initialised — confirm before removing this.
        process = new HadoopFlowProcess(new FlowSession(), new JobConf(), true);
    }
    return process;
}
public static FlowConnector getFlowConnector(){
Properties props = new Properties();
return( platform.getFlowConnector( props ) );