Hi,
I am facing a problem when using HashJoin with an input read through ParquetTupleScheme. I have two source taps: one uses a TextDelimited scheme and the other uses ParquetTupleScheme. I perform a HashJoin and write the result as a delimited file. The program runs successfully in local mode, but when I run it on the cluster it fails with the following error:
parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://Hostname:8020/user/username/testData/lookup-file.parquet
at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:211)
at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:144)
at parquet.hadoop.mapred.DeprecatedParquetInputFormat$RecordReaderWrapper.<init>(DeprecatedParquetInputFormat.java:91)
at parquet.hadoop.mapred.DeprecatedParquetInputFormat.getRecordReader(DeprecatedParquetInputFormat.java:42)
at cascading.tap.hadoop.io.MultiRecordReaderIterator.makeReader(MultiRecordReaderIterator.java:123)
at cascading.tap.hadoop.io.MultiRecordReaderIterator.getNextReader(MultiRecordReaderIterator.java:172)
at cascading.tap.hadoop.io.MultiRecordReaderIterator.hasNext(MultiRecordReaderIterator.java:133)
at cascading.tuple.TupleEntrySchemeIterator.<init>(TupleEntrySchemeIterator.java:94)
at cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator.<init>(HadoopTupleEntrySchemeIterator.java:49)
at cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator.<init>(HadoopTupleEntrySchemeIterator.java:44)
at cascading.tap.hadoop.Hfs.openForRead(Hfs.java:439)
at cascading.tap.hadoop.Hfs.openForRead(Hfs.java:108)
at cascading.flow.stream.element.SourceStage.map(SourceStage.java:82)
at cascading.flow.stream.element.SourceStage.run(SourceStage.java:66)
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:139)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.NullPointerException
at parquet.hadoop.util.counters.mapred.MapRedCounterAdapter.increment(MapRedCounterAdapter.java:34)
at parquet.hadoop.util.counters.BenchmarkCounter.incrementTotalBytes(BenchmarkCounter.java:75)
at parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:349)
at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:114)
at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:191)
... 21 more
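From the trace, the root cause looks like the NullPointerException in MapRedCounterAdapter.increment: Parquet's benchmark counters end up wrapping a null mapred counter when the tap is opened for the hash-join side. These counters appear to be controlled by the parquet.benchmark.* flags that BenchmarkCounter reads from the job configuration, so one thing that might work around it (an untested assumption on my part, not a confirmed fix) is disabling them on the Configuration before connecting the flow:

    // Untested assumption: disable Parquet's benchmark counters so that
    // BenchmarkCounter.incrementTotalBytes() falls back to a no-op counter
    // instead of the adapter that wraps a null mapred counter here.
    conf.setBoolean("parquet.benchmark.bytes.total", false);
    conf.setBoolean("parquet.benchmark.bytes.read", false);
    conf.setBoolean("parquet.benchmark.time.read", false);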
Below is the use case:
import java.io.IOException;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;
import cascading.flow.FlowDef;
import cascading.flow.hadoop2.Hadoop2MR1FlowConnector;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.LeftJoin;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import parquet.cascading.ParquetTupleScheme;

public class ParquetHashJoinExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    System.out.println("After processing, arguments are: " + String.join(" ", otherArgs));

    Properties properties = new Properties();
    properties.putAll(conf.getValByRegex(".*"));
    String outputPath = "testData/BasicEx_Output";

    // Left side: pipe-delimited text source.
    Fields f1 = new Fields("id1", "city1", "state");
    Class<?>[] types1 = { String.class, String.class, String.class };
    Tap source = new Hfs(new TextDelimited(f1, "|", "", types1, false), "main-txt-file.dat");
    Pipe pipe = new Pipe("ReadWrite");

    // Right side: Parquet lookup file; the no-arg scheme reads all columns.
    Scheme pScheme = new ParquetTupleScheme();
    Tap source2 = new Hfs(pScheme, "testData/lookup-file.parquet");
    Pipe pipe2 = new Pipe("ReadWrite2");

    // Left join on id1 == id; the rightmost (Parquet) pipe is accumulated in memory.
    Pipe tokenPipe = new HashJoin(pipe, new Fields("id1"), pipe2, new Fields("id"), new LeftJoin());
    Tap sink = new Hfs(new TextDelimited(f1, true, "|"), outputPath, SinkMode.REPLACE);

    FlowDef flowDef1 = FlowDef.flowDef()
        .addSource(pipe, source)
        .addSource(pipe2, source2)
        .addTailSink(tokenPipe, sink);
    new Hadoop2MR1FlowConnector(properties).connect(flowDef1).complete();
  }
}
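For completeness: the no-argument ParquetTupleScheme above reads every column from the file schema. parquet-cascading also has a constructor taking explicit source fields, which could project just the columns needed for the join; in this sketch only "id" is a real column name (it is the join key), and "city2" is a hypothetical placeholder:

    // Hypothetical projection over the lookup file; "city2" is a placeholder name.
    Scheme projectedScheme = new ParquetTupleScheme(new Fields("id", "city2"));
    Tap projectedSource = new Hfs(projectedScheme, "testData/lookup-file.parquet");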
I have attached the input files for reference. Please help me solve this issue.