We are using the Cascading framework to perform ETL on large CSV files, writing the output with ParquetTupleScheme. Hadoop18TapUtil appears to be attempting cleanupJob before the mapper finishes. I suspect that cleanTempPath is being called while the flow is still executing, i.e. while HadoopUtil.isInflow( conf ) should still return true and cause the method to bail out early. Any help in this regard is highly appreciated.
public static void cleanupJob( Configuration conf ) throws IOException
  {
  if( HadoopUtil.isInflow( conf ) )
    return;

  Path outputPath = FileOutputFormat.getOutputPath( asJobConfInstance( conf ) );

  cleanTempPath( conf, outputPath );
  }
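To make the suspected race easier to reason about, here is a minimal, self-contained sketch of the guard logic above. It uses a plain Map as a stand-in for the Hadoop Configuration, and it assumes (this is an assumption, not confirmed from the Cascading source at hand) that HadoopUtil.isInflow works by checking for a marker property such as "cascading.flow.step" that Cascading sets on the task configuration while a flow step is running:

```java
import java.util.HashMap;
import java.util.Map;

public class CleanupGuardSketch
  {
  // Stand-in for HadoopUtil.isInflow( conf ). The property name
  // "cascading.flow.step" is an assumption for illustration only.
  static boolean isInflow( Map<String, String> conf )
    {
    return conf.get( "cascading.flow.step" ) != null;
    }

  // Mirrors the guard in Hadoop18TapUtil.cleanupJob: the temporary
  // output path must only be removed when the configuration is NOT
  // marked as in-flow (i.e. the flow step has finished).
  static boolean shouldCleanTempPath( Map<String, String> conf )
    {
    return !isInflow( conf );
    }

  public static void main( String[] args )
    {
    Map<String, String> running = new HashMap<>();
    running.put( "cascading.flow.step", "step-1" ); // mapper still executing

    Map<String, String> finished = new HashMap<>(); // marker absent after the step completes

    System.out.println( shouldCleanTempPath( running ) );  // prints false: cleanTempPath is skipped
    System.out.println( shouldCleanTempPath( finished ) ); // prints true: safe to delete _temporary
    }
  }
```

If the marker property were missing or cleared from the task configuration too early, shouldCleanTempPath would return true while the Parquet writer still holds open files under _temporary, which would match the "Stale file handle" failure seen in the trace below.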
Stack trace:
at cascading.flow.stream.graph.StreamGraph.cleanup(StreamGraph.java:190)
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:167)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:348)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1669)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
2020-02-11 21:50:51,213 ERROR [main] cascading.tap.partition.BasePartitionTap: exception while closing TupleEntryCollector business_id=886/period_id=506875536/trans_id=7388154
cascading.tap.TapException: exception closing: business_id=886/period_id=506875536/trans_id=7388154/part-00000-00000
at cascading.tap.hadoop.io.TapOutputCollector.close(TapOutputCollector.java:172)
at cascading.tuple.TupleEntrySchemeCollector.close(TupleEntrySchemeCollector.java:197)
at cascading.tap.partition.BasePartitionTap$PartitionCollector.closeCollector(BasePartitionTap.java:203)
at cascading.tap.partition.BasePartitionTap$PartitionCollector.close(BasePartitionTap.java:188)
at cascading.flow.stream.element.SinkStage.cleanup(SinkStage.java:128)
at cascading.flow.stream.graph.StreamGraph.cleanup(StreamGraph.java:190)
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:167)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:348)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1669)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.io.IOException: 68723.475.4017924 /mdsAdvsrch/xsearch/vw_exp_orders/_temporary/_attempt_1572334265176_59006_m_000000_0/business_id=886/period_id=506875536/trans_id=7388154/part-00000-00000-m-00000.parquet (Stale file handle)
at com.mapr.fs.Inode.throwIfFailed(Inode.java:390)
at com.mapr.fs.Inode.flushPages(Inode.java:505)
at com.mapr.fs.Inode.releaseDirty(Inode.java:583)
at com.mapr.fs.MapRFsOutStream.dropCurrentPage(MapRFsOutStream.java:73)
at com.mapr.fs.MapRFsOutStream.write(MapRFsOutStream.java:85)
at com.mapr.fs.MapRFsDataOutputStream.write(MapRFsDataOutputStream.java:39)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.write(HadoopPositionOutputStream.java:45)
at org.apache.parquet.bytes.ConcatenatingByteArrayCollector.writeAllTo(ConcatenatingByteArrayCollector.java:46)
at org.apache.parquet.hadoop.ParquetFileWriter.writeDataPages(ParquetFileWriter.java:460)
at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:201)
at org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:261)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:173)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat$RecordWriterWrapper.close(DeprecatedParquetOutputFormat.java:102)
at cascading.tap.hadoop.io.TapOutputCollector.close(TapOutputCollector.java:156)