I am getting a NullPointerException from a custom function that compares two fields in a tuple and returns the required fields.
Can someone take a look and help? I am new to Java and am trying out Cascading as well.
Fields masterFields = new Fields("id", "name", "dept", "dept_id", "state", "time").applyTypes(long.class, String.class, String.class, String.class, String.class, Date.class);
Pipe histMasterPipe = new Pipe("histMasterPipe");
Pipe joinPipe = new Pipe ("joinPipe");
Fields common = new Fields( "id" );
Fields declared = new Fields("id1", "name1", "dept1", "dept_id1", "state1", "time1","id2", "name2", "dept2", "dept_id2", "state2", "time2");
joinPipe = new CoGroup(histMasterPipe, common, deltaPipe, common, declared, new LeftJoin() );
// At this point I get the output below, which is expected:
['123', 'Joe', 'IT', '525', 'TX', '20151002 10:10:10', '123', 'Joe', 'Accounts', '545', 'FL', '20151007 20:34:07']
['234', 'Bob', 'HR', '625', 'TX', '20151002 11:10:10', null, null, null, null, null, null]
// Writing a custom function to compare the dept column and decide which record to return
joinPipe = new Each(joinPipe, declared, new ReturnTuple(masterFields), Fields.RESULTS );
public static class ReturnTuple extends BaseOperation implements Function {
  public ReturnTuple() {
    super(12, new Fields("id", "name", "dept", "dept_id", "state","time"));
  }
  public ReturnTuple(Fields fieldDeclaration) {
    super(12, fieldDeclaration);
  }
  public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
    TupleEntry arguments = functionCall.getArguments();
    Tuple result = new Tuple();

    long id = 0L;
    String name = new String("");
    String dept = new String("");
    String dept_id = new String("");
    String state = new String("");
    String time = new String("");

    if (!arguments.getString(8).equals(null)) {
      if (arguments.getString(2).equals(arguments.getString(8))) {
        id = arguments.getLong(0);
        name = arguments.getString(1);
        dept = arguments.getString(2);
        dept_id = arguments.getString(3);
        state = arguments.getString(4);
        time = arguments.getString(5);
      } else {
        id = arguments.getLong(6);
        name = arguments.getString(7);
        dept = arguments.getString(8);
        dept_id = arguments.getString(9);
        state = arguments.getString(10);
        time = arguments.getString(11);
      }
    } else {
      id = arguments.getLong(0);
      name = arguments.getString(1);
      dept = arguments.getString(2);
      dept_id = arguments.getString(3);
      state = arguments.getString(4);
      time = arguments.getString(5);
    }

    result.add(id);
    result.add(name);
    result.add(dept);
    result.add(dept_id);
    result.add(state);
    result.add(time);

    functionCall.getOutputCollector().add(result);
  }
}
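A possible cause, stated as an assumption because line 770 of Main.java is not marked in the listing: for the unmatched left-join row, position 8 is null, so calling .equals(null) on the value returned by arguments.getString(8) would throw before the comparison ever runs. The sketch below is not the Cascading code itself. NullSafePick and pick are hypothetical names, and a plain String[] stands in for the TupleEntry so the null handling can be run on its own. In the real function the equivalent check would be arguments.getString(8) != null.

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical stand-in for ReturnTuple.operate(): a String[] replaces the
// Cascading TupleEntry so the null handling can be exercised without Cascading.
final class NullSafePick {

    // Positions 0-5 hold the left (master) record, 6-11 the right (delta) record.
    static String[] pick(String[] joined) {
        String leftDept = joined[2];
        String rightDept = joined[8];

        // Test for null with ==; rightDept.equals(null) would itself throw a
        // NullPointerException whenever rightDept is null.
        boolean noMatch = rightDept == null;

        // Objects.equals is null-safe on both sides.
        boolean useLeft = noMatch || Objects.equals(leftDept, rightDept);

        int offset = useLeft ? 0 : 6;
        return Arrays.copyOfRange(joined, offset, offset + 6);
    }

    public static void main(String[] args) {
        String[] matched = {"123", "Joe", "IT", "525", "TX", "20151002 10:10:10",
                            "123", "Joe", "Accounts", "545", "FL", "20151007 20:34:07"};
        String[] unmatched = {"234", "Bob", "HR", "625", "TX", "20151002 11:10:10",
                              null, null, null, null, null, null};
        System.out.println(Arrays.toString(pick(matched)));
        System.out.println(Arrays.toString(pick(unmatched)));
    }
}
```

With the two sample rows from the join output, the matched row yields the right-hand record (the dept values differ) and the unmatched row yields the left-hand record, which follows the branching in operate().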
2015-10-07 20:34:08,774 ERROR [pool-3-thread-3] stream.TrapHandler (TrapHandler.java:handleReThrowableException(103)) - caught Throwable, no trap available, rethrowing
cascading.pipe.OperatorException: [histMasterPipe*deltaPipe][devtraining.Main.main(Main.java:712)] operator Each failed executing operation
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
    at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:73)
    at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:34)
    at cascading.flow.stream.OpenDuct.receive(OpenDuct.java:45)
    at cascading.flow.stream.OpenDuct.receive(OpenDuct.java:28)
    at cascading.flow.stream.MemoryCoGroupGate.push(MemoryCoGroupGate.java:144)
    at cascading.flow.stream.MemoryCoGroupGate.complete(MemoryCoGroupGate.java:123)
    at cascading.flow.stream.Fork.complete(Fork.java:60)
    at cascading.flow.stream.Duct.complete(Duct.java:81)
    at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
    at cascading.flow.stream.SourceStage.map(SourceStage.java:105)
    at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
    at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException
    at devtraining.Main$ReturnTuple.operate(Main.java:770)
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
    ... 17 more
2015-10-07 20:34:08,834 ERROR [pool-3-thread-3] stream.SourceStage (SourceStage.java:map(110)) - caught throwable
cascading.pipe.OperatorException: [histMasterPipe*deltaPipe][devtraining.Main.main(Main.java:712)] operator Each failed executing operation
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
    at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:73)
    at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:34)
    at cascading.flow.stream.OpenDuct.receive(OpenDuct.java:45)
    at cascading.flow.stream.OpenDuct.receive(OpenDuct.java:28)
    at cascading.flow.stream.MemoryCoGroupGate.push(MemoryCoGroupGate.java:144)
    at cascading.flow.stream.MemoryCoGroupGate.complete(MemoryCoGroupGate.java:123)
    at cascading.flow.stream.Fork.complete(Fork.java:60)
    at cascading.flow.stream.Duct.complete(Duct.java:81)
    at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
    at cascading.flow.stream.SourceStage.map(SourceStage.java:105)
    at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
    at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException
    at devtraining.Main$ReturnTuple.operate(Main.java:770)
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
    ... 17 more
tuples count: 2
tuples count: 3
tuples count: 0
2015-10-07 20:34:09,129 INFO  [flow MasterRefresh] flow.Flow (BaseFlow.java:logInfo(1354)) - [MasterRefresh] stopping all jobs
2015-10-07 20:34:09,131 INFO  [flow MasterRefresh] flow.FlowStep (BaseFlowStep.java:logInfo(834)) - [MasterRefresh] stopping: local
2015-10-07 20:34:09,134 INFO  [flow MasterRefresh] flow.Flow (BaseFlow.java:logInfo(1354)) - [MasterRefresh] stopped all jobs
Exception in thread "main" cascading.flow.FlowException: local step failed
    at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:219)