Hi,
I have a slightly unusual use case where I'd like to be able to read from any Tap instance outside of the scope of a connected flow. I effectively want to utilise the tap/scheme machinery without the overhead of a full MR job. I can pretty much get this working like so (for Hadoop-based taps; I have something similar for local taps):
// cast to the Hadoop tap type and set up a minimal flow process
Tap<JobConf, ?, ?> hadoopTap = (Tap<JobConf, ?, ?>) sourceTap;
JobConf conf = new JobConf();
FlowProcess<JobConf> flowProcess = new HadoopFlowProcess(conf);
hadoopTap.sourceConfInit(flowProcess, conf);

// read every tuple from the tap directly, outside of a flow
TupleEntryIterator tuples = hadoopTap.openForRead(flowProcess);
while (tuples.hasNext()) {
    tuples.next();
}
This seems to work well for all Taps that I've tried except for the PartitionTap, where I see slightly different behaviour from what I get when connecting the tap to a sink with a HadoopFlowConnector. In my use case I have a number of 'hidden' metadata files with a '.' prefix that live in the base directory of the partitioned data. If I use a flow connector to read from the tap, these files are ignored; they are correctly discarded as candidates for partition paths. However, if I read from the Tap directly as described above, the PartitionTap attempts to use these paths and fails like so:
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/basePath/.hidden
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)
at cascading.tap.hadoop.io.MultiRecordReaderIterator.initialize(MultiRecordReaderIterator.java:102)
at cascading.tap.hadoop.io.MultiRecordReaderIterator.<init>(MultiRecordReaderIterator.java:80)
at cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator.makeIterator(HadoopTupleEntrySchemeIterator.java:57)
at cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator.<init>(HadoopTupleEntrySchemeIterator.java:44)
at cascading.tap.hadoop.PartitionTap.createTupleEntrySchemeIterator(PartitionTap.java:167)
at cascading.tap.hadoop.PartitionTap.createTupleEntrySchemeIterator(PartitionTap.java:72)
at cascading.tap.partition.BasePartitionTap$PartitionIterator.createPartitionEntryIterator(BasePartitionTap.java:88)
at cascading.tap.partition.BasePartitionTap$PartitionIterator.<init>(BasePartitionTap.java:80)
at cascading.tap.partition.BasePartitionTap.openForRead(BasePartitionTap.java:343)
at cascading.tap.Tap.openForRead(Tap.java:270)
I have a horrible workaround that involves subclassing the PartitionTap's parent tap type, overriding Hfs#getChildIdentifiers(JobConf, int, boolean), and implementing a filter there that removes file constructs normally ignored by Hadoop (dot files, _SUCCESS files, _logs folders, etc.). However, this isn't workable for me in the long term because my use case demands that I may not ultimately have control over what Taps are passed into my method (within reason).
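For reference, the filtering in my workaround looks roughly like this. This is a self-contained sketch: `HiddenPathFilter` and its method names are invented for illustration, and in the actual workaround the equivalent logic sits inside the overridden Hfs#getChildIdentifiers rather than in a standalone class.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the hidden-path filter from my workaround. HiddenPathFilter is
// a made-up name; in practice this logic lives inside the overridden
// Hfs#getChildIdentifiers(JobConf, int, boolean).
public class HiddenPathFilter {

    // Mirrors Hadoop's convention of ignoring dot files and
    // underscore-prefixed entries such as _SUCCESS and _logs.
    public static boolean isVisible(String identifier) {
        String name = identifier.substring(identifier.lastIndexOf('/') + 1);
        return !name.startsWith(".") && !name.startsWith("_");
    }

    // Drops hidden entries from a list of child identifiers, as returned
    // by a getChildIdentifiers-style call.
    public static String[] removeHidden(String[] childIdentifiers) {
        List<String> visible = new ArrayList<String>();
        for (String child : childIdentifiers) {
            if (isVisible(child)) {
                visible.add(child);
            }
        }
        return visible.toArray(new String[visible.size()]);
    }
}
```

The override simply calls super.getChildIdentifiers(...) and passes the result through a filter like removeHidden before returning it.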
Can you suggest what extra steps I might need to take in my execution of the Tap mechanisms so that my PartitionTap read behaviour is consistent with that of invocation within a Hadoop job? I suspect that my problems arise from my use of Tap#openForRead(FlowProcess<Config>) instead of Tap#openForRead(FlowProcess<Config>, Input) but I'm uncertain as to how I can satisfy the input parameter in this instance.