Error during Read Multiple Source Files using PartitionTap


Kunal Lahiri

Apr 7, 2016, 2:08:09 PM
to cascading-user
I am trying to read raw files dropped into the lake by external systems in a single process and write them out as processed, partitioned Parquet files with Snappy compression. The target-side process works, but reading multiple source files using PartitionTap does not. The code gives the error below:

Error: cascading.tap.TapException: unable to parse partition given parent: /user/userid/testinput.csv and child: /user/userid/testinput.csv
    at cascading.tap.partition.PartitionTupleEntryIterator.<init>(PartitionTupleEntryIterator.java:53)
    at cascading.tap.partition.BasePartitionTap$PartitionIterator.createPartitionEntryIterator(BasePartitionTap.java:90)
    at cascading.tap.partition.BasePartitionTap$PartitionIterator.<init>(BasePartitionTap.java:73)
    at cascading.tap.partition.BasePartitionTap.openForRead(BasePartitionTap.java:343)
    at cascading.tap.hadoop.PartitionTap.openForRead(PartitionTap.java:214)
    at cascading.tap.hadoop.PartitionTap.openForRead(PartitionTap.java:79)
    at cascading.flow.stream.element.SourceStage.map(SourceStage.java:82)
    at cascading.flow.stream.element.SourceStage.run(SourceStage.java:66)
    at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:142)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

Below is the code:
String SourceFileName = "/user/userid/testinput.csv/";
String TargetFileName = "/user/userid/testpartitiontapoutput.snappy.parquet";
Boolean InputFileHasHeaderRowFlag = true;
Scheme SourceScheme = new TextDelimited(new Fields("line"), true, "\n");
Scheme TargetScheme = new TextDelimited(new Fields("line", "loaddate"), false, "|");
Tap Source = new Hfs(SourceScheme, SourceFileName);
Tap Target = new Hfs(SourceScheme, TargetFileName);
Partition SourcePartition = new DelimitedPartition(new Fields("loaddate"), "_");
Source = new PartitionTap((Hfs) Source, SourcePartition);
FlowDef ConversionFlowDef = BuildCSVToParquetConversionFlowDef(Source, Target, "|");
Hadoop2MR1FlowConnector ConversionFlow = new Hadoop2MR1FlowConnector(properties);
ConversionFlow.connect(ConversionFlowDef).complete();

The SourceFileName path contains multiple files, named: 20160101_testinput.csv, 20160102_testinput.csv, 20160103_testinput.csv

The end goal is to get the date from each file name into the data and then, after intermediate processing, write the output in Snappy-compressed Parquet format under a different file name format.
The process works fine for a single input and a single output.
It also works fine for TargetPartition when I write files from a single input file and add the loaddate field to the data manually.

I am using Cascading 3.0.2. Has anyone come across this error during reads?

Andre Kelpe

Apr 7, 2016, 2:45:32 PM
to cascading-user
This does not look like a job for PartitionTap. You can just use Hfs
in this case and get access to the current file name in a function like
so:

String filename = (String) flowProcess.getProperty( "cascading.source.path" );

Then you parse the name and use it as a new field.
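
A minimal sketch of that parsing step, in plain Java so it can be tried without a cluster. The class and method names (LoadDateExtractor, extractLoadDate) are hypothetical and not part of Cascading. It assumes file names shaped like 20160101_testinput.csv, as in the original question; inside a function's operate method, the input string would be the value returned by the flowProcess.getProperty call above.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not a Cascading class): pulls a yyyyMMdd prefix out of a source path. */
public final class LoadDateExtractor {
    // Assumed naming convention: eight digits, an underscore, then the rest of the name.
    private static final Pattern DATED_NAME = Pattern.compile("^(\\d{8})_.+$");

    private LoadDateExtractor() {}

    /** Returns the eight-digit prefix of the file name, or empty if the name does not match. */
    public static Optional<String> extractLoadDate(String sourcePath) {
        if (sourcePath == null || sourcePath.isEmpty()) {
            return Optional.empty();
        }
        // Keep only the last path segment; this works for plain paths and hdfs:// URIs alike.
        String fileName = sourcePath.substring(sourcePath.lastIndexOf('/') + 1);
        Matcher matcher = DATED_NAME.matcher(fileName);
        return matcher.matches() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Example path built from the directory and file names in the question.
        System.out.println(extractLoadDate("/user/userid/testinput.csv/20160101_testinput.csv"));
    }
}
```

The extracted value can then be emitted as the loaddate field next to the original line. Returning an Optional leaves it to the caller to decide what to do with files that do not follow the naming convention.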

HTH

- Andre
> --
> You received this message because you are subscribed to the Google Groups
> "cascading-user" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to cascading-use...@googlegroups.com.
> To post to this group, send email to cascadi...@googlegroups.com.
> Visit this group at https://groups.google.com/group/cascading-user.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/cascading-user/e118fb8f-a937-47c5-b82f-0a0115e4ad37%40googlegroups.com.
> For more options, visit https://groups.google.com/d/optout.



--
André Kelpe
an...@concurrentinc.com
http://concurrentinc.com

Kunal Lahiri

Apr 7, 2016, 4:35:54 PM
to cascading-user
Thanks, that worked. I had tried the same thing using flowProcess.getProperty("mapreduce.input.fileinputformat.inputdir"), but that returned the original base directory name. That's why I switched to PartitionTap.

Andre Kelpe

Apr 7, 2016, 5:23:51 PM
to cascading-user
Remember that Cascading is based on the mapred API and not on the mapreduce API.

- Andre