Cascading deletes temporary files that an MR job's mapper is still accessing, resulting in a stale file handle error

Saravanabavagugan

Feb 12, 2020, 9:14:08 PM
to cascading-user
We are using the Cascading framework to perform ETL on large CSV files and write the output using ParquetTupleScheme. Hadoop18TapUtil is attempting cleanupJob before the mapper finishes. I suspect that cleanTempPath is being called while the flow is still executing, i.e., the HadoopUtil.isInflow( conf ) guard is not taking effect. Any help in this regard is highly appreciated.


 public static void cleanupJob( Configuration conf ) throws IOException
    {
    if( HadoopUtil.isInflow( conf ) )
      return;

    Path outputPath = FileOutputFormat.getOutputPath( asJobConfInstance( conf ) );

    cleanTempPath( conf, outputPath );
    }


Stacktrace:


at cascading.flow.stream.graph.StreamGraph.cleanup(StreamGraph.java:190)
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:167)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:348)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1669)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
2020-02-11 21:50:51,213 ERROR [main] cascading.tap.partition.BasePartitionTap: exception while closing TupleEntryCollector business_id=886/period_id=506875536/trans_id=7388154
cascading.tap.TapException: exception closing: business_id=886/period_id=506875536/trans_id=7388154/part-00000-00000
at cascading.tap.hadoop.io.TapOutputCollector.close(TapOutputCollector.java:172)
at cascading.tuple.TupleEntrySchemeCollector.close(TupleEntrySchemeCollector.java:197)
at cascading.tap.partition.BasePartitionTap$PartitionCollector.closeCollector(BasePartitionTap.java:203)
at cascading.tap.partition.BasePartitionTap$PartitionCollector.close(BasePartitionTap.java:188)
at cascading.flow.stream.element.SinkStage.cleanup(SinkStage.java:128)
at cascading.flow.stream.graph.StreamGraph.cleanup(StreamGraph.java:190)
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:167)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:348)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1669)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.io.IOException: 68723.475.4017924 /mdsAdvsrch/xsearch/vw_exp_orders/_temporary/_attempt_1572334265176_59006_m_000000_0/business_id=886/period_id=506875536/trans_id=7388154/part-00000-00000-m-00000.parquet (Stale file handle)
at com.mapr.fs.Inode.throwIfFailed(Inode.java:390)
at com.mapr.fs.Inode.flushPages(Inode.java:505)
at com.mapr.fs.Inode.releaseDirty(Inode.java:583)
at com.mapr.fs.MapRFsOutStream.dropCurrentPage(MapRFsOutStream.java:73)
at com.mapr.fs.MapRFsOutStream.write(MapRFsOutStream.java:85)
at com.mapr.fs.MapRFsDataOutputStream.write(MapRFsDataOutputStream.java:39)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.write(HadoopPositionOutputStream.java:45)
at org.apache.parquet.bytes.ConcatenatingByteArrayCollector.writeAllTo(ConcatenatingByteArrayCollector.java:46)
at org.apache.parquet.hadoop.ParquetFileWriter.writeDataPages(ParquetFileWriter.java:460)
at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:201)
at org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:261)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:173)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
at org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat$RecordWriterWrapper.close(DeprecatedParquetOutputFormat.java:102)
at cascading.tap.hadoop.io.TapOutputCollector.close(TapOutputCollector.java:156)



Chris K Wensel

Feb 13, 2020, 1:34:11 AM
to cascadi...@googlegroups.com
what version of Cascading are you using?

--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/cf5018d4-2528-4638-9bd4-d262c555ab92%40googlegroups.com.

Chris K Wensel



Saravanabavagugan Vengadasundaram

Feb 13, 2020, 7:18:28 AM
to cascadi...@googlegroups.com
Version 3.1.

We noticed that it happens when parallel threads run Cascading jobs that write to the same base path. We have partitioning enabled, so the parallel MR jobs actually write their output to different directories. However, Cascading creates a "_temporary" directory in the output sink path and deletes it at the start of the flow, which causes multiple flows to collide with each other.
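The collision described above can be sketched with plain java.nio.file operations. This is not Cascading code, just a minimal simulation, assuming (as the thread suggests) that each job's cleanup deletes the whole shared `_temporary` directory under the base path, not only its own attempt subdirectory:

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.Comparator;
import java.util.stream.Stream;

public class TempDirCollision {
    // Simulates one flow: write a task file under the shared _temporary dir,
    // then run a cleanup that deletes the WHOLE shared _temporary directory.
    static void runFlow(Path base, String attemptId) throws IOException {
        Path temp = base.resolve("_temporary").resolve(attemptId);
        Files.createDirectories(temp);
        Files.writeString(temp.resolve("part-00000"), "data");
        // cleanupJob-style step: removes _temporary for ALL flows, not just this one
        deleteRecursively(base.resolve("_temporary"));
    }

    static void deleteRecursively(Path p) throws IOException {
        if (Files.notExists(p)) return;
        try (Stream<Path> stream = Files.walk(p)) {
            // delete children before parents; ignore individual failures for brevity
            stream.sorted(Comparator.reverseOrder())
                  .forEach(q -> { try { Files.delete(q); } catch (IOException e) { } });
        }
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("sink");

        // Flow A is still mid-write: its temp file exists under _temporary
        Path tempA = base.resolve("_temporary").resolve("attempt_A");
        Files.createDirectories(tempA);
        Files.writeString(tempA.resolve("part-00000"), "flow A in progress");

        // Flow B finishes first and cleans up the shared _temporary dir...
        runFlow(base, "attempt_B");

        // ...so flow A's in-flight file is gone. On MapR-FS, flushing the
        // still-open stream then surfaces as "Stale file handle".
        System.out.println(Files.exists(tempA.resolve("part-00000"))); // false
    }
}
```

On a local filesystem the late writer would see a FileNotFoundException; on MapR-FS the open inode turns into the "Stale file handle" error from the trace above.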



--
regards,
Saravana

B403

Chris K Wensel

Feb 13, 2020, 10:57:41 PM
to cascadi...@googlegroups.com
And when you say you have parallel threads writing, is this threading happening within the task on the cluster?

because of the nature of the PartitionTap (hacking around MR’s inability to write more than one file while retaining support of speculative exec), I don’t think this can be supported. 

it definitely wasn’t something that would be intentionally supported.

That said, you could try Tez and see if things behave differently.

Or enable write direct (bypass _temp folders). you will need to poke in the src to find the flag.

ckw

Saravanabavagugan

Feb 19, 2020, 12:23:54 PM
to cascading-user
Hi Chris,

Thanks for your inputs. 

Threading is not happening within the task. We use multithreading to create and run many MR jobs in parallel. All those jobs have the same output path, but due to partitioning the real output directories end up being the partition directories. However, the temp files get created in the base output path, which causes conflicts between the threads.

I was looking for a way to supply the temp dir location to Cascading, but that is something the code does not seem to provide, as is evident from the Hadoop18TapUtil class I mentioned.

Please let me know if you find the flag to enable direct writes and bypass the temp folders. That would be really helpful. Thanks once again.
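One workaround, not suggested in the thread itself but implied by the problem: give each parallel job its own private staging output path, then move the finished partition directories into the shared base path after the flow completes, so no two jobs ever share a `_temporary` directory. A minimal sketch with plain java.nio.file (the partition names, like `business_id=886`, are assumed unique per job, as described above; a pre-existing partition directory in the base path would make the move fail):

```java
import java.io.IOException;
import java.nio.file.*;

public class StagedOutput {
    // Move each top-level partition dir from the per-job staging path into
    // the shared base path, then remove the (now empty) staging dir.
    static void promote(Path staging, Path sharedBase) throws IOException {
        Files.createDirectories(sharedBase);
        try (DirectoryStream<Path> parts = Files.newDirectoryStream(staging)) {
            for (Path part : parts) {
                // On the same filesystem this is an atomic-ish rename;
                // it throws if the target partition dir already exists.
                Files.move(part, sharedBase.resolve(part.getFileName()));
            }
        }
        Files.delete(staging);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("shared");
        Path staging = Files.createTempDirectory("job1");

        // Pretend a flow wrote one partition into its private staging path
        Path part = staging.resolve("business_id=886");
        Files.createDirectories(part);
        Files.writeString(part.resolve("part-00000"), "rows");

        promote(staging, base);
        System.out.println(Files.exists(base.resolve("business_id=886/part-00000")));
    }
}
```

In the actual jobs the staging path would be the flow's sink path and the move would use the Hadoop FileSystem API instead, but the shape of the fix is the same: the colliding `_temporary` directory lives under a path no other job touches.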

Chris K Wensel

Feb 19, 2020, 11:06:31 PM
to cascadi...@googlegroups.com
> Threading is not happening within the task. We use multithreading to create and run many MR jobs in parallel. All those jobs will have the same output path but due to partitioning, the real output dir will end up being the partition directories. But the temp files gets created in the base output path which is causing conflict between multiple threads.
>
> I was looking for a way to supply the temp dir location to cascading but thats something the code does not seem to provide as evident from Hadoop18TapUtil class which I mentioned.
>
> Please let me know if you find the flag to enable direct write and bypass the temp folders. That would be really helpful. Thanks once again.


Chris K Wensel


