I'm trying to do a Parquet schema merge in Spark and write the merged result back out to GCS.
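For context, mergedDF comes from a schema-merging read over the input directory, roughly like this (the input path and session setup here are placeholders, not the exact production code):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-schema-merge")
  .getOrCreate()

// Merge the (slightly differing) Parquet schemas across all input files
val mergedDF = spark.read
  .option("mergeSchema", "true")
  .parquet("gs://bucket/input/parquet/")

The merged DataFrame is then repartitioned on the partition column and written back out: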
mergedDF.repartition(mergedDF("some_id")).write.partitionBy("some_id").parquet("gs://bucket/output/parquet/")
The job runs fine as a Dataproc Spark/YARN job at first, but after a few hours it fails with multiple instances of this error:
org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:446)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException:
com.google.api.client.googleapis.json.GoogleJsonResponseException: 410 Gone
{
"code" : 503,
"errors" : [ {
"domain" : "global",
"message" : "Backend Error",
"reason" : "backendError"
} ],
"message" : "Backend Error"
}
at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(AbstractGoogleAsyncWriteChannel.java:432)
at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.close(AbstractGoogleAsyncWriteChannel.java:287)
at com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage$WritableByteChannelImpl.close(CacheSupplementedGoogleCloudStorage.java:68)
at java.nio.channels.Channels$1.close(Channels.java:178)
at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.close(GoogleHadoopOutputStream.java:126)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:400)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetFileFormat.scala:569)
at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer$$anonfun$writeRows$4.apply$mcV$sp(WriterContainer.scala:422)
at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer$$anonfun$writeRows$4.apply(WriterContainer.scala:416)
at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer$$anonfun$writeRows$4.apply(WriterContainer.scala:416)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1348)
at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:438)
... 8 more
Suppressed: java.lang.NullPointerException
at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:147)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:113)
at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetFileFormat.scala:569)
at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer$$anonfun$writeRows$5.apply$mcV$sp(WriterContainer.scala:440)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1357)
... 9 more
Some Stack Overflow answers suggest these are transient GCS backend issues; others suggest Dataflow-style sharding workarounds (but I'm using Dataproc, not Dataflow). How can we avoid this? Is it just a matter of increasing the number of allowed failure attempts, or is there a change we can make in the Spark code itself to prevent it?
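For reference, this is roughly what I mean by "increasing failure attempts" (the values below are guesses to illustrate the idea, not settings I've verified against this failure):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-schema-merge")
  // Allow more task retries before the stage/job is failed (Spark's default is 4)
  .config("spark.task.maxFailures", "16")
  .getOrCreate()

// The GCS connector also has low-level HTTP retry settings; if the installed
// connector version supports it, something like this raises the retry count
// for 5xx responses from GCS (the documented default is 10):
spark.sparkContext.hadoopConfiguration.setInt("fs.gs.http.max.retry", 20)

My understanding is that spark.task.maxFailures is read when the SparkContext is created, hence setting it via the builder (or spark-submit --conf) rather than afterwards.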