RegexSplitter or Function to parse and rewrite the file

Vimal Patel

Sep 23, 2015, 9:42:38 AM
to cascading-user

I created a small application using RegexSplitter. The application produces the correct result but throws an error. I don't know what is wrong with it, and if it is throwing an error, it should not produce an output file.



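package devtraining;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.local.LocalFlowConnector;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.local.TextDelimited;
import cascading.scheme.local.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;

public class Main {

    public static void main(String[] args) {
        // Input file
        String inputPath = args[0];
        // Output file
        String outputPath = args[1];

        // Local-mode taps: read raw lines, write a tab-delimited file with a header row
        Tap inTap = new FileTap(new TextLine(), inputPath);
        Tap outTap = new FileTap(new TextDelimited(true, "\t"), outputPath, SinkMode.REPLACE);

        // Split each "line" value on '@' into five named fields
        String apachesplitter = "@";
        Fields apacheFields = new Fields("ip", "time", "request", "response", "size");
        RegexSplitter splitter = new RegexSplitter(apacheFields, apachesplitter);

        Pipe regexImport = new Each("regexImport", new Fields("line"), splitter);

        Properties properties = AppProps.appProps()
                .setName("part17")
                .buildProperties();

        // connect the assembly to the SOURCE and SINK taps
        Flow parsedLogFlow = new LocalFlowConnector(properties).connect("LogsTransform", inTap, outTap, regexImport);

        // Runs the flow
        parsedLogFlow.complete();
    }
}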



Input file:

in24@0400@0@200@1839
in24@0400@0@200@1840

Output file (tab-delimited):

ip      time    request  response  size
in24    0400    0        200       1839
in24    0400    0        200       1840

 

Log output:

C:\Users\iwg323\Desktop\BigData\Cascading_Training\labs\JavaDeveloperTraining\part17>java -Xmx1024m -cp "..\common\driven.jar;.\build\libs\devtraining-fat.jar" devtraining.Main ..\logs\NASA_access_log_Aug96.txt output\NASA_access_log_Aug96.txt
2015-09-22 22:42:39,249 INFO  [main] property.AppProps (AppProps.java:getAppID(162)) - using app.id: B25F714964154487A455F2D31725BBC3
2015-09-22 22:42:39,290 INFO  [main] management.CascadingServices (CascadingServices.java:loadProperties(165)) - loading properties: cascading/management/service.properties, from jar: file:/C:/Users/iwg323/Desktop/BigData/Cascading_Training/labs/JavaDeveloperTraining/common/driven.jar
2015-09-22 22:42:39,327 INFO  [main] provider.ServiceLoader (ProviderLoader.java:getClassLoader(100)) - loading services from library: file:/C:/Users/iwg323/Desktop/BigData/Cascading_Training/labs/JavaDeveloperTraining/common/driven.jar
2015-09-22 22:42:39,530 INFO  [main] rest.DrivenDocumentService (DrivenDocumentService.java:<init>(24)) - loading Cascading 2 services
2015-09-22 22:42:39,782 INFO  [main] rest.BulkUploadClient (BulkUploadClient.java:<init>(65)) - api key not supplied
2015-09-22 22:42:39,783 INFO  [main] rest.DrivenDocumentService (DrivenDocumentService.java:logInfo(550)) - starting service for Cascading 2.5.6. sending telemetry to http://edhddn1126.kdc.capitalone.com:8080 from plugin /C:/Users/iwg323/Desktop/BigData/Cascading_Training/labs/JavaDeveloperTraining/common/driven.jar
2015-09-22 22:42:39,786 INFO  [main] rest.BufferedWriter (BufferedWriter.java:createRecorder(65)) - archive recording is not enabled
2015-09-22 22:42:39,993 WARN  [main] rest.BulkUploadClient (HttpUtil.java:logHandshakeAtLevel(29)) - server version: '1.3.0' does not match incoming communication version: '1.3.3-eap-7'
2015-09-22 22:42:39,994 INFO  [main] rest.BulkUploadClient (RESTClient.java:handleServerHandshakeResponse(240)) - resolved protocol config: {driven.protocol.command.suppress=false, driven.protocol.tolerance.plugin.action=allow, driven.protocol.slice.suppress=false}
2015-09-22 22:42:40,020 INFO  [main] rest.BulkUploadClient (RESTClient.java:testConnectivity(200)) - available http://edhddn1126.kdc.capitalone.com:8080
2015-09-22 22:42:40,266 INFO  [ESBufferDaemon-Flusher-1442976159785] rest.DrivenDocumentService (DrivenDocumentService.java:logInfo(550)) - plugin protocol action: 'allow'
2015-09-22 22:42:40,277 INFO  [ESBufferDaemon-Flusher-1442976159785] rest.DrivenDocumentService (BulkUploadClient.java:verboseAppLink(217)) -

    ________        __    ________        .__
   /  _____/  _____/  |_  \______ \_______|__|__  __ ____   ____
  /   \  ____/ __ \   __\  |    |  \_  __ \  \  \/ // __ \ /    \
  \    \_\  \  ___/|  |    |    `   \  | \/  |\   /\  ___/|   |  \
   \______  /\_____>__|   /_______  /__|  |__| \_/  \_____>___|  /
          \/                      \/                           \/
   plugin version: 1.3.3-eap-7

        Follow this link to view your application:

        http://edhddn1126.kdc.capitalone.com:8080/driven/B25F714964154487A455F2D31725BBC3

2015-09-22 22:42:40,295 INFO  [flow LogsTransform] util.Version (Version.java:printBanner(78)) - Concurrent, Inc - Cascading 2.5.6
2015-09-22 22:42:40,301 INFO  [flow LogsTransform] flow.Flow (BaseFlow.java:logInfo(1354)) - [LogsTransform] starting
2015-09-22 22:42:40,303 INFO  [flow LogsTransform] flow.Flow (BaseFlow.java:logInfo(1354)) - [LogsTransform]  source: FileTap["TextLine[['num', 'line']->[ALL]]"]["..\logs\NASA_access_log_Aug96.txt"]
2015-09-22 22:42:40,310 INFO  [flow LogsTransform] flow.Flow (BaseFlow.java:logInfo(1354)) - [LogsTransform]  sink: FileTap["TextDelimited[['ip', 'time', 'request', 'response', 'size']]"]["output\NASA_access_log_Aug96.txt"]
2015-09-22 22:42:40,315 INFO  [flow LogsTransform] flow.Flow (BaseFlow.java:logInfo(1354)) - [LogsTransform]  parallel execution is enabled: true
2015-09-22 22:42:40,319 INFO  [flow LogsTransform] flow.Flow (BaseFlow.java:logInfo(1354)) - [LogsTransform]  starting jobs: 1
2015-09-22 22:42:40,321 INFO  [flow LogsTransform] flow.Flow (BaseFlow.java:logInfo(1354)) - [LogsTransform]  allocating threads: 1
2015-09-22 22:42:40,327 INFO  [pool-1-thread-1] flow.FlowStep (BaseFlowStep.java:logInfo(834)) - [LogsTransform] starting step: local

2015-09-22 22:42:40,343 ERROR [pool-3-thread-1] stream.TrapHandler (TrapHandler.java:handleReThrowableException(103)) - caught Throwable, no trap available, rethrowing
cascading.tuple.TupleException: operation added the wrong number of fields, expected: ['ip', 'time', 'request', 'response', 'size'], got result size: 1
        at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:125)
        at cascading.operation.regex.RegexSplitter.operate(RegexSplitter.java:99)
        at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
        at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
        at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
        at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
        at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
2015-09-22 22:42:40,360 ERROR [pool-3-thread-1] stream.SourceStage (SourceStage.java:map(110)) - caught throwable
cascading.tuple.TupleException: operation added the wrong number of fields, expected: ['ip', 'time', 'request', 'response', 'size'], got result size: 1
        at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:125)
        at cascading.operation.regex.RegexSplitter.operate(RegexSplitter.java:99)
        at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
        at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
        at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
        at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
        at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
2015-09-22 22:42:40,406 INFO  [flow LogsTransform] flow.Flow (BaseFlow.java:logInfo(1354)) - [LogsTransform] stopping all jobs
2015-09-22 22:42:40,407 INFO  [flow LogsTransform] flow.FlowStep (BaseFlowStep.java:logInfo(834)) - [LogsTransform] stopping: local
2015-09-22 22:42:40,408 INFO  [flow LogsTransform] flow.Flow (BaseFlow.java:logInfo(1354)) - [LogsTransform] stopped all jobs
Exception in thread "main" cascading.flow.FlowException: local step failed
        at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:219)
        at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:149)
        at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:124)
        at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:43)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: cascading.tuple.TupleException: operation added the wrong number of fields, expected: ['ip', 'time', 'request', 'response', 'size'], got result size: 1
        at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:125)
        at cascading.operation.regex.RegexSplitter.operate(RegexSplitter.java:99)
        at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
        at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
        at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
        at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
        at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
        ... 4 more

2015-09-22 22:42:40,470 INFO  [cascading shutdown hooks] rest.DrivenDocumentService (DrivenDocumentService.java:logInfo(550)) - running DrivenDocumentService shutdown hook
2015-09-22 22:42:40,517 INFO  [cascading shutdown hooks] rest.DrivenDocumentService (BulkUploadClient.java:verboseAppLink(217)) -

    ________        __    ________        .__
   /  _____/  _____/  |_  \______ \_______|__|__  __ ____   ____
  /   \  ____/ __ \   __\  |    |  \_  __ \  \  \/ // __ \ /    \
  \    \_\  \  ___/|  |    |    `   \  | \/  |\   /\  ___/|   |  \
   \______  /\_____>__|   /_______  /__|  |__| \_/  \_____>___|  /
          \/                      \/                           \/
   plugin version: 1.3.3-eap-7

        Follow this link to view your application:

        http://edhddn1126.kdc.capitalone.com:8080/driven/B25F714964154487A455F2D31725BBC3

2015-09-22 22:42:40,544 INFO  [cascading shutdown hooks] rest.DrivenDocumentService (DrivenDocumentService.java:logInfo(550)) - stopped document service

 

Ken Krugler

Sep 23, 2015, 9:52:27 AM
to cascadi...@googlegroups.com


From: Vimal Patel
Sent: September 23, 2015 6:42:37am PDT
To: cascading-user
Subject: RegexSplitter or Function to parse and rewrite the file

> I created a small application using RegexSplitter. The application produces the correct result but throws an error. I don't know what is wrong with it

Is it possible you have an empty line in your input file?
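Ken's guess fits the exception text. A minimal plain-Java sketch, assuming RegexSplitter splits the `line` value much like `java.util.regex.Pattern.split` does (an assumption; this is not Cascading's source), shows why: a well-formed record yields five values, while an empty line yields a single empty value, which matches "got result size: 1". The class name `EmptyLineSplit` is hypothetical.

```java
import java.util.regex.Pattern;

// Plain-Java model of the failing split (no Cascading needed).
// Assumption: RegexSplitter splits roughly like Pattern.split.
class EmptyLineSplit {
    // Same delimiter as apachesplitter in the original program.
    static final Pattern DELIMITER = Pattern.compile("@");
    // Declared fields: ip, time, request, response, size.
    static final int EXPECTED_FIELDS = 5;

    static String[] split(String line) {
        return DELIMITER.split(line);
    }

    public static void main(String[] args) {
        String[] good = split("in24@0400@0@200@1839");
        String[] blank = split(""); // a trailing empty line in the input file

        System.out.println("good line  -> " + good.length + " value(s)");
        System.out.println("empty line -> " + blank.length + " value(s)");
        // Only the empty line differs from EXPECTED_FIELDS, mirroring
        // "expected: [5 fields], got result size: 1" in the log.
        System.out.println("empty line OK? " + (blank.length == EXPECTED_FIELDS));
    }
}
```

When a pattern never matches, `Pattern.split` returns a one-element array holding the whole input, so an empty line becomes one empty value rather than zero.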

> and if it is throwing an error, it should not produce an output file.

No idea, but it looks like you're running Cascading in local mode on Windows; sometimes Windows doesn't let Java code delete a file (it thinks the file is still open?).

-- Ken
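On the second question: if partially written output is unwanted after a failed run, one option is to delete it yourself when the flow throws. The sketch below is plain Java and only models that idea; `CleanUpOnFailure`, `Job` and `runOrCleanUp` are hypothetical names. In the original program the job body would presumably be the `parsedLogFlow.complete()` call and the path would be `outputPath`. It assumes the sink is a single file; on Windows the delete itself may fail if a handle is still open, which is the situation Ken describes, so that failure is attached to the original error instead of hiding it.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: run a job that writes to `output`; if the job throws,
// try to delete whatever partial output it left, then rethrow the failure.
class CleanUpOnFailure {
    interface Job {
        void run() throws Exception;
    }

    static void runOrCleanUp(Path output, Job job) throws Exception {
        try {
            job.run();
        } catch (Exception failure) {
            try {
                Files.deleteIfExists(output);
            } catch (IOException deleteFailure) {
                // e.g. a still-open handle on Windows; keep the original error
                // but record why the cleanup did not happen.
                failure.addSuppressed(deleteFailure);
            }
            throw failure;
        }
    }

    public static void main(String[] args) throws Exception {
        Path out = Files.createTempFile("parsed", ".txt");
        try {
            runOrCleanUp(out, () -> {
                Files.write(out, "ip\ttime\n".getBytes());
                throw new IllegalStateException("simulated step failure");
            });
        } catch (IllegalStateException expected) {
            System.out.println("output still exists? " + Files.exists(out));
        }
    }
}
```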


--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/25aa3768-433d-42ec-9728-cfe1812387c4%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr

Vimal Patel

Sep 23, 2015, 1:32:46 PM
to cascading-user
I deleted the file manually and just ran the application, and it is still throwing the same error and producing the file. Do you see anything wrong in the code?

Chris K Wensel

Sep 23, 2015, 1:37:10 PM
to cascadi...@googlegroups.com
Place a Debug filter before your RegexSplitter function so you can see what data caused the failure. 
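In the pipe assembly that would presumably mean an extra `Each` wrapping `cascading.operation.Debug` (for example `new Each(pipe, new Debug(true))`) placed ahead of the `Each` that runs the splitter. The plain-Java stand-in below, with hypothetical names, mimics what that output gives you: every record is echoed before it is split, quoted so an empty line is visible, and the first record with the wrong field count is reported. The sample input is the two lines from the original post plus an assumed trailing blank line.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical stand-in for a Debug filter placed before the splitter:
// echo each record, then report the first one whose split size is wrong.
class DebugBeforeSplit {
    static final Pattern DELIMITER = Pattern.compile("@");
    static final int EXPECTED_FIELDS = 5;

    // Returns the 1-based line number of the first bad record, or -1 if none.
    static int firstBadLine(List<String> lines) {
        for (int i = 0; i < lines.size(); i++) {
            String line = lines.get(i);
            // Debug-style echo; the quotes make an empty line show up as ''.
            System.out.println("line " + (i + 1) + ": '" + line + "'");
            if (DELIMITER.split(line).length != EXPECTED_FIELDS) {
                return i + 1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
            "in24@0400@0@200@1839",
            "in24@0400@0@200@1840",
            ""); // assumed trailing blank line
        System.out.println("first bad line: " + firstBadLine(input));
    }
}
```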

It's likely you have a blank line at the end of your file. You should filter out blank lines if they will be common in production.
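A plain-Java sketch of that advice: drop empty or whitespace-only lines before splitting, so only well-formed records reach the split. In a Cascading assembly this would presumably be a filtering `Each` ahead of the splitter, for instance one using `cascading.operation.regex.RegexFilter` with the regex `\S`; that wiring is an assumption and is not what the code below runs. `SkipBlankLines` and `parse` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Plain-Java sketch of "filter blank lines before splitting".
class SkipBlankLines {
    static final Pattern DELIMITER = Pattern.compile("@");
    // Matches any non-whitespace character; lines without one are skipped.
    static final Pattern NON_BLANK = Pattern.compile("\\S");

    static List<String[]> parse(List<String> lines) {
        List<String[]> records = new ArrayList<>();
        for (String line : lines) {
            if (!NON_BLANK.matcher(line).find()) {
                continue; // drop empty or whitespace-only lines
            }
            records.add(DELIMITER.split(line));
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
            "in24@0400@0@200@1839", "in24@0400@0@200@1840", "", "   ");
        for (String[] record : parse(input)) {
            System.out.println(String.join("\t", record));
        }
    }
}
```

Filtering up front keeps the splitter's contract simple (every record it sees has five fields) instead of making it tolerate malformed input.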

ckw

Chris K Wensel




Vimal Patel

Sep 23, 2015, 1:53:24 PM
to cascading-user


On Wednesday, September 23, 2015 at 9:52:27 AM UTC-4, kkrugler wrote:


From: Vimal Patel

Thanks Ken, I found the issue. Yes, you were right: it was an empty line at the end of the file, and the application now runs clean. I am very happy to say my first app is running smooooooooooth.

Vimal Patel

Sep 23, 2015, 2:04:48 PM
to cascading-user
Thanks Chris, I appreciate your post. I will keep your suggestion in mind for my next development and make it a standard practice.