package logparser;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.expression.ExpressionFilter;
import cascading.operation.regex.RegexParser;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.Lfs;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import java.util.Properties;
/**
*
*/
public class Main
{
public static void main( String[] args )
{
String inputPath = args[ 0 ];
String outputPath = args[ 1 ];
TextLine scheme = new TextLine( new Fields( "offset", "line" ) );
Tap logTap = inputPath.matches( "^[^:]+://.*" ) ? new Hfs( scheme, inputPath ) : new Lfs( scheme, inputPath );
Fields apacheFields = new Fields( "ip", "time", "method", "event", "status", "size" );
String apacheRegex = "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$";
int[] allGroups = {1, 2, 3, 4, 5, 6};
RegexParser parser = new RegexParser( apacheFields, apacheRegex, allGroups );
Pipe importPipe = new Each( "import", new Fields( "line" ), parser, Fields.RESULTS );
ExpressionFilter filter = new ExpressionFilter("ip != 75.185.76.245 ",Integer.class);
Pipe logPipe = new Pipe("logPipe");
logPipe = new Each(logPipe,new Fields(“ip"),filter);
Pipe mainPipe = new Pipe("mainPipe",logPipe);
Tap remoteLogTap = new Hfs( new TextLine(), outputPath, SinkMode.REPLACE );
Properties properties = new Properties();
FlowConnector.setApplicationJarClass( properties, Main.class );
Flow parsedLogFlow = new FlowConnector( properties ).connect( logTap, remoteLogTap, mainPipe );
parsedLogFlow.start();
parsedLogFlow.complete();
}
}
I am getting the following error:
-------------------------------------------
11/07/31 23:36:44 INFO flow.MultiMapReducePlanner: using application jar: /home/hadoop/cascading/logAnalysis/./build/logparser.jar
Exception in thread "main" cascading.flow.PlannerException: could not build flow from assembly: [[logPipe][loganalysis.Main.main(Unknown Source)] unable to resolve argument selector: [{1}:'Number'], with incoming: [{2}:'offset', 'line']]
at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:242)
at cascading.flow.FlowConnector.connect(FlowConnector.java:531)
at cascading.flow.FlowConnector.connect(FlowConnector.java:513)
at cascading.flow.FlowConnector.connect(FlowConnector.java:361)
at cascading.flow.FlowConnector.connect(FlowConnector.java:306)
at cascading.flow.FlowConnector.connect(FlowConnector.java:288)
at loganalysis.Main.main(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: cascading.pipe.OperatorException: [logPipe][loganalysis.Main.main(Unknown Source)] unable to resolve argument selector: [{1}:'Number'], with incoming: [{2}:'offset', 'line']
at cascading.pipe.Operator.resolveArgumentSelector(Operator.java:390)
at cascading.pipe.Each.outgoingScopeFor(Each.java:408)
at cascading.flow.ElementGraph.resolveFields(ElementGraph.java:574)
at cascading.flow.ElementGraph.resolveFields(ElementGraph.java:556)
at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:198)
... 11 more
Caused by: cascading.tuple.FieldsResolverException: could not select fields: [{1}:'Number'], from: [{2}:'offset', 'line']
at cascading.tuple.Fields.indexOf(Fields.java:798)
at cascading.tuple.Fields.select(Fields.java:851)
at cascading.pipe.Operator.resolveArgumentSelector(Operator.java:386)
... 15 more
-----------------------------------------------
Any ideas why I am encountering this problem and how I can solve it?
Thanks in advance
NicK
Hello everyone, I am trying to use the Filter in the Cascading API. I have just made some modifications to the existing logParser code, and it is shown below:
TextLine scheme = new TextLine( new Fields( "offset", "line" ) );Tap logTap = inputPath.matches( "^[^:]+://.*" ) ? new Hfs( scheme, inputPath ) : new Lfs( scheme, inputPath );
Fields apacheFields = new Fields( "ip", "time", "method", "event", "status", "size" );String apacheRegex = "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$";int[] allGroups = {1, 2, 3, 4, 5, 6};RegexParser parser = new RegexParser( apacheFields, apacheRegex, allGroups );
Pipe importPipe = new Each( "import", new Fields( "line" ), parser, Fields.RESULTS );ExpressionFilter filter = new ExpressionFilter("ip != 75.185.76.245 ",Integer.class);Pipe logPipe = new Pipe("logPipe");
--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To post to this group, send email to cascadi...@googlegroups.com.
To unsubscribe from this group, send email to cascading-use...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/cascading-user?hl=en.