How to handle an exception/error during execution of a Pipe


Kunal Ghosh

Nov 30, 2017, 2:16:40 AM
to cascading-user
Hi,
my requirement is to evaluate multiple expressions (using ExpressionFunction) in a single Pipe. The issue I am facing is that whenever one of the expression functions throws a compilation error, the whole flow stops. I want to handle that exception and have the flow continue.
Thanks in advance.

I am using the following code with 3 expressions.

Pipe processPipe_EXP_1509977677872 = new Pipe("recommended_matches");

Fields expField9 = new Fields("expr_0");
ExpressionEvaluator expFun9 = new ExpressionEvaluator(expField9, "src_1509977677867_regionid > 105", expFields_EXP_1509977677872_smallArray, expdataType_EXP_1509977677872, true);
processPipe_EXP_1509977677872 = new Each(processPipe_DIF_1509977677870, expFun9, Fields.ALL);

Fields expField10 = new Fields("expr_1");
ExpressionEvaluator expFun10 = new ExpressionEvaluator(expField10, "src_1509977677869_prodid.equals(\"100860\")", expFields_EXP_1509977677872_smallArray, expdataType_EXP_1509977677872, true);
processPipe_EXP_1509977677872 = new Each(processPipe_EXP_1509977677872, expFun10, Fields.ALL);

// The following expression is invalid and throws a compilation error while the flow runs
Fields expField11 = new Fields("expr_2");
ExpressionEvaluator expFun11 = new ExpressionEvaluator(expField11, "src_1509977677867_regionid >>= 201", expFields_EXP_1509977677872_smallArray, expdataType_EXP_1509977677872, true);
processPipe_EXP_1509977677872 = new Each(processPipe_EXP_1509977677872, expFun11, Fields.ALL);
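[Editor's sketch, not from the thread: one way to keep the rest of the flow alive is to try compiling each expression with Janino before it is wired into the Pipe, and skip any expression that fails. Note this uses org.codehaus.janino.ExpressionEvaluator directly, not the ExpressionEvaluator wrapper in the snippet above; the helper name compilesCleanly and the Object.class result type are illustrative assumptions.]

import org.codehaus.janino.ExpressionEvaluator;

// Hypothetical guard: returns true only if Janino can compile the expression
// against the given parameter names and types. An expression that fails here
// would also fail later in the task's prepare() phase, so skipping it is safe.
static boolean compilesCleanly(String expression, String[] names, Class[] types) {
    try {
        ExpressionEvaluator evaluator = new ExpressionEvaluator();
        evaluator.setParameters(names, types);
        evaluator.setExpressionType(Object.class); // assumed: result type unknown up front
        evaluator.cook(expression);                // compiles the source; throws on bad syntax
        return true;
    } catch (Exception e) {
        // Janino reports some failures as runtime exceptions (e.g. ">>="), so catch broadly
        return false;
    }
}

Guarding each new Each(...) with a check like this means the invalid expr_2 is never added, and the first two expressions run normally.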

Chris K Wensel

Nov 30, 2017, 5:46:10 PM
to cascadi...@googlegroups.com


Kunal Ghosh

Dec 5, 2017, 11:18:40 AM
to cascading-user
Thanks for the reply, but it gives me the same error. Please help me.

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.hadoop2.Hadoop2MR1FlowConnector;
import cascading.operation.expression.ExpressionFunction;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class Main {
    private static final String DELIMITER = "\t";

    public static void main(String[] args) {
        String inputPath1 = args[0];
        String outputPath = args[1];

        // create taps to connect to data sources
        Fields fields = new Fields("prodid", "regionid", "stdprice", "minprice", "startdate", "enddate");
        Tap inTap = new Hfs(new TextDelimited(fields, true, DELIMITER), inputPath1, SinkMode.KEEP);

        Tap outTap = new Hfs(new TextDelimited(Fields.ALL, true, DELIMITER), outputPath, SinkMode.REPLACE);
        Tap trapTap = new Hfs(new TextDelimited(true, "\t"), outputPath + "/" + "trap");

        String[] srcFields = {"prodid", "regionid", "stdprice", "minprice", "startdate", "enddate"};
        Class[] srcClassType = {String.class, Integer.class, String.class, String.class, String.class, String.class};

        // create pipes to connect to sources
        Pipe inPipe = new Pipe("inPipe");
        Pipe outPutPipe = new Pipe("recommended_matches", inPipe);

        Fields expField9 = new Fields("expr_0");
        ExpressionFunction expFun9 = new ExpressionFunction(expField9, "regionid > 105", srcFields, srcClassType);
        outPutPipe = new Each(outPutPipe, expFun9, Fields.ALL);

        Fields expField10 = new Fields("expr_1");
        ExpressionFunction expFun10 = new ExpressionFunction(expField10, "prodid.equals(\"100860\")", srcFields, srcClassType);
        outPutPipe = new Each(outPutPipe, expFun10, Fields.ALL);

        Fields expField11 = new Fields("expr_2");
        ExpressionFunction expFun11 = new ExpressionFunction(expField11, "regionid >>= 201", srcFields, srcClassType);
        outPutPipe = new Each(outPutPipe, expFun11, Fields.ALL);

        FlowDef flowDef = FlowDef.flowDef()
            .addSource(inPipe, inTap)
            .addTailSink(outPutPipe, outTap)
            .addTrap(outPutPipe, trapTap);

        Properties props = new Properties();
        Flow flow = new Hadoop2MR1FlowConnector(props).connect(flowDef);
        flow.start();
        flow.complete();
    }
}


Error:

17/12/05 21:40:51 INFO util.Util: resolving application jar from found main method on: Main
17/12/05 21:40:51 INFO planner.HadoopPlanner: using application jar: /home/electric_io/Example/File_sorting/build/libs/File_sorting-1.0.jar
17/12/05 21:40:51 INFO property.AppProps: using app.id: 3CDFF663788141D59AE33B411430822E
17/12/05 21:40:51 INFO flow.Flow: [] executed rule registry: MapReduceHadoopRuleRegistry, completed as: SUCCESS, in: 00:00.063
17/12/05 21:40:51 INFO flow.Flow: [] rule registry: MapReduceHadoopRuleRegistry, supports assembly with steps: 1, nodes: 1
17/12/05 21:40:51 INFO flow.Flow: [] rule registry: MapReduceHadoopRuleRegistry, result was selected using: 'default comparator: selects plan with fewest steps and fewest nodes'
17/12/05 21:40:52 INFO util.Version: Concurrent, Inc - Cascading 3.1.1
17/12/05 21:40:52 INFO flow.Flow: [] starting
17/12/05 21:40:52 INFO flow.Flow: []  source: Hfs["TextDelimited[['prodid', 'regionid', 'stdprice', 'minprice', 'startdate', 'enddate']]"]["/hdfsdata/input/prices.txt"]
17/12/05 21:40:52 INFO flow.Flow: []  sink: Hfs["TextDelimited[['prodid', 'regionid', 'stdprice', 'minprice', 'startdate', 'enddate', 'expr_0', 'expr_1', 'expr_2']]"]["/hdfsdata/output/kunal"]
17/12/05 21:40:52 INFO flow.Flow: []  parallel execution of steps is enabled: true
17/12/05 21:40:52 INFO flow.Flow: []  executing total steps: 1
17/12/05 21:40:52 INFO flow.Flow: []  allocating management threads: 1
17/12/05 21:40:52 INFO flow.Flow: [] starting step: (1/1) /hdfsdata/output/kunal
17/12/05 21:40:52 INFO impl.TimelineClientImpl: Timeline service address: http://slave1.icedq.com:8188/ws/v1/timeline/
17/12/05 21:40:52 INFO client.RMProxy: Connecting to ResourceManager at slave1.icedq.com/192.168.100.133:8050
17/12/05 21:40:52 INFO client.AHSProxy: Connecting to Application History server at slave1.icedq.com/192.168.100.133:10200
17/12/05 21:40:52 INFO impl.TimelineClientImpl: Timeline service address: http://slave1.icedq.com:8188/ws/v1/timeline/
17/12/05 21:40:52 INFO client.RMProxy: Connecting to ResourceManager at slave1.icedq.com/192.168.100.133:8050
17/12/05 21:40:52 INFO client.AHSProxy: Connecting to Application History server at slave1.icedq.com/192.168.100.133:10200
17/12/05 21:40:53 INFO mapred.FileInputFormat: Total input paths to process : 1
17/12/05 21:40:53 INFO mapreduce.JobSubmitter: number of splits:2
17/12/05 21:40:53 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1512370386208_0090
17/12/05 21:40:54 INFO impl.YarnClientImpl: Submitted application application_1512370386208_0090
17/12/05 21:40:54 INFO mapreduce.Job: The url to track the job: http://slave1.icedq.com:8088/proxy/application_1512370386208_0090/
17/12/05 21:40:54 INFO flow.Flow: [] submitted hadoop job: job_1512370386208_0090
17/12/05 21:40:54 INFO flow.Flow: [] tracking url: http://slave1.icedq.com:8088/proxy/application_1512370386208_0090/
17/12/05 21:41:14 WARN flow.Flow: [] hadoop job job_1512370386208_0090 state at FAILED
17/12/05 21:41:14 WARN flow.Flow: [] failure info: Task failed task_1512370386208_0090_m_000001
Job failed as tasks failed. failedMaps:1 failedReduces:0

17/12/05 21:41:14 WARN flow.Flow: [] task completion events identify failed tasks
17/12/05 21:41:14 WARN flow.Flow: [] task completion events count: 7
17/12/05 21:41:14 WARN flow.Flow: [] event = Task Id : attempt_1512370386208_0090_m_000000_0, Status : FAILED
17/12/05 21:41:14 WARN flow.Flow: [] event = Task Id : attempt_1512370386208_0090_m_000001_0, Status : FAILED
17/12/05 21:41:14 WARN flow.Flow: [] event = Task Id : attempt_1512370386208_0090_m_000001_1, Status : FAILED
17/12/05 21:41:14 WARN flow.Flow: [] event = Task Id : attempt_1512370386208_0090_m_000000_1, Status : FAILED
17/12/05 21:41:14 WARN flow.Flow: [] event = Task Id : attempt_1512370386208_0090_m_000001_2, Status : FAILED
17/12/05 21:41:14 WARN flow.Flow: [] event = Task Id : attempt_1512370386208_0090_m_000000_2, Status : FAILED
17/12/05 21:41:14 WARN flow.Flow: [] event = Task Id : attempt_1512370386208_0090_m_000001_3, Status : TIPFAILED
17/12/05 21:41:14 INFO flow.Flow: [] stopping all jobs
17/12/05 21:41:14 INFO flow.Flow: [] stopping: (1/1) /hdfsdata/output/kunal
17/12/05 21:41:14 INFO flow.Flow: [] stopped all jobs
17/12/05 21:41:14 INFO util.Hadoop18TapUtil: deleting temp path /hdfsdata/output/kunal/_temporary
17/12/05 21:41:14 INFO flow.Flow: []  completed in: 00:22.338
Exception in thread "main" cascading.flow.FlowException: step failed: (1/1) /hdfsdata/output/kunal, step id: DC042DE290D841FA90FAF75B3C073D4C, job id: job_1512370386208_0090, please see cluster logs for failure messages
        at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:295)
        at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:184)
        at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:146)
        at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:48)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

[task log truncated: showing 4096 bytes of 6719 total]

[['prodid', 'regionid', 'stdprice', 'minprice', 'startdate', 'enddate']]"]["/hdfsdata/input/prices.txt"]
2017-12-05 21:41:02,112 INFO [main] cascading.flow.hadoop.FlowMapper: sinking to: Hfs["TextDelimited[['prodid', 'regionid', 'stdprice', 'minprice', 'startdate', 'enddate', 'expr_0', 'expr_1', 'expr_2']]"]["/hdfsdata/output/kunal"]
2017-12-05 21:41:02,112 INFO [main] cascading.flow.hadoop.FlowMapper: trapping to: Hfs["TextDelimited[[UNKNOWN]->[ALL]]"]["/hdfsdata/output/kunal/trap"]
2017-12-05 21:41:02,113 INFO [main] cascading.flow.hadoop.FlowMapper: flow node id: B6C69F421B2B4AD9AC8D52909F8D3555, mem on start (mb), free: 73, total: 162, max: 1183
2017-12-05 21:41:02,328 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : org.codehaus.janino.JaninoRuntimeException: SNO: ">>=" reconversion failed
	at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:3526)
	at org.codehaus.janino.UnitCompiler.access$6100(UnitCompiler.java:183)
	at org.codehaus.janino.UnitCompiler$10.visitAssignment(UnitCompiler.java:3243)
	at org.codehaus.janino.Java$Assignment.accept(Java.java:3405)
	at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3278)
	at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4345)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1901)
	at org.codehaus.janino.UnitCompiler.access$2100(UnitCompiler.java:183)
	at org.codehaus.janino.UnitCompiler$4.visitReturnStatement(UnitCompiler.java:944)
	at org.codehaus.janino.Java$ReturnStatement.accept(Java.java:2544)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:956)
	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:997)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2283)
	at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:820)
	at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:792)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:505)
	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:391)
	at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:183)
	at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:345)
	at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:352)
	at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:320)
	at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
	at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
	at org.codehaus.janino.ScriptEvaluator.cook(ScriptEvaluator.java:573)
	at org.codehaus.janino.ScriptEvaluator.cook(ScriptEvaluator.java:471)
	at org.codehaus.janino.ScriptEvaluator.cook(ScriptEvaluator.java:383)
	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
	at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
	at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:77)
	at org.codehaus.janino.ExpressionEvaluator.<init>(ExpressionEvaluator.java:118)
	at cascading.operation.expression.ExpressionOperation.getEvaluator(ExpressionOperation.java:94)
	at cascading.operation.expression.ScriptOperation.prepare(ScriptOperation.java:288)
	at cascading.flow.stream.element.OperatorStage.prepare(OperatorStage.java:284)
	at cascading.flow.stream.graph.StreamGraph.prepare(StreamGraph.java:181)
	at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:121)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

Chris K Wensel

Dec 5, 2017, 7:53:46 PM
to cascadi...@googlegroups.com
The error is happening in the #prepare() stage of the Operation lifecycle. Traps aren't supported during the initialization of an Operation (Function, etc.).

at cascading.operation.expression.ScriptOperation.prepare(ScriptOperation.java:288)

It makes no sense to do so, since a compile failure implies a misconfiguration, not bad data.

You might question the use of ">>="; the Janino compiler seems to be questioning it as well.

org.codehaus.janino.JaninoRuntimeException: SNO: ">>=" reconversion failed

ckw
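
[Editor's sketch, building on that point but not from the thread: a custom Cascading Function can do the compilation by hand in prepare(), so a compile failure degrades to null results instead of killing the task. The class name LenientExpressionFunction and the null-on-failure policy are assumptions, not an established API.]

import org.codehaus.janino.ExpressionEvaluator;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;

// Hypothetical wrapper: compiles the expression once per task in prepare();
// on a compile error it remembers the failure and emits null for every tuple
// instead of rethrowing, so the flow keeps running.
public class LenientExpressionFunction extends BaseOperation<ExpressionEvaluator>
        implements Function<ExpressionEvaluator> {

    private final String expression;
    private final String[] parameterNames;
    private final Class[] parameterTypes;

    public LenientExpressionFunction(Fields fieldDeclaration, String expression,
                                     String[] parameterNames, Class[] parameterTypes) {
        super(fieldDeclaration);
        this.expression = expression;
        this.parameterNames = parameterNames;
        this.parameterTypes = parameterTypes;
    }

    @Override
    public void prepare(FlowProcess flowProcess, OperationCall<ExpressionEvaluator> call) {
        try {
            ExpressionEvaluator evaluator = new ExpressionEvaluator();
            evaluator.setParameters(parameterNames, parameterTypes);
            evaluator.setExpressionType(Object.class);
            evaluator.cook(expression);   // this is where ">>=" blows up today
            call.setContext(evaluator);
        } catch (Exception compileError) {
            call.setContext(null);        // remember the failure instead of failing the task
        }
    }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall<ExpressionEvaluator> call) {
        ExpressionEvaluator evaluator = call.getContext();
        Object result = null;
        if (evaluator != null) {
            try {
                // gather argument values in the declared parameter order
                Object[] arguments = new Object[parameterNames.length];
                for (int i = 0; i < parameterNames.length; i++)
                    arguments[i] = call.getArguments().getObject(parameterNames[i]);
                result = evaluator.evaluate(arguments);
            } catch (Exception evalError) {
                // bad data for this tuple; fall through with a null result
            }
        }
        call.getOutputCollector().add(new Tuple(result));
    }
}

Dropped in where ExpressionFunction is used in the Main class above, the broken expr_2 column would come out null on every row while expr_0 and expr_1 still evaluate. Whether null is the right sentinel is a design choice; the cleaner fix remains validating the expression at configuration time, since the failure is a misconfiguration rather than bad data.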


