Random errors while generating parquet files from pyspark

1,970 views
Skip to first unread message

cvisi...@xebia.com

unread,
Mar 1, 2016, 5:24:27 PM3/1/16
to Google Cloud Dataproc Discussions
Hi All,

I seem to get errors randomly when generating parquet files from PySpark.

They work most of the time but they do crash every now and then with:
Object gs://my/bucket/parquet/2016-03-01/foo.parquet/year_month=2014-11/part-r-00055-a6f9eae5-04d1-42ea-be71-2d27f890403d.gz.parquet already exists.
(full stacktrace below)

Rerunning usually fixes it but sometimes i have to rerun it twice to get it to work.

Any pointers on getting my parqet file generation to reliably work the first time around?

Constantijn Visinescu

P.S.
i found this link:
https://forums.databricks.com/questions/1489/why-do-i-get-javaioioexception-file-already-exists.html
it seems similar however i generate my parquet files to new/empty directories, so the problem might not be the same

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1922) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150) ... 27 more Caused by: org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:414) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ... 1 more Caused by: java.io.IOException: Object gs://my/bucket/parquet/2016-03-01/foo.parquet/year_month=2014-11/part-r-00055-a6f9eae5-04d1-42ea-be71-2d27f890403d.gz.parquet already exists. at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getWriteGeneration(GoogleCloudStorageImpl.java:1628) at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.create(GoogleCloudStorageImpl.java:369) at com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage.create(CacheSupplementedGoogleCloudStorage.java:98) at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.createInternal(GoogleCloudStorageFileSystem.java:254) at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.create(GoogleCloudStorageFileSystem.java:238) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:79) at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:903) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787) at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:176) at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:160) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:289) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262) at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetRelation.scala:94) at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:286) at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129) at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.newOutputWriter$1(WriterContainer.scala:424) at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:356) ... 8 more

Angus Davis

unread,
Mar 1, 2016, 7:05:48 PM3/1/16
to Google Cloud Dataproc Discussions
Hi,

Looking at the file path, it looks like you might be making use of the DirectParquetOutputCommitter. The DirectParquetOutputCommitter generally improves performance against object stores (GCS, S3, etc), but doesn't play very well with task restarts / failures with retry. If it's the case that you are using this output committer then what may be happening is that one or more tasks is failing and the failed task is then retried, but when retrying a partial output file already exists for that task and the IOException is thrown from GoogleCloudStorageImpl. There's a terse jira here that appears to cover this: https://issues.apache.org/jira/browse/SPARK-8413

As to why you aren't seeing a nicer error message alluding to the problem: from perusing spark source It looks like spark sql / the ParquetWriter are expecting a FileAlreadyExists exception in this case, but the GoogleHadoopFileSystem is instead throwing a generic IOException. 

If you are using the DirectParquetOutputCommitter, you may want to try using the standard ParquetOutputCommitter and see how it impacts performance of saving your RDD / DataFrame (it can be a significant degradation), but at the same time if that degradation is less than the time it takes to re-run the job, it may be worthwhile as a short term fix. Further, the error message that should be returned reads "The output file already exists but this could be due to a failure from an earlier attempt. Look through the earlier logs or stage page for the first error." It may be beneficial to look for any failed attempts and logs from them.

Angus
Reply all
Reply to author
Forward
0 new messages