Patrick Duin
Mar 6, 2015, 9:51:41 AM
to cascadi...@googlegroups.com
Hi,
We ran into an issue using PartitionTap with combined input enabled (HfsProps.setUseCombinedInput).
We get the following exception:
cascading.tap.TapException: unable to parse partition given parent: file:/tmp/cascadingtest/inputBasePath and child: null
    at cascading.tap.partition.PartitionTupleEntryIterator.<init>(PartitionTupleEntryIterator.java:53)
    at cascading.tap.partition.BasePartitionTap$PartitionIterator.createPartitionEntryIterator(BasePartitionTap.java:90)
    at cascading.tap.partition.BasePartitionTap$PartitionIterator.<init>(BasePartitionTap.java:73)
    at cascading.tap.partition.BasePartitionTap.openForRead(BasePartitionTap.java:343)
I've created a test case to illustrate the problem:
@Test
public void partitionTapCombineInputFormat() throws Exception {
    TextDelimited inputScheme = new TextDelimited(new Fields("field1"));
    TextDelimited outputScheme = new TextDelimited(new Fields("field1", "partitionField"));
    Partition partition = new DelimitedPartition(new Fields("partitionField"));

    // create a file in /tmp/cascadingtest/inputBasePath/X/part
    File tempFolder = new File("/tmp/cascadingtest/");
    File inputPath = new File(tempFolder, "inputBasePath");
    File partitionX = new File(inputPath, "X");
    partitionX.mkdirs();
    Files.write(Paths.get(partitionX.getAbsolutePath(), "part"), Collections.singleton("valueX1"),
        StandardCharsets.UTF_8);

    Map<Object, Object> properties = new HashMap<Object, Object>();
    HfsProps.setUseCombinedInput(properties, true); // Test works when set to false
    HfsProps.setCombinedInputMaxSize(properties, 1024L); // irrelevant

    Hfs partitionHfs = new Hfs(inputScheme, inputPath.getAbsolutePath());
    PartitionTap sourceTap = new PartitionTap(partitionHfs, partition);

    FlowDef flowDef = FlowDef.flowDef();
    Hfs sinkTap = new Hfs(outputScheme, tempFolder.getAbsolutePath() + "/output");
    Pipe pipe = new Pipe("pipe");
    flowDef.addSource(pipe, sourceTap);
    flowDef.addTailSink(pipe, sinkTap);

    Flow<?> flow = new Hadoop2MR1FlowConnector(properties).connect(flowDef);
    flow.complete();
}
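To make the failure mode concrete: judging from the exception message, the partition values appear to be derived from the child (file) path relative to the parent (base) path, which cannot work when the child is null. Below is a minimal standalone sketch of that idea. It is not Cascading's code; the class PartitionPathSketch and the method parsePartition are hypothetical names, and the "/" delimiter and the dropping of the file name are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; this is NOT Cascading's implementation.
public class PartitionPathSketch {

    // Returns the directory segments between the parent path and the file name,
    // e.g. parent ".../inputBasePath", child ".../inputBasePath/X/part" gives [X].
    public static List<String> parsePartition(String parent, String child) {
        // A null child leaves nothing to parse, mirroring the reported error text.
        if (child == null || !child.startsWith(parent)) {
            throw new IllegalArgumentException(
                "unable to parse partition given parent: " + parent + " and child: " + child);
        }

        // Path of the child relative to the parent, without leading slashes.
        String relative = child.substring(parent.length()).replaceAll("^/+", "");

        // Assumption: the last segment is the file name, not a partition value.
        int lastSlash = relative.lastIndexOf('/');
        if (lastSlash < 0) {
            return Collections.emptyList();
        }
        return Arrays.asList(relative.substring(0, lastSlash).split("/"));
    }
}
```

With the paths from the test case, a child of file:/tmp/cascadingtest/inputBasePath/X/part would yield the single partition value X, whereas a null child can only fail, which matches what we see when combined input is enabled.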
We first hit the problem with cascading-2.5.5, and I have confirmed it still exists in 2.6.3.
Are we running into a bug or are we doing something wrong?
Kind regards,
Patrick