Getting calculated results out of an ExpressionFunction: what options are available?

24 views
Skip to first unread message

Kunal Ghosh

unread,
Jun 27, 2017, 3:17:24 AM (6/27/17)
to cascading-user
Hi,
I have extended the `ExpressionFunction` class to meet my requirements. In it, I process each record and calculate true/false/error counts based on certain conditions.
Now I want these counts to be available to a sink so that I can write them to a file. Is there any way to achieve this? Or is there another way, such as inserting these calculated values into the data tuple?
I just want whatever I calculate in the custom expression to end up in the files written by the sink, so that I can read those files and proceed.
Please help.



import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.commons.lang.ArrayUtils;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop2.Hadoop2MR1FlowConnector;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.operation.expression.ExpressionFunction;
import cascading.operation.expression.ExpressionOperation;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Rename;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
public class Main {
public static void main(String[] args) {
String inPath_SRC_1498030696032 = args[0]+File.separator+"10_rows.csv";
Fields srcFields_SRC_1498030696032 = new Fields("empid","gender","title","nameset","surname","city","statefull","zipcode","header1","header2","header3","header4","header5","header6","header7","header8","header9","header10","header11","header12","header13","header14","header15","header16","header17","header18","header19","header20","header21","header22","header23","header24","header25","header26","header27","header28","header29","header30","header31","header32");
Fields srcFields_SRC_1498030696032_rename = new Fields("empid_SRC_1498030696032","gender_SRC_1498030696032","title_SRC_1498030696032","nameset_SRC_1498030696032","surname_SRC_1498030696032","city_SRC_1498030696032","statefull_SRC_1498030696032","zipcode_SRC_1498030696032","header1_SRC_1498030696032","header2_SRC_1498030696032","header3_SRC_1498030696032","header4_SRC_1498030696032","header5_SRC_1498030696032","header6_SRC_1498030696032","header7_SRC_1498030696032","header8_SRC_1498030696032","header9_SRC_1498030696032","header10_SRC_1498030696032","header11_SRC_1498030696032","header12_SRC_1498030696032","header13_SRC_1498030696032","header14_SRC_1498030696032","header15_SRC_1498030696032","header16_SRC_1498030696032","header17_SRC_1498030696032","header18_SRC_1498030696032","header19_SRC_1498030696032","header20_SRC_1498030696032","header21_SRC_1498030696032","header22_SRC_1498030696032","header23_SRC_1498030696032","header24_SRC_1498030696032","header25_SRC_1498030696032","header26_SRC_1498030696032","header27_SRC_1498030696032","header28_SRC_1498030696032","header29_SRC_1498030696032","header30_SRC_1498030696032","header31_SRC_1498030696032","header32_SRC_1498030696032");
String[] srcFields_SRC_1498030696032_smallArray = {"empid_src_1498030696032","gender_src_1498030696032","title_src_1498030696032","nameset_src_1498030696032","surname_src_1498030696032","city_src_1498030696032","statefull_src_1498030696032","zipcode_src_1498030696032","header1_src_1498030696032","header2_src_1498030696032","header3_src_1498030696032","header4_src_1498030696032","header5_src_1498030696032","header6_src_1498030696032","header7_src_1498030696032","header8_src_1498030696032","header9_src_1498030696032","header10_src_1498030696032","header11_src_1498030696032","header12_src_1498030696032","header13_src_1498030696032","header14_src_1498030696032","header15_src_1498030696032","header16_src_1498030696032","header17_src_1498030696032","header18_src_1498030696032","header19_src_1498030696032","header20_src_1498030696032","header21_src_1498030696032","header22_src_1498030696032","header23_src_1498030696032","header24_src_1498030696032","header25_src_1498030696032","header26_src_1498030696032","header27_src_1498030696032","header28_src_1498030696032","header29_src_1498030696032","header30_src_1498030696032","header31_src_1498030696032","header32_src_1498030696032"};
Class[] srcClassType_SRC_1498030696032  = {Integer.class,String.class,String.class,String.class,String.class,String.class,String.class,Integer.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class};
Fields srcFields_SRC_1498030696032_renameSmall = new Fields(srcFields_SRC_1498030696032_smallArray,srcClassType_SRC_1498030696032);
Tap srcTap_SRC_1498030696032 = new Hfs( new TextDelimited( srcFields_SRC_1498030696032 ,true , "," ), inPath_SRC_1498030696032 );
Pipe processPipe_SRC_1498030696032 = new Pipe("SRC_1498030696032");
processPipe_SRC_1498030696032 = new Rename(processPipe_SRC_1498030696032, srcFields_SRC_1498030696032 , srcFields_SRC_1498030696032_renameSmall);
Fields expFields_EXP_1498030707317 = new Fields("empid","gender","title","nameset","surname","city","statefull","zipcode","header1","header2","header3","header4","header5","header6","header7","header8","header9","header10","header11","header12","header13","header14","header15","header16","header17","header18","header19","header20","header21","header22","header23","header24","header25","header26","header27","header28","header29","header30","header31","header32");
Fields expFields_EXP_1498030707317_rename = new Fields("empid_SRC_1498030696032","gender_SRC_1498030696032","title_SRC_1498030696032","nameset_SRC_1498030696032","surname_SRC_1498030696032","city_SRC_1498030696032","statefull_SRC_1498030696032","zipcode_SRC_1498030696032","header1_SRC_1498030696032","header2_SRC_1498030696032","header3_SRC_1498030696032","header4_SRC_1498030696032","header5_SRC_1498030696032","header6_SRC_1498030696032","header7_SRC_1498030696032","header8_SRC_1498030696032","header9_SRC_1498030696032","header10_SRC_1498030696032","header11_SRC_1498030696032","header12_SRC_1498030696032","header13_SRC_1498030696032","header14_SRC_1498030696032","header15_SRC_1498030696032","header16_SRC_1498030696032","header17_SRC_1498030696032","header18_SRC_1498030696032","header19_SRC_1498030696032","header20_SRC_1498030696032","header21_SRC_1498030696032","header22_SRC_1498030696032","header23_SRC_1498030696032","header24_SRC_1498030696032","header25_SRC_1498030696032","header26_SRC_1498030696032","header27_SRC_1498030696032","header28_SRC_1498030696032","header29_SRC_1498030696032","header30_SRC_1498030696032","header31_SRC_1498030696032","header32_SRC_1498030696032");
String[] expFields_EXP_1498030707317_smallArray = {"empid_src_1498030696032","gender_src_1498030696032","title_src_1498030696032","nameset_src_1498030696032","surname_src_1498030696032","city_src_1498030696032","statefull_src_1498030696032","zipcode_src_1498030696032","header1_src_1498030696032","header2_src_1498030696032","header3_src_1498030696032","header4_src_1498030696032","header5_src_1498030696032","header6_src_1498030696032","header7_src_1498030696032","header8_src_1498030696032","header9_src_1498030696032","header10_src_1498030696032","header11_src_1498030696032","header12_src_1498030696032","header13_src_1498030696032","header14_src_1498030696032","header15_src_1498030696032","header16_src_1498030696032","header17_src_1498030696032","header18_src_1498030696032","header19_src_1498030696032","header20_src_1498030696032","header21_src_1498030696032","header22_src_1498030696032","header23_src_1498030696032","header24_src_1498030696032","header25_src_1498030696032","header26_src_1498030696032","header27_src_1498030696032","header28_src_1498030696032","header29_src_1498030696032","header30_src_1498030696032","header31_src_1498030696032","header32_src_1498030696032"};
Fields expFields_EXP_1498030707317_renameSmall = new Fields(expFields_EXP_1498030707317_smallArray);
Class[] expdataType_EXP_1498030707317  = {Integer.class,String.class,String.class,String.class,String.class,String.class,String.class,Integer.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class,String.class};
Pipe processPipe_EXP_1498030707317 = new Pipe("recommended_matches");

String outPath = args[1];
Tap trgTap_TRG_1498030873961 = new Hfs(  new TextDelimited( true , "," ), outPath , SinkMode.REPLACE);
Tap outTap = new Hfs(  new TextDelimited(new Fields("sample"), true , "," ), outPath+"/expr" , SinkMode.REPLACE);
Fields expField40 = new Fields("expr1");
Map<String,String> exprColumnNameMap = new HashMap<String,String>();
exprColumnNameMap.put("expr1","empid_src_1498030696032 >= 5");
CustomExpressionFunction expFun40 = new CustomExpressionFunction(expField40,"empid_src_1498030696032 >= 5",expFields_EXP_1498030707317_smallArray,expdataType_EXP_1498030707317);

processPipe_EXP_1498030707317 = new Each(processPipe_SRC_1498030696032,expFun40,Fields.ALL );

Fields expField41 = new Fields("expr2");
exprColumnNameMap.put("expr2","gender_src_1498030696032.equals(\"male\")");
CustomExpressionFunction expFun41 = new CustomExpressionFunction(expField41,"gender_src_1498030696032.equals(\"male\")",expFields_EXP_1498030707317_smallArray,expdataType_EXP_1498030707317);
processPipe_EXP_1498030707317 = new Each(processPipe_EXP_1498030707317,expFun41,Fields.ALL );

Fields expField42 = new Fields("expr3");
exprColumnNameMap.put("expr3","zipcode_src_1498030696032 == 10");
CustomExpressionFunction expFun42 = new CustomExpressionFunction(expField42,"zipcode_src_1498030696032 == 10",expFields_EXP_1498030707317_smallArray,expdataType_EXP_1498030707317);
processPipe_EXP_1498030707317 = new Each(processPipe_EXP_1498030707317,expFun42,Fields.ALL );

FlowDef flowDef = FlowDef.flowDef()
.addSource(processPipe_SRC_1498030696032,srcTap_SRC_1498030696032)
.addTailSink(processPipe_EXP_1498030707317,trgTap_TRG_1498030873961)
//.addTailSink(countPipe,outTap)
;
Map<Object, Object> props = new HashMap<Object, Object>();
Flow flow = new Hadoop2MR1FlowConnector(props).connect(flowDef);
System.out.println("Count >>>>>>>>>>> "+flow.getFlowProcess());
flow.complete();
}

private static class CustomExpressionFunction  extends ExpressionFunction  implements Function<ExpressionOperation.Context> 
{
long[] exprCounts = new long[3];
private Map<String, long[]> _exprCountMap = new HashMap<String, long[]>();
//public Map<String,String> exprColumnNameMap = new HashMap<String,String>();
TupleEntryCollector outputCollector;
public CustomExpressionFunction( Fields fieldDeclaration, String expression, Class parameterType )
{
super( fieldDeclaration, expression, parameterType );

if( fieldDeclaration.size() != 1 )
throw new IllegalArgumentException( "fieldDeclaration may only declare one field, was " + fieldDeclaration.print() );
}

public CustomExpressionFunction( Fields fieldDeclaration, String expression, String[] parameterNames, Class[] parameterTypes )
{
super( fieldDeclaration, expression, parameterNames, parameterTypes );

if( fieldDeclaration.size() != 1 )
throw new IllegalArgumentException( "fieldDeclaration may only declare one field, was " + fieldDeclaration.print() );
}

/** @see Function#operate(cascading.flow.FlowProcess,cascading.operation.FunctionCall) */
public void operate( FlowProcess flowProcess, FunctionCall<Context> functionCall )
{
outputCollector = functionCall.getOutputCollector();
TupleEntry arguments = functionCall.getArguments();

Object value = evaluate( functionCall.getContext(), arguments );
String declaredField = functionCall.getDeclaredFields().get(0).toString().trim();
try {
//System.out.println("====> column type::: "+fields.get(0).getClass().getName());
//dst[i] = new String(object.array());
System.out.println("Get Fields >>>>>>>>>>> "+arguments.getFields());
System.out.println(" data >>>>> "+arguments.getTuple());

//if(exprColumnNameMap.containsKey(declaredField)){
//System.out.println(">>>>>>>>>>>>>>>>>>>>>>> Index <<<<<<<<<<<<<<<<<<<<<< "+declaredField);
int resultIndex = -1;
if(value != null ){
if(value.equals(true)){
resultIndex = 0;
}else{
resultIndex = 1;
}
}else{
resultIndex = 2;
}
long[] exprCounts = new long[3];
if (_exprCountMap.containsKey(declaredField))
{

exprCounts = _exprCountMap.remove(declaredField);
//System.out.println(">>>>> Field >>> "+declaredField+ " , expr >>> "+ArrayUtils.toString(exprCounts));
}
else
{
exprCounts[0] = 0;
exprCounts[1] = 0;
exprCounts[2] = 0;
}
exprCounts[resultIndex]++;
_exprCountMap.put(declaredField, exprCounts);
//System.out.println("Index >>> "+resultIndex+" ,, value >>> "+exprCounts[resultIndex]);
//}
} catch (Exception e) {
//throw new RuntimeException(e);
e.printStackTrace();
}



//System.out.println(" result >>>> "+value+" , field >> "+declaredField+" , expr>> "+ArrayUtils.toString(_exprCountMap.get("'expr1'"))+", data >>> "+ArrayUtils.toString(exprCounts));
outputCollector.add( new Tuple(value) );
}
@Override
public void cleanup(FlowProcess flowProcess,OperationCall<Context> operationCall) {
// TODO Auto-generated method stub
super.cleanup(flowProcess, operationCall);
System.out.println ("==========********** clean up called ************"+_exprCountMap.size());
try
{
for (Map.Entry<String, long[]> entry : _exprCountMap.entrySet())
{
System.out.println(" >>>>>>>>>>>>>>>>> Map >>>> "+entry.getKey()+" >>> "+ArrayUtils.toString(_exprCountMap.get(entry.getKey())));
}
outputCollector.close();
// System.out.println(" >>>>>>>>>>>>>>>>> Map >>>> Expr1 >>> "+ArrayUtils.toString(_exprCountMap.get("'expr1'")));
// System.out.println(" >>>>>>>>>>>>>>>>> Map >>>> Expr2 >>> "+ArrayUtils.toString(_exprCountMap.get("'expr2'")));
// System.out.println(" >>>>>>>>>>>>>>>>> Map >>>> Expr3 >>> "+ArrayUtils.toString(_exprCountMap.get("'expr3'")));
} catch (Exception e)
{
e.printStackTrace();
}
}
}

}

Kunal Ghosh

unread,
Jun 28, 2017, 1:03:37 PM (6/28/17)
to cascading-user
Hi Chris, would you please guide me?
Reply all
Reply to author
Forward
0 new messages