Getting error using PartitionTap and CombineInput

Patrick Duin

Mar 6, 2015, 9:51:41 AM
to cascadi...@googlegroups.com
Hi,

We ran into an issue working with PartitionTap and using CombineInput.

We get the following exception:
cascading.tap.TapException: unable to parse partition given parent: file:/tmp/cascadingtest/inputBasePath and child: null
    at cascading.tap.partition.PartitionTupleEntryIterator.<init>(PartitionTupleEntryIterator.java:53)
    at cascading.tap.partition.BasePartitionTap$PartitionIterator.createPartitionEntryIterator(BasePartitionTap.java:90)
    at cascading.tap.partition.BasePartitionTap$PartitionIterator.<init>(BasePartitionTap.java:73)
    at cascading.tap.partition.BasePartitionTap.openForRead(BasePartitionTap.java:343)


I've created a testcase to illustrate the problem:

  @Test
  public void partitionTapCombineInputFormat() throws Exception {
    TextDelimited inputScheme = new TextDelimited(new Fields("field1"));
    TextDelimited outputScheme = new TextDelimited(new Fields("field1", "partitionField"));
    Partition partition = new DelimitedPartition(new Fields("partitionField"));

    // create a file in /tmp/cascadingtest/inputBasePath/X/part
    File tempFolder = new File("/tmp/cascadingtest/");
    File inputPath = new File(tempFolder, "inputBasePath");
    File partitionX = new File(inputPath, "X");
    partitionX.mkdirs();
    Files.write(Paths.get(partitionX.getAbsolutePath(), "part"), Collections.singleton("valueX1"),
        StandardCharsets.UTF_8);

    Map<Object, Object> properties = new HashMap<Object, Object>();
    HfsProps.setUseCombinedInput(properties, true); // Test works when set to false
    HfsProps.setCombinedInputMaxSize(properties, 1024L); // irrelevant

    Hfs partitionHfs = new Hfs(inputScheme, inputPath.getAbsolutePath());
    PartitionTap sourceTap = new PartitionTap(partitionHfs, partition);

    FlowDef flowDef = FlowDef.flowDef();
    Hfs sinkTap = new Hfs(outputScheme, tempFolder.getAbsolutePath() + "/output");
    Pipe pipe = new Pipe("pipe");
    flowDef.addSource(pipe, sourceTap);
    flowDef.addTailSink(pipe, sinkTap);
    Flow<?> flow = new Hadoop2MR1FlowConnector(properties).connect(flowDef);
    flow.complete();
  }

 
We've experienced the problem with cascading-2.5.5, but I confirmed it still exists in 2.6.3.
Are we running into a bug or are we doing something wrong?

Kind regards,
 Patrick

Chris K Wensel

Mar 6, 2015, 12:15:18 PM
to cascadi...@googlegroups.com
Unfortunately CombinedInputFormat with PartitionTaps is unsupported. 

Seems this is underdocumented, and we could do with some improved error messages around this as well.

PartitionTap relies on knowing the path being read (tuple values are parsed from the path name), and CombinedInputFormat loses that path. Unless we can figure out a way around that, the two can't be used together.
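
To make the failure concrete, here is a minimal plain-Java sketch of path-based partition parsing. It is not Cascading's implementation: the class `PartitionPathSketch`, the method `parsePartition`, and the `depth` parameter are all hypothetical names chosen for illustration. It only shows why a missing child path (the `child: null` in the exception above) leaves nothing to parse the partition values from.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for path-based partition parsing; not Cascading's actual code.
class PartitionPathSketch {

  // Returns the partition values encoded in the first `depth` directories below parent.
  static List<String> parsePartition(String parent, String child, int depth) {
    if (child == null || !child.startsWith(parent + "/")) {
      throw new IllegalStateException(
          "unable to parse partition given parent: " + parent + " and child: " + child);
    }
    String relative = child.substring(parent.length() + 1); // e.g. "X/part"
    String[] segments = relative.split("/");
    if (segments.length <= depth) {
      throw new IllegalStateException(
          "expected " + depth + " partition directories in: " + relative);
    }
    return Arrays.asList(Arrays.copyOfRange(segments, 0, depth));
  }

  public static void main(String[] args) {
    String parent = "file:/tmp/cascadingtest/inputBasePath";
    // The partition value comes from the directory name, not from the file contents.
    System.out.println(parsePartition(parent, parent + "/X/part", 1)); // prints [X]
    try {
      parsePartition(parent, null, 1); // no path available, as with a combined split
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

With one partition field, the directory `X` supplies the value of `partitionField`; once the reader no longer knows which file it is reading, that value cannot be recovered.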

I’ll open an issue to improve matters in 2.7, and see if later versions of Hadoop improve visibility to the input path.

ckw

--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/d93dd804-87f4-4bf0-a2d4-4b025f8993ba%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.


Patrick Duin

Mar 6, 2015, 2:18:33 PM
to cascadi...@googlegroups.com
Thanks for the quick reply :)
That's unfortunate indeed, I can imagine it will be a bit tricky.

 Patrick

Patrick Duin

Mar 12, 2015, 7:36:42 AM
to cascadi...@googlegroups.com
Hi,

I've been debugging this a bit as it is a feature we would like to use.
I might be missing something, but I got my test working by making a very small change to cascading.tap.partition.BasePartitionTap.
I'm using the cascading-2.7-wip branch.
On line 70 I've changed
      if( input != null )
to
      if( input != null && getCurrentIdentifier(flowProcess) != null )

That's the only change. It forces the PartitionIterator to use the childIdentifiers and deeper in the code it will create RecordReaders for the correct files.
Now I am no expert on this code and I don't entirely understand it all, but I've been happily testing this on our job and everything seems OK, with combine input either on or off.
The example test I wrote has just one partition. I've tried adding a second partition, and I see it creates two files when combineInput=false and one file when combineInput=true. This indicates that one mapper is used when combining, which is what I would expect.

I'm running a gradle test in cascading-platform to make sure I didn't break anything (seems OK so far; it takes a long time and the tests are still running as I write).
I've been running the job on our cluster as well, and I see the combineInput flags being picked up nicely and data being read with a reduced number of mappers, so all seems good there as well. I still need to compare my output though.

Could this be of any help in solving the issue?

Kind regards,
 Patrick

Chris K Wensel

Mar 12, 2015, 12:30:33 PM
to cascadi...@googlegroups.com
What is happening in this case is that every file below the parent tap will be opened, in every mapper. Not just the files that represent the split.

That code only exists so a user can read a PartitionTap client side, as input is null when used client side. Cluster side, the input (the actual RecordReader) is created by Hadoop and passed down. It is tied to the split.

A split is usually part of an input file, except when it's a combined split, where it is the whole of multiple files. We need the name of the file we are reading to extract the partition values.

It would be up to the CombinedInputFormat to pass that information up the stack.
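
One way to picture "passing that information up the stack" is a reader over a combined split that announces the file it is currently reading before it hands out any records from it. The sketch below is plain Java under stated assumptions, not Hadoop or Cascading code: `CombinedReaderSketch`, `readAll`, `partitionOf`, and the property name `sketch.current.input.path` are all made up for illustration, and a `Map` stands in for the job configuration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a reader over a combined split publishes the file it is
// currently reading, so a consumer can still derive partition values from the path.
class CombinedReaderSketch {

  static final String CURRENT_PATH_KEY = "sketch.current.input.path"; // made-up property name

  // files maps path -> lines, standing in for the files grouped into one combined split.
  static List<String> readAll(Map<String, List<String>> files, String parent,
                              Map<String, String> conf) {
    List<String> tuples = new ArrayList<>();
    for (Map.Entry<String, List<String>> file : files.entrySet()) {
      conf.put(CURRENT_PATH_KEY, file.getKey()); // announce the file before reading it
      for (String line : file.getValue()) {
        // The consumer looks up the announced path to recover the partition value.
        tuples.add(line + "\t" + partitionOf(parent, conf.get(CURRENT_PATH_KEY)));
      }
    }
    return tuples;
  }

  // First directory below parent, e.g. ".../inputBasePath/X/part" -> "X".
  static String partitionOf(String parent, String path) {
    String relative = path.substring(parent.length() + 1);
    return relative.substring(0, relative.indexOf('/'));
  }

  public static void main(String[] args) {
    String parent = "/tmp/cascadingtest/inputBasePath";
    Map<String, List<String>> files = new LinkedHashMap<>();
    files.put(parent + "/X/part", Arrays.asList("valueX1"));
    files.put(parent + "/Y/part", Arrays.asList("valueY1"));
    System.out.println(readAll(files, parent, new HashMap<>()));
  }
}
```

The point of the design is that a single mapper can walk several files while each record is still paired with the partition of the file it came from. Whether the real input format can expose the path this way is exactly the open question in this message.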

ckw


Patrick Duin

Mar 12, 2015, 1:19:13 PM
to cascadi...@googlegroups.com
Thanks for the explanation, I've got more to learn! :)

Patrick Duin

Mar 17, 2015, 10:27:55 AM
to cascadi...@googlegroups.com

Attempt nr 2.
I've created a pull request this time as it is a bit more code:
https://github.com/cwensel/cascading/pull/35

I hope we are a bit more in the right direction now. We are running some tests with this at the moment; I'll report back if anything is not working as expected for us.

Cheers,
 Patrick

Chris K Wensel

Apr 1, 2015, 11:27:32 PM
to cascadi...@googlegroups.com
This was committed. Thanks!

