HashJoin throws ParquetDecodingException with input as ParquetTupleScheme


ANIKET MORE

Feb 15, 2016, 4:26:43 AM
to cascading-user
Hi,

I am facing a problem when using HashJoin with an input that uses ParquetTupleScheme. I have two source taps: one uses the TextDelimited scheme and the other uses ParquetTupleScheme. I perform a HashJoin and write the result as a delimited file. The program runs successfully in local mode, but when I run it on the cluster it fails with the following error:

parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://Hostname:8020/user/username/testData/lookup-file.parquet
        at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:211)
        at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:144)
        at parquet.hadoop.mapred.DeprecatedParquetInputFormat$RecordReaderWrapper.<init>(DeprecatedParquetInputFormat.java:91)
        at parquet.hadoop.mapred.DeprecatedParquetInputFormat.getRecordReader(DeprecatedParquetInputFormat.java:42)
        at cascading.tap.hadoop.io.MultiRecordReaderIterator.makeReader(MultiRecordReaderIterator.java:123)
        at cascading.tap.hadoop.io.MultiRecordReaderIterator.getNextReader(MultiRecordReaderIterator.java:172)
        at cascading.tap.hadoop.io.MultiRecordReaderIterator.hasNext(MultiRecordReaderIterator.java:133)
        at cascading.tuple.TupleEntrySchemeIterator.<init>(TupleEntrySchemeIterator.java:94)
        at cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator.<init>(HadoopTupleEntrySchemeIterator.java:49)
        at cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator.<init>(HadoopTupleEntrySchemeIterator.java:44)
        at cascading.tap.hadoop.Hfs.openForRead(Hfs.java:439)
        at cascading.tap.hadoop.Hfs.openForRead(Hfs.java:108)
        at cascading.flow.stream.element.SourceStage.map(SourceStage.java:82)
        at cascading.flow.stream.element.SourceStage.run(SourceStage.java:66)
        at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:139)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.NullPointerException
        at parquet.hadoop.util.counters.mapred.MapRedCounterAdapter.increment(MapRedCounterAdapter.java:34)
        at parquet.hadoop.util.counters.BenchmarkCounter.incrementTotalBytes(BenchmarkCounter.java:75)
        at parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:349)
        at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:114)
        at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:191)
        ... 21 more

Below is the code for the use case:

    public static void main(String[] args) throws IOException {

        Configuration conf = new Configuration();

        String[] otherArgs;

        otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        String argsString = "";
        for (String arg : otherArgs) {
            argsString = argsString + " " + arg;
        }
        System.out.println("After processing arguments are:" + argsString);

        Properties properties = new Properties();
        properties.putAll(conf.getValByRegex(".*"));

        String OutputPath = "testData/BasicEx_Output";
        Class types1[] = { String.class, String.class, String.class };
        Fields f1 = new Fields("id1", "city1", "state");

        Tap source = new Hfs(new TextDelimited(f1, "|", "", types1, false), "main-txt-file.dat");
        Pipe pipe = new Pipe("ReadWrite");

        Scheme pScheme = new ParquetTupleScheme();
        Tap source2 = new Hfs(pScheme, "testData/lookup-file.parquet");
        Pipe pipe2 = new Pipe("ReadWrite2");

        Pipe tokenPipe = new HashJoin(pipe, new Fields("id1"), pipe2, new Fields("id"), new LeftJoin());

        Tap sink = new Hfs(new TextDelimited(f1, true, "|"), OutputPath, SinkMode.REPLACE);

        FlowDef flowDef1 = FlowDef.flowDef().addSource(pipe, source).addSource(pipe2, source2).addTailSink(tokenPipe,
                sink);
        new Hadoop2MR1FlowConnector(properties).connect(flowDef1).complete();

    }


I have attached the input files for reference. Please help me solve this issue.


lookup-file.parquet
main-txt-file.dat

Andre Kelpe

Feb 15, 2016, 7:16:32 AM
to cascading-user
This looks like a bug caused by a wrong assumption in Parquet. I fixed
a similar thing 2 years ago in Parquet:
https://github.com/Parquet/parquet-mr/pull/388/
Can you check with the upstream project? It looks like it is their
problem and not a problem in Cascading.

- André



--
André Kelpe
an...@concurrentinc.com
http://concurrentinc.com
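
Editor's note: the "Caused by" part of the trace shows the NullPointerException being thrown inside MapRedCounterAdapter.increment. One plausible reading, consistent with the "wrong assumption" mentioned in the reply, is that the counter object wrapped by that adapter was null when the Parquet file was opened on the HashJoin side. The sketch below only illustrates the general null-tolerant adapter pattern that avoids this kind of failure. It is not the code from the linked pull request and not Parquet's actual implementation; Counter, SimpleCounter, and NullSafeCounterAdapter are hypothetical names made up for this example.

```java
import java.util.concurrent.atomic.AtomicLong;

public class NullSafeCounterSketch {

    /** Hypothetical stand-in for a framework counter (not a Hadoop or Parquet type). */
    interface Counter {
        void increment(long amount);
        long getCount();
    }

    /** Simple in-memory counter, used only for this demonstration. */
    static final class SimpleCounter implements Counter {
        private final AtomicLong value = new AtomicLong();

        public void increment(long amount) {
            value.addAndGet(amount);
        }

        public long getCount() {
            return value.get();
        }
    }

    /** Adapter that ignores updates when no underlying counter is available. */
    static final class NullSafeCounterAdapter implements Counter {
        private final Counter delegate; // may be null when the runtime supplies no counter

        NullSafeCounterAdapter(Counter delegate) {
            this.delegate = delegate;
        }

        public void increment(long amount) {
            // Assumption for this sketch: a missing counter means "do not record",
            // so the update is dropped instead of dereferencing null.
            if (delegate != null) {
                delegate.increment(amount);
            }
        }

        public long getCount() {
            return delegate == null ? 0L : delegate.getCount();
        }
    }

    public static void main(String[] args) {
        Counter backed = new NullSafeCounterAdapter(new SimpleCounter());
        backed.increment(128);
        System.out.println("backed counter:  " + backed.getCount());

        Counter missing = new NullSafeCounterAdapter(null);
        missing.increment(128); // no exception, the update is silently dropped
        System.out.println("missing counter: " + missing.getCount());
    }
}
```

Whether a guard like this belongs in Parquet's counter code or in the caller that supplies the counter is a question for the upstream project, as suggested in the reply above.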