Help with using Filter

Nick Smith

Aug 1, 2011, 2:38:09 AM
to cascadi...@googlegroups.com
Hello everyone,

I am trying to use a Filter in the Cascading API.
I have made some modifications to the existing logParser code; it is shown below:

package logparser;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.expression.ExpressionFilter;
import cascading.operation.regex.RegexParser;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.Lfs;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;

import java.util.Properties;

public class Main
  {
  public static void main( String[] args )
    {
    String inputPath = args[ 0 ];
    String outputPath = args[ 1 ];

    TextLine scheme = new TextLine( new Fields( "offset", "line" ) );
    Tap logTap = inputPath.matches( "^[^:]+://.*" ) ? new Hfs( scheme, inputPath ) : new Lfs( scheme, inputPath );

    Fields apacheFields = new Fields( "ip", "time", "method", "event", "status", "size" );
    String apacheRegex = "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$";
    int[] allGroups = { 1, 2, 3, 4, 5, 6 };

    RegexParser parser = new RegexParser( apacheFields, apacheRegex, allGroups );
    Pipe importPipe = new Each( "import", new Fields( "line" ), parser, Fields.RESULTS );

    ExpressionFilter filter = new ExpressionFilter( "ip != 75.185.76.245 ", Integer.class );

    Pipe logPipe = new Pipe( "logPipe" );
    logPipe = new Each( logPipe, new Fields( "ip" ), filter );
    Pipe mainPipe = new Pipe( "mainPipe", logPipe );

    Tap remoteLogTap = new Hfs( new TextLine(), outputPath, SinkMode.REPLACE );
    Properties properties = new Properties();
    FlowConnector.setApplicationJarClass( properties, Main.class );
    Flow parsedLogFlow = new FlowConnector( properties ).connect( logTap, remoteLogTap, mainPipe );
    parsedLogFlow.start();
    parsedLogFlow.complete();
    }
  }


I am getting the following error:


-------------------------------------------

11/07/31 23:36:44 INFO flow.MultiMapReducePlanner: using application jar: /home/hadoop/cascading/logAnalysis/./build/logparser.jar

Exception in thread "main" cascading.flow.PlannerException: could not build flow from assembly: [[logPipe][loganalysis.Main.main(Unknown Source)] unable to resolve argument selector: [{1}:'Number'], with incoming: [{2}:'offset', 'line']]

at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:242)

at cascading.flow.FlowConnector.connect(FlowConnector.java:531)

at cascading.flow.FlowConnector.connect(FlowConnector.java:513)

at cascading.flow.FlowConnector.connect(FlowConnector.java:361)

at cascading.flow.FlowConnector.connect(FlowConnector.java:306)

at cascading.flow.FlowConnector.connect(FlowConnector.java:288)

at loganalysis.Main.main(Unknown Source)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)

at java.lang.reflect.Method.invoke(Method.java:597)

at org.apache.hadoop.util.RunJar.main(RunJar.java:156)

Caused by: cascading.pipe.OperatorException: [logPipe][loganalysis.Main.main(Unknown Source)] unable to resolve argument selector: [{1}:'Number'], with incoming: [{2}:'offset', 'line']

at cascading.pipe.Operator.resolveArgumentSelector(Operator.java:390)

at cascading.pipe.Each.outgoingScopeFor(Each.java:408)

at cascading.flow.ElementGraph.resolveFields(ElementGraph.java:574)

at cascading.flow.ElementGraph.resolveFields(ElementGraph.java:556)

at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:198)

... 11 more

Caused by: cascading.tuple.FieldsResolverException: could not select fields: [{1}:'Number'], from: [{2}:'offset', 'line']

at cascading.tuple.Fields.indexOf(Fields.java:798)

at cascading.tuple.Fields.select(Fields.java:851)

at cascading.pipe.Operator.resolveArgumentSelector(Operator.java:386)

... 15 more

-----------------------------------------------


Any ideas why I am encountering this problem and how I can solve it?


Thanks in advance


NicK



Ken Krugler

Aug 1, 2011, 10:03:07 AM
to cascadi...@googlegroups.com

On Jul 31, 2011, at 11:38pm, Nick Smith wrote:

Hello everyone,

I am trying to use a Filter in the Cascading API.
I have made some modifications to the existing logParser code; it is shown below:

[snip]

    TextLine scheme = new TextLine( new Fields( "offset", "line" ) );
    Tap logTap = inputPath.matches( "^[^:]+://.*" ) ? new Hfs( scheme, inputPath ) : new Lfs( scheme, inputPath );

    Fields apacheFields = new Fields( "ip", "time", "method", "event", "status", "size" );
    String apacheRegex = "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$";
    int[] allGroups = { 1, 2, 3, 4, 5, 6 };

    RegexParser parser = new RegexParser( apacheFields, apacheRegex, allGroups );
    Pipe importPipe = new Each( "import", new Fields( "line" ), parser, Fields.RESULTS );

    ExpressionFilter filter = new ExpressionFilter( "ip != 75.185.76.245 ", Integer.class );

    Pipe logPipe = new Pipe( "logPipe" );


You now have a second pipe (logPipe, versus importPipe) that isn't connected to anything, and thus is a head pipe.

So Cascading is assuming this is reading from the input file, with fields "offset" and "line".

Instead of creating this pipe, and the mainPipe below, just use your importPipe throughout the flow.
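
For example, here is a minimal sketch of the single-pipe assembly (the equals() comparison is an assumption: ExpressionFilter compiles its expression as Java, a bare dotted IP literal such as 75.185.76.245 is not valid Java, and the "ip" field produced by RegexParser is a String):

    RegexParser parser = new RegexParser( apacheFields, apacheRegex, allGroups );

    // Parse each raw log line into the six Apache fields.
    Pipe importPipe = new Each( "import", new Fields( "line" ), parser, Fields.RESULTS );

    // ExpressionFilter removes every tuple for which the expression evaluates to
    // true, so this drops requests from 75.185.76.245 and keeps everything else.
    ExpressionFilter filter = new ExpressionFilter( "ip.equals(\"75.185.76.245\")", String.class );
    importPipe = new Each( importPipe, new Fields( "ip" ), filter );

    // Source and sink connect to the same head pipe; no logPipe or mainPipe.
    Flow parsedLogFlow = new FlowConnector( properties ).connect( logTap, remoteLogTap, importPipe );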

-- Ken


--------------------------
Ken Krugler
custom data mining solutions

Nick Smith

Aug 1, 2011, 11:38:17 AM
to cascadi...@googlegroups.com

Thank you, Ken.
I have modified my code so that it has only one pipe, my importPipe.

But I am still getting errors, as shown below:
--------------------
hadoop@ubuntu:~/cascading/logAnalysis$ hadoop jar ./build/loganalysis.jar data/verbose.log test123
11/08/01 08:35:43 INFO flow.MultiMapReducePlanner: using application jar: /home/hadoop/cascading/logAnalysis/./build/loganalysis.jar
11/08/01 08:35:43 INFO cascade.Cascade: Concurrent, Inc - Cascading 1.2.3 [hadoop-0.19.2+]
11/08/01 08:35:43 INFO flow.Flow: [import] starting
11/08/01 08:35:43 INFO flow.Flow: [import]  source: Lfs["TextLine[['offset', 'line']->[ALL]]"]["data/verbose.log"]"]
11/08/01 08:35:43 INFO flow.Flow: [import]  sink: Hfs["TextLine[['offset', 'line']->[ALL]]"]["test123"]"]
11/08/01 08:35:44 INFO tap.Hfs: forcing job to local mode, via source: Lfs["TextLine[['offset', 'line']->[ALL]]"]["data/verbose.log"]"]
11/08/01 08:35:44 INFO flow.Flow: [import]  parallel execution is enabled: true
11/08/01 08:35:44 INFO flow.Flow: [import]  starting jobs: 1
11/08/01 08:35:44 INFO flow.Flow: [import]  allocating threads: 1
11/08/01 08:35:44 INFO flow.FlowStep: [import] starting step: (1/1) Hfs["TextLine[['offset', 'line']->[ALL]]"]["test123"]"]
11/08/01 08:35:44 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
11/08/01 08:35:44 INFO mapred.FileInputFormat: Total input paths to process : 1
11/08/01 08:35:45 INFO mapred.FileInputFormat: Total input paths to process : 1
11/08/01 08:35:45 INFO mapred.MapTask: numReduceTasks: 0
11/08/01 08:35:45 WARN mapred.LocalJobRunner: job_local_0001
java.lang.RuntimeException: Error in configuring object
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:93)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:64)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:354)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:88)
... 5 more
Caused by: java.lang.RuntimeException: Error in configuring object
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:93)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:64)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
at org.apache.hadoop.mapred.MapRunner.configure(MapRunner.java:34)
... 10 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:88)
... 13 more
Caused by: cascading.operation.OperationException: could not compile expression: Number != 24366091 
at cascading.operation.expression.ExpressionOperation.getExpressionEvaluator(ExpressionOperation.java:167)
at cascading.operation.expression.ExpressionOperation.prepare(ExpressionOperation.java:207)
at cascading.pipe.Each$EachHandler.prepare(Each.java:499)
at cascading.flow.stack.EachMapperStackElement.prepare(EachMapperStackElement.java:67)
at cascading.flow.stack.StackElement.open(StackElement.java:178)
at cascading.flow.stack.StackElement.open(StackElement.java:182)
at cascading.flow.stack.FlowMapperStack.<init>(FlowMapperStack.java:88)
at cascading.flow.FlowMapper.configure(FlowMapper.java:58)
... 18 more
Caused by: org.codehaus.janino.CompileException: Line 1, Column 8: Expression "java.lang.Number" is not an rvalue
at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:8185)
at org.codehaus.janino.UnitCompiler.toRvalueOrCE(UnitCompiler.java:4723)
at org.codehaus.janino.UnitCompiler.getConstantValue2(UnitCompiler.java:3605)
at org.codehaus.janino.UnitCompiler.access$8500(UnitCompiler.java:108)
at org.codehaus.janino.UnitCompiler$11.visitAmbiguousName(UnitCompiler.java:3585)
at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:2057)
at org.codehaus.janino.UnitCompiler.getConstantValue(UnitCompiler.java:3594)
at org.codehaus.janino.UnitCompiler.getConstantValue2(UnitCompiler.java:3625)
at org.codehaus.janino.UnitCompiler.access$8200(UnitCompiler.java:108)
at org.codehaus.janino.UnitCompiler$11.visitBinaryOperation(UnitCompiler.java:3568)
at org.codehaus.janino.Java$BinaryOperation.accept(Java.java:2515)
at org.codehaus.janino.UnitCompiler.getConstantValue(UnitCompiler.java:3594)
at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:3527)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1439)
at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:108)
at org.codehaus.janino.UnitCompiler$4.visitReturnStatement(UnitCompiler.java:748)
at org.codehaus.janino.Java$ReturnStatement.accept(Java.java:1665)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:758)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:777)
at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:108)
at org.codehaus.janino.UnitCompiler$4.visitBlock(UnitCompiler.java:738)
at org.codehaus.janino.Java$Block.accept(Java.java:1280)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:758)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1783)
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:723)
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:705)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:431)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:329)
at org.codehaus.janino.UnitCompiler$3.visitPackageMemberClassDeclaration(UnitCompiler.java:302)
at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:703)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:308)
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:286)
at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:420)
at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:400)
at org.codehaus.janino.ScriptEvaluator.cook(ScriptEvaluator.java:607)
at org.codehaus.janino.ScriptEvaluator.cook(ScriptEvaluator.java:443)
at org.codehaus.janino.Cookable.cook(Cookable.java:72)
at org.codehaus.janino.Cookable.cook(Cookable.java:64)
at org.codehaus.janino.Cookable.cook(Cookable.java:114)
at org.codehaus.janino.ExpressionEvaluator.<init>(ExpressionEvaluator.java:210)
at cascading.operation.expression.ExpressionOperation.getExpressionEvaluator(ExpressionOperation.java:163)
... 25 more
11/08/01 08:35:50 WARN flow.FlowStep: [import] task completion events identify failed tasks
11/08/01 08:35:50 WARN flow.FlowStep: [import] task completion events count: 0
11/08/01 08:35:50 WARN flow.Flow: stopping jobs
11/08/01 08:35:50 INFO flow.FlowStep: [import] stopping: (1/1) Hfs["TextLine[['offset', 'line']->[ALL]]"]["test123"]"]
11/08/01 08:35:50 WARN flow.Flow: stopped jobs
11/08/01 08:35:50 WARN flow.Flow: shutting down job executor
11/08/01 08:35:50 WARN flow.Flow: shutdown complete
11/08/01 08:35:50 INFO hadoop.Hadoop18TapUtil: deleting temp path test123/_temporary
Exception in thread "main" cascading.flow.FlowException: step failed: (1/1) Hfs["TextLine[['offset', 'line']->[ALL]]"]["test123"]"], with job id: job_local_0001, please see cluster logs for failure messages
at cascading.flow.FlowStepJob.blockOnJob(FlowStepJob.java:175)
at cascading.flow.FlowStepJob.start(FlowStepJob.java:140)
at cascading.flow.FlowStepJob.call(FlowStepJob.java:129)
at cascading.flow.FlowStepJob.call(FlowStepJob.java:39)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
------------------------------------------

Nick Smith

Aug 1, 2011, 2:36:55 PM
to cascadi...@googlegroups.com
Right now I have made the following modifications:

I used logParser to write the file to HDFS, and I am reading it from there to filter out the required tuples.
Following is my code:
------------------------------

package loganalysis;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.regex.RegexParser;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.Lfs;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.operation.expression.ExpressionFilter;

import java.util.Properties;

/**
 *
 */
public class Main
  {
  public static void main( String[] args )
    {
    String inputPath = args[0];
    String outputPath = args[1];
    
    Tap logTap = new Hfs(new TextLine(),inputPath);
    
    Pipe importPipe = new Pipe("importPipe");
   
    ExpressionFilter filter = new ExpressionFilter("ip != 75.185.76.245",Integer.TYPE);
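    // NOTE (assumption): at this point the stream still carries TextLine's
    // "offset" and "line" fields, so there is no "ip" field for this filter
    // to bind to, and "ip != 75.185.76.245" is not valid Java; either issue
    // would make the map tasks fail at runtime.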
    importPipe = new Each(importPipe,filter);
   
    Tap sinkTap = new Hfs(new TextLine(),outputPath,SinkMode.REPLACE);
    Properties properties = new Properties();
    FlowConnector.setApplicationJarClass( properties, Main.class );

    FlowConnector connector = new FlowConnector(properties);
    
    Flow flow = connector.connect(logTap,sinkTap,importPipe);
    flow.start();
    flow.complete(); 
   }
  }

-----------------------------

I seem to be getting this error:

----------------------------
hadoop@ubuntu:~/cascading/loganalysis$ hadoop jar ./build/loganalysis.jar /user/hadoop/LPOutput/part-00000 test1234
11/08/01 11:29:56 INFO flow.MultiMapReducePlanner: using application jar: /home/hadoop/cascading/loganalysis/./build/loganalysis.jar
11/08/01 11:29:56 INFO cascade.Cascade: Concurrent, Inc - Cascading 1.2.3 [hadoop-0.19.2+]
11/08/01 11:29:56 INFO flow.Flow: [importPipe] starting
11/08/01 11:29:56 INFO flow.Flow: [importPipe]  source: Hfs["TextLine[['offset', 'line']->[ALL]]"]["/user/hadoop/LPOutput/part-00000"]"]
11/08/01 11:29:56 INFO flow.Flow: [importPipe]  sink: Hfs["TextLine[['offset', 'line']->[ALL]]"]["test1234"]"]
11/08/01 11:29:57 INFO flow.Flow: [importPipe]  parallel execution is enabled: true
11/08/01 11:29:57 INFO flow.Flow: [importPipe]  starting jobs: 1
11/08/01 11:29:57 INFO flow.Flow: [importPipe]  allocating threads: 1
11/08/01 11:29:57 INFO flow.FlowStep: [importPipe] starting step: (1/1) Hfs["TextLine[['offset', 'line']->[ALL]]"]["test1234"]"]
11/08/01 11:29:57 INFO mapred.FileInputFormat: Total input paths to process : 1
11/08/01 11:30:32 WARN flow.FlowStep: [importPipe] task completion events identify failed tasks
11/08/01 11:30:32 WARN flow.FlowStep: [importPipe] task completion events count: 10
11/08/01 11:30:32 WARN flow.FlowStep: [importPipe] event = Task Id : attempt_201108010832_0009_m_000003_0, Status : SUCCEEDED
11/08/01 11:30:32 WARN flow.FlowStep: [importPipe] event = Task Id : attempt_201108010832_0009_m_000000_0, Status : FAILED
11/08/01 11:30:32 WARN flow.FlowStep: [importPipe] event = Task Id : attempt_201108010832_0009_m_000001_0, Status : FAILED
11/08/01 11:30:32 WARN flow.FlowStep: [importPipe] event = Task Id : attempt_201108010832_0009_m_000001_1, Status : FAILED
11/08/01 11:30:32 WARN flow.FlowStep: [importPipe] event = Task Id : attempt_201108010832_0009_m_000000_1, Status : FAILED
11/08/01 11:30:32 WARN flow.FlowStep: [importPipe] event = Task Id : attempt_201108010832_0009_m_000000_2, Status : FAILED
11/08/01 11:30:32 WARN flow.FlowStep: [importPipe] event = Task Id : attempt_201108010832_0009_m_000001_2, Status : FAILED
11/08/01 11:30:32 WARN flow.FlowStep: [importPipe] event = Task Id : attempt_201108010832_0009_m_000001_3, Status : TIPFAILED
11/08/01 11:30:32 WARN flow.FlowStep: [importPipe] event = Task Id : attempt_201108010832_0009_m_000000_3, Status : TIPFAILED
11/08/01 11:30:32 WARN flow.FlowStep: [importPipe] event = Task Id : attempt_201108010832_0009_m_000002_0, Status : SUCCEEDED
11/08/01 11:30:32 WARN flow.Flow: stopping jobs
11/08/01 11:30:32 INFO flow.FlowStep: [importPipe] stopping: (1/1) Hfs["TextLine[['offset', 'line']->[ALL]]"]["test1234"]"]
11/08/01 11:30:32 WARN flow.Flow: stopped jobs
11/08/01 11:30:32 WARN flow.Flow: shutting down job executor
11/08/01 11:30:32 WARN flow.Flow: shutdown complete
11/08/01 11:30:32 INFO hadoop.Hadoop18TapUtil: deleting temp path test1234/_temporary
Exception in thread "main" cascading.flow.FlowException: step failed: (1/1) Hfs["TextLine[['offset', 'line']->[ALL]]"]["test1234"]"], with job id: job_201108010832_0009, please see cluster logs for failure messages
at cascading.flow.FlowStepJob.blockOnJob(FlowStepJob.java:175)
at cascading.flow.FlowStepJob.start(FlowStepJob.java:140)
at cascading.flow.FlowStepJob.call(FlowStepJob.java:129)
at cascading.flow.FlowStepJob.call(FlowStepJob.java:39)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
------------------------------

I noticed that the output directory test1234 is getting created in HDFS with just the logs and, of course, no output.

Do you have any suggestions?
Thanks,
-NicK