I am getting a NullPointerException when running a custom function that compares two fields in a tuple and returns the required fields.
Can someone take a look and help? I am new to Java and to Cascading.
Fields masterFields = new Fields("id", "name", "dept", "dept_id", "state", "time").applyTypes(long.class, String.class, String.class, String.class, String.class, Date.class);
Pipe histMasterPipe = new Pipe("histMasterPipe");
Pipe joinPipe = new Pipe ("joinPipe");
Fields common = new Fields( "id" );
Fields declared = new Fields("id1", "name1", "dept1", "dept_id1", "state1", "time1","id2", "name2", "dept2", "dept_id2", "state2", "time2");
joinPipe = new CoGroup(histMasterPipe, common, deltaPipe, common, declared, new LeftJoin() );
// At this point I get the output below, which is expected:
['123', 'Joe', 'IT', '525', 'TX', '20151002 10:10:10', '123', 'Joe', 'Accounts', '545', 'FL', '20151007 20:34:07']
['234', 'Bob', 'HR', '625', 'TX', '20151002 11:10:10', null, null, null, null, null, null]
// Writing a custom function to compare the dept column and decide which record to return
joinPipe = new Each(joinPipe, declared, new ReturnTuple(masterFields), Fields.RESULTS );
public static class ReturnTuple extends BaseOperation implements Function {
    public ReturnTuple() {
        super(12, new Fields("id", "name", "dept", "dept_id", "state", "time"));
    }
    public ReturnTuple(Fields fieldDeclaration) {
        super(12, fieldDeclaration);
    }
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
        TupleEntry arguments = functionCall.getArguments();
        Tuple result = new Tuple();
        long id = 0L;
        String name = new String("");
        String dept = new String("");
        String dept_id = new String("");
        String state = new String("");
        String time = new String("");
        if ( !arguments.getString(8).equals(null))
        {
            if ( arguments.getString(2).equals(arguments.getString(8)))
            {
                id = arguments.getLong(0);
                name = arguments.getString(1);
                dept = arguments.getString(2);
                dept_id = arguments.getString(3);
                state = arguments.getString(4);
                time = arguments.getString(5);
            } else {
                id = arguments.getLong(6);
                name = arguments.getString(7);
                dept = arguments.getString(8);
                dept_id = arguments.getString(9);
                state = arguments.getString(10);
                time = arguments.getString(11);
            }
        } else {
            id = arguments.getLong(0);
            name = arguments.getString(1);
            dept = arguments.getString(2);
            dept_id = arguments.getString(3);
            state = arguments.getString(4);
            time = arguments.getString(5);
        }
        result.add(id);
        result.add(name);
        result.add(dept);
        result.add(dept_id);
        result.add(state);
        result.add(time);
        functionCall.getOutputCollector().add(result);
    }
}
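One thing worth noting about the null check in operate: `arguments.getString(8).equals(null)` calls a method on the very value it is testing, so it throws a NullPointerException whenever position 8 is null. That is the case for the unmatched LeftJoin row shown earlier, so this is likely where the exception comes from. A minimal sketch of the same selection rule with null-safe comparisons, written in plain Java without Cascading so that it runs standalone, might look like the following. The class `NullSafePick`, the method `pick`, and the use of a `List<Object>` in place of a `TupleEntry` are assumptions made for illustration and are not part of the original code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for the ReturnTuple logic: the joined row is a plain
// 12-element list (left record in positions 0..5, right record in 6..11).
final class NullSafePick {

    static List<Object> pick(List<Object> joined) {
        List<Object> left = joined.subList(0, 6);
        List<Object> right = joined.subList(6, 12);
        Object leftDept = joined.get(2);
        Object rightDept = joined.get(8);

        // Test for null with ==, never with equals(), so a missing right
        // side cannot trigger a NullPointerException.
        if (rightDept == null) {
            return left;
        }
        // Objects.equals tolerates a null on either side.
        return Objects.equals(leftDept, rightDept) ? left : right;
    }

    public static void main(String[] args) {
        // Sample rows copied from the join output shown earlier.
        List<Object> matched = Arrays.asList(
            123L, "Joe", "IT", "525", "TX", "20151002 10:10:10",
            123L, "Joe", "Accounts", "545", "FL", "20151007 20:34:07");
        List<Object> unmatched = Arrays.asList(
            234L, "Bob", "HR", "625", "TX", "20151002 11:10:10",
            null, null, null, null, null, null);
        System.out.println(pick(matched));
        System.out.println(pick(unmatched));
    }
}
```

Inside `operate` the same idea would presumably carry over by testing the value for null with `==` (for example on the result of `arguments.getObject(8)`) before comparing the two dept values.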
2015-10-07 20:34:08,774 ERROR [pool-3-thread-3] stream.TrapHandler (TrapHandler.java:handleReThrowableException(103)) - caught Throwable, no trap available, rethrowing
cascading.pipe.OperatorException: [histMasterPipe*deltaPipe][devtraining.Main.main(Main.java:712)] operator Each failed executing operation
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:73)
at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:34)
at cascading.flow.stream.OpenDuct.receive(OpenDuct.java:45)
at cascading.flow.stream.OpenDuct.receive(OpenDuct.java:28)
at cascading.flow.stream.MemoryCoGroupGate.push(MemoryCoGroupGate.java:144)
at cascading.flow.stream.MemoryCoGroupGate.complete(MemoryCoGroupGate.java:123)
at cascading.flow.stream.Fork.complete(Fork.java:60)
at cascading.flow.stream.Duct.complete(Duct.java:81)
at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
at cascading.flow.stream.SourceStage.map(SourceStage.java:105)
at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException
at devtraining.Main$ReturnTuple.operate(Main.java:770)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
... 17 more
2015-10-07 20:34:08,834 ERROR [pool-3-thread-3] stream.SourceStage (SourceStage.java:map(110)) - caught throwable
cascading.pipe.OperatorException: [histMasterPipe*deltaPipe][devtraining.Main.main(Main.java:712)] operator Each failed executing operation
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:73)
at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:34)
at cascading.flow.stream.OpenDuct.receive(OpenDuct.java:45)
at cascading.flow.stream.OpenDuct.receive(OpenDuct.java:28)
at cascading.flow.stream.MemoryCoGroupGate.push(MemoryCoGroupGate.java:144)
at cascading.flow.stream.MemoryCoGroupGate.complete(MemoryCoGroupGate.java:123)
at cascading.flow.stream.Fork.complete(Fork.java:60)
at cascading.flow.stream.Duct.complete(Duct.java:81)
at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
at cascading.flow.stream.SourceStage.map(SourceStage.java:105)
at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException
at devtraining.Main$ReturnTuple.operate(Main.java:770)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
... 17 more
tuples count: 2
tuples count: 3
tuples count: 0
2015-10-07 20:34:09,129 INFO [flow MasterRefresh] flow.Flow (BaseFlow.java:logInfo(1354)) - [MasterRefresh] stopping all jobs
2015-10-07 20:34:09,131 INFO [flow MasterRefresh] flow.FlowStep (BaseFlowStep.java:logInfo(834)) - [MasterRefresh] stopping: local
2015-10-07 20:34:09,134 INFO [flow MasterRefresh] flow.Flow (BaseFlow.java:logInfo(1354)) - [MasterRefresh] stopped all jobs
Exception in thread "main" cascading.flow.FlowException: local step failed
at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:219)