Writing to Taps outside of a flow


Elliot West

Oct 13, 2014, 4:48:20 PM
to cascadi...@googlegroups.com
Hi,

I have a slightly unusual use case where I'd like to be able to read from any Tap instance outside of the scope of a connected flow. I effectively want to utilise the tap/scheme machinery without the overhead of a full MR job. I can pretty much get this working like so (for Hadoop-based taps; I have something similar for local taps):

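// Assumes sourceTap is a Hadoop-based tap.
Tap<JobConf, ?, ?> hadoopTap = (Tap<JobConf, ?, ?>) sourceTap;
JobConf conf = new JobConf();
FlowProcess<JobConf> flowProcess = new HadoopFlowProcess(conf);
// Let the tap and its scheme apply their source settings to the conf.
hadoopTap.sourceConfInit(flowProcess, conf);
TupleEntryIterator tuples = hadoopTap.openForRead(flowProcess);
try {
  while (tuples.hasNext()) {
    tuples.next();
  }
} finally {
  tuples.close(); // release the underlying readers
}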

This seems to work well for all Taps that I've tried except for the PartitionTap, where I get slightly different behaviour than if I were to connect a tap to a sink with a HadoopFlowConnector. In my use case I have a number of 'hidden' metadata files with a '.' prefix that live in the base directory of the partitioned data. If I use a flow connector to read from the tap, these files are ignored: they are correctly discarded as candidates for partition paths. However, if I read from the Tap directly as described above, the PartitionTap attempts to use these paths and fails like so:

org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/basePath/.hidden
  at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
  at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)
  at cascading.tap.hadoop.io.MultiRecordReaderIterator.initialize(MultiRecordReaderIterator.java:102)
  at cascading.tap.hadoop.io.MultiRecordReaderIterator.<init>(MultiRecordReaderIterator.java:80)
  at cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator.makeIterator(HadoopTupleEntrySchemeIterator.java:57)
  at cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator.<init>(HadoopTupleEntrySchemeIterator.java:44)
  at cascading.tap.hadoop.PartitionTap.createTupleEntrySchemeIterator(PartitionTap.java:167)
  at cascading.tap.hadoop.PartitionTap.createTupleEntrySchemeIterator(PartitionTap.java:72)
  at cascading.tap.partition.BasePartitionTap$PartitionIterator.createPartitionEntryIterator(BasePartitionTap.java:88)
  at cascading.tap.partition.BasePartitionTap$PartitionIterator.<init>(BasePartitionTap.java:80)
  at cascading.tap.partition.BasePartitionTap.openForRead(BasePartitionTap.java:343)
  at cascading.tap.Tap.openForRead(Tap.java:270)

I have a horrible workaround that involves subclassing the PartitionTap's parent tap type, overriding Hfs#getChildIdentifiers(JobConf, int, boolean), and implementing a filter there that removes file constructs that are normally ignored by Hadoop (dot files, _SUCCESS files, _logs folders, etc.). However, this isn't workable for me in the long term because in my use case I may not ultimately have control over which Taps are passed into my method (within reason).
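
For illustration, the filtering step of the workaround described above might look roughly like the sketch below. This is plain Java over identifier strings, not the actual override: the class HiddenPathFilter and its methods are hypothetical names, and the assumption is that a child identifier is a slash-separated path whose last component decides whether Hadoop would treat it as hidden (a leading '_' or '.').

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration; not part of Cascading or Hadoop. */
public class HiddenPathFilter {

  /** Returns the final name component of a slash-separated identifier. */
  public static String lastComponent(String identifier) {
    // Strip trailing slashes so "base/_logs/" still yields "_logs".
    int end = identifier.length();
    while (end > 0 && identifier.charAt(end - 1) == '/') {
      end--;
    }
    String trimmed = identifier.substring(0, end);
    return trimmed.substring(trimmed.lastIndexOf('/') + 1);
  }

  /** Treats names starting with '_' or '.' as hidden (assumed convention). */
  public static boolean isHidden(String identifier) {
    String name = lastComponent(identifier);
    return name.startsWith("_") || name.startsWith(".");
  }

  /** Returns only the identifiers whose last component is not hidden. */
  public static List<String> filterVisible(List<String> identifiers) {
    List<String> visible = new ArrayList<>();
    for (String id : identifiers) {
      if (!isHidden(id)) {
        visible.add(id);
      }
    }
    return visible;
  }

  public static void main(String[] args) {
    List<String> children = Arrays.asList(
        "file:/basePath/.hidden",
        "file:/basePath/_SUCCESS",
        "file:/basePath/_logs/",
        "file:/basePath/year=2014");
    // Only the partition directory survives the filter.
    System.out.println(filterVisible(children));
  }
}
```

In the real workaround the equivalent of filterVisible would be applied to the result of the overridden getChildIdentifiers call; the sample paths here are made up.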

Can you suggest what extra steps I might need to take in my execution of the Tap mechanisms so that my PartitionTap read behaviour is consistent with that of invocation within a Hadoop job? I suspect that my problems arise from my use of Tap#openForRead(FlowProcess<Config>) instead of Tap#openForRead(FlowProcess<Config>, Input) but I'm uncertain as to how I can satisfy the input parameter in this instance.

Many thanks - Elliot.

Chris K Wensel

Oct 13, 2014, 10:46:37 PM
to cascadi...@googlegroups.com
good catch. looks like at


the class 

org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter()

doesn't do a very good job.

it's unfortunate that hiddenFileFilter is hidden in Hadoop. will issue a 2.6 wip tomorrow with a fix.

private static final PathFilter hiddenFileFilter = new PathFilter() {
  public boolean accept(Path p) {
    String name = p.getName();
    return !name.startsWith("_") && !name.startsWith(".");
  }
};

ckw

--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/CAC3gpCYhdkAdqWSs7vL%2Ba4POoi1pW35PdvP7yiPVRaUWXC1M8g%40mail.gmail.com.
For more options, visit https://groups.google.com/d/optout.


Chris K Wensel

Oct 14, 2014, 12:34:25 PM
to cascadi...@googlegroups.com
2.6 wip 44 should be out in 3 hours assuming I didn't bork something.


ckw



Elliot West

Oct 14, 2014, 2:41:48 PM
to cascadi...@googlegroups.com
Thanks for the rapid fix!