Custom Function Null handling

29 views
Skip to first unread message

jnkrish

unread,
Oct 8, 2015, 8:45:01 AM10/8/15
to cascading-user
Hello,

 
 I am getting a null pointer exception when trying a custom function to compare two fields in a tuple and return the required fields. 

Can someone check and  help? New to Java and trying new things at cascading as well.





Fields masterFields = new Fields("id","name","dept","dept_id","state","time").applyTypes(long.class, string.class,String.class,String.class,String.class,Date.class);




Pipe histMasterPipe = new Pipe("histMasterPipe");
Pipe joinPipe = new Pipe ("joinPipe");
Fields common = new Fields( "id" );
Fields declared = new Fields("id1", "name1", "dept1", "dept_id1", "state1", "time1","id2", "name2", "dept2", "dept_id2", "state2", "time2");
joinPipe = new CoGroup(histMasterPipe, common, deltaPipe, common, declared, new LeftJoin() );


// At this point i get below which is expected:
['123', 'Joe', 'IT', '525', 'TX', '20151002 10:10:10', '123', 'Joe', 'Accounts', '545', 'FL', '20151007 20:34:07']
['234', 'Bob', 'HR', '625', 'TX', '20151002 11:10:10', null, null, null, null, null, null]


// Writing a custom function to compare the dept column and decide which record to return
joinPipe = new Each(joinPipe, declared, new ReturnTuple(masterFields), Fields.RESULTS );


public static class ReturnTuple extends BaseOperation implements Function {
   public ReturnTuple() {
       super(12, new Fields("id", "name", "dept", "dept_id", "state","time"));
   }
   public ReturnTuple(Fields fieldDeclaration) {
       super(12, fieldDeclaration);
   }
   public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
       TupleEntry arguments = functionCall.getArguments();
       Tuple result = new Tuple();
        
        long id =0L;
        String name = new String("");
        String dept = new String("");
        String dept_id = new String("");
        String state = new String("");
        String time = new String("");

 if ( !arguments.getString(8).equals(null))         
    { 
      if ( arguments.getString(2).equals(arguments.getString(8)))
      {
      id = arguments.getLong(0);
      name = arguments.getString(1);
      dept = arguments.getString(2);
      dept_id = arguments.getString(3);
      state = arguments.getString(4);
      time = arguments.getString(5);
     
       } else { 
      id = arguments.getLong(6);
      name = arguments.getString(7);
      dept = arguments.getString(8);
      dept_id = arguments.getString(9);
      state = arguments.getString(10);
      time = arguments.getString(11);
                        }
} else  {
       
      id = arguments.getLong(0);
      name = arguments.getString(1);
      dept = arguments.getString(2);
      dept_id = arguments.getString(3);
      state = arguments.getString(4);
      time = arguments.getString(5);
                }
        result.add(id);
    result.add(name);
    result.add(dept);
    result.add(dept_id);
    result.add(state);
    result.add(time);
    
       functionCall.getOutputCollector().add(result);
   }
}


2015-10-07 20:34:08,774 ERROR [pool-3-thread-3] stream.TrapHandler (TrapHandler.java:handleReThrowableException(103)) - caught Throwable, no trap available, rethrowing
cascading.pipe.OperatorException: [histMasterPipe*deltaPipe][devtraining.Main.main(Main.java:712)] operator Each failed executing operation
        at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
        at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
        at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:73)
        at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:34)
        at cascading.flow.stream.OpenDuct.receive(OpenDuct.java:45)
        at cascading.flow.stream.OpenDuct.receive(OpenDuct.java:28)
        at cascading.flow.stream.MemoryCoGroupGate.push(MemoryCoGroupGate.java:144)
        at cascading.flow.stream.MemoryCoGroupGate.complete(MemoryCoGroupGate.java:123)
        at cascading.flow.stream.Fork.complete(Fork.java:60)
        at cascading.flow.stream.Duct.complete(Duct.java:81)
        at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
        at cascading.flow.stream.SourceStage.map(SourceStage.java:105)
        at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
        at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException
        at devtraining.Main$ReturnTuple.operate(Main.java:770)
        at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
        ... 17 more
2015-10-07 20:34:08,834 ERROR [pool-3-thread-3] stream.SourceStage (SourceStage.java:map(110)) - caught throwable
cascading.pipe.OperatorException: [histMasterPipe*deltaPipe][devtraining.Main.main(Main.java:712)] operator Each failed executing operation
        at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
        at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
        at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:73)
        at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:34)
        at cascading.flow.stream.OpenDuct.receive(OpenDuct.java:45)
        at cascading.flow.stream.OpenDuct.receive(OpenDuct.java:28)
        at cascading.flow.stream.MemoryCoGroupGate.push(MemoryCoGroupGate.java:144)
        at cascading.flow.stream.MemoryCoGroupGate.complete(MemoryCoGroupGate.java:123)
        at cascading.flow.stream.Fork.complete(Fork.java:60)
        at cascading.flow.stream.Duct.complete(Duct.java:81)
        at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
        at cascading.flow.stream.SourceStage.map(SourceStage.java:105)
        at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
        at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException
        at devtraining.Main$ReturnTuple.operate(Main.java:770)
        at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
        ... 17 more
tuples count: 2
tuples count: 3
tuples count: 0
2015-10-07 20:34:09,129 INFO  [flow MasterRefresh] flow.Flow (BaseFlow.java:logInfo(1354)) - [MasterRefresh] stopping all jobs
2015-10-07 20:34:09,131 INFO  [flow MasterRefresh] flow.FlowStep (BaseFlowStep.java:logInfo(834)) - [MasterRefresh] stopping: local
2015-10-07 20:34:09,134 INFO  [flow MasterRefresh] flow.Flow (BaseFlow.java:logInfo(1354)) - [MasterRefresh] stopped all jobs
Exception in thread "main" cascading.flow.FlowException: local step failed
        at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:219)

Ken Krugler

unread,
Oct 8, 2015, 8:51:09 AM10/8/15
to cascadi...@googlegroups.com


From: jnkrish

Sent: October 8, 2015 5:45:00am PDT

To: cascading-user

Subject: Custom Function Null handling



String field8 = arguments.getString(8);
if (field8 != null) {
…

    { 
      if ( arguments.getString(2).equals(arguments.getString(8)))

and if the string in position 2 could be null, use field8 (which at this point you know isn't null)

if (field8.equals(arguments.getString(2)) {
…

--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/b33d9fbf-89a6-4e03-af85-07d8130816ba%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr





--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr





Jayaram Nagarajan

unread,
Oct 8, 2015, 8:00:36 PM10/8/15
to cascadi...@googlegroups.com


Sent from my iPhone
You received this message because you are subscribed to a topic in the Google Groups "cascading-user" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/cascading-user/Y1KQ7VO2ALE/unsubscribe.
To unsubscribe from this group and all its topics, send an email to cascading-use...@googlegroups.com.

To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user.

jnkrish

unread,
Oct 8, 2015, 9:58:05 PM10/8/15
to cascading-user

Ken,

  Thanks a lot again :) Thanks for your patience and time . It worked  well.


To unsubscribe from this group and stop receiving emails from it, send an email to cascading-user+unsubscribe@googlegroups.com.
To post to this group, send email to cascading-user@googlegroups.com.

--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr





--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr





--
You received this message because you are subscribed to a topic in the Google Groups "cascading-user" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/cascading-user/Y1KQ7VO2ALE/unsubscribe.
To unsubscribe from this group and all its topics, send an email to cascading-user+unsubscribe@googlegroups.com.
To post to this group, send email to cascading-user@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages