parquet write to gs:// slow

cs...@broadinstitute.org

Jul 22, 2016, 5:48:45 PM
to Google Cloud Dataproc Discussions
My setup: a 200-core cluster performing a large parquet write (13K partitions of ~100 MB each) to Google Cloud Storage.  After all the partitions complete, it takes another ~10m (I haven't timed it carefully) for the write to finish.  I improved it some (total 20m -> 15m) by using the parquet direct committer:

  --properties spark.hadoop.spark.sql.parquet.output.committer.class=org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter

Is there a way to improve this further?  10m is a significant percentage of my job's runtime.

I'm guessing it is a consistency synchronization, maybe for the bucket list objects operation?  If I'm willing to record the list of parquet part files and list them explicitly on read, then it shouldn't be necessary to wait for list objects consistency.  Is it possible to disable it and just wait for the individual part writes to succeed?
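For concreteness, here's a rough sketch (Scala, Spark 1.6-era API, spark-shell style) of what I mean by recording the part files and reading them back explicitly; outputPath is just a placeholder for my real bucket path:

  // Sketch: capture the part-file list once after the write, persist it
  // somewhere, and later read those exact files instead of listing the
  // directory again. Assumes an existing sc, sqlContext, and df.
  import org.apache.hadoop.fs.{FileSystem, Path}
  import java.net.URI

  val outputPath = "gs://my-bucket/output.parquet" // placeholder
  df.write.parquet(outputPath)

  // One explicit listing, recorded for later reads.
  val fs = FileSystem.get(new URI(outputPath), sc.hadoopConfiguration)
  val partFiles = fs.listStatus(new Path(outputPath))
    .map(_.getPath.toString)
    .filter(_.contains("part-"))

  // On read, pass the recorded files directly -- no directory listing needed.
  val readBack = sqlContext.read.parquet(partFiles: _*)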

Best,
Cotton

Dennis Huo

Jul 22, 2016, 6:56:09 PM
to Google Cloud Dataproc Discussions
Indeed, as long as you're using the DirectParquetOutputCommitter it should be safe to set the config: "fs.gs.metadata.cache.enable=false"; if you want to just set it at job-submission time, you can use the "spark.hadoop.*" hack:

gcloud dataproc jobs submit spark ... --properties spark.hadoop.spark.sql.parquet.output.committer.class=org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter,spark.hadoop.fs.gs.metadata.cache.enable=false

Note that this would be unsafe if you're not using a "Direct*OutputCommitter", since even individual task commits through the "_temporary/" directory rely on FileSystem.listStatus. Alternatively, you can set:

fs.gs.metadata.cache.type=IN_MEMORY

or as a spark config:

--properties spark.hadoop.fs.gs.metadata.cache.type=IN_MEMORY

and then consistency is enforced only through an in-process data structure that lives just for the duration of the tasks performing the commits.
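And if you'd rather flip these from inside the job instead of at submission time, something like this should work (untested sketch, spark-shell style; sc is the usual SparkContext):

  // Set before the first GCS access so the connector picks them up.
  sc.hadoopConfiguration.set(
    "spark.sql.parquet.output.committer.class",
    "org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter")
  sc.hadoopConfiguration.set("fs.gs.metadata.cache.enable", "false")
  // ...or keep the consistency checks but make them process-local:
  // sc.hadoopConfiguration.set("fs.gs.metadata.cache.type", "IN_MEMORY")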

That said, 10 minutes sounds high for just 13K files if the slowness were just related to the consistency enforcement, so let me know if it's still slow after you disable the cache, and I can try to repro and see what's going on.

cs...@broadinstitute.org

Jul 25, 2016, 11:36:54 AM
to Google Cloud Dataproc Discussions
Dennis, thanks for the quick reply!

spark.hadoop.fs.gs.metadata.cache.enable=false did not speed things up.

The job is essentially a parquet copy (read/write) of 13K partitions and ~500GB, where each step does some data unpacking/packing but no real computation.
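Schematically, the job looks like this (Scala; unpack/pack stand in for our real per-record transforms):

  import org.apache.spark.sql.Row

  // Placeholders standing in for the real per-record transforms.
  def unpack(r: Row): Row = r
  def pack(r: Row): Row = r

  val df = sqlContext.read.parquet("gs://bucket/input.parquet") // ~13K partitions, ~500GB
  val repacked = df.rdd.map(row => pack(unpack(row)))           // cheap per-row work, no shuffle
  sqlContext.createDataFrame(repacked, df.schema)
    .write.parquet("gs://bucket/output.parquet")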

I ran three experiments: default, direct, and direct+cache=false.  I give the total time of the read/write (as measured from within the application) and the "sync time": the time between the progress bar showing the last partition complete and the job finishing (hand-timed, +/- 20s maybe).

default:

total: 20m37s
sync: 14m42s

direct:

options: --properties spark.hadoop.spark.sql.parquet.output.committer.class=org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter
total: 15m5s
sync: 10m47s

direct+cache=false

options: --properties spark.hadoop.spark.sql.parquet.output.committer.class=org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter,spark.hadoop.fs.gs.metadata.cache.enable=false
total: 15m3s
sync: 11m8s

Let me know what I can do to help you reproduce this.

Best,
Cotton

Srinivas Rishindra

Jun 20, 2017, 3:10:17 AM
to Google Cloud Dataproc Discussions
Hi,

DirectParquetOutputCommitter was removed in Spark 2.0.
Spark creates a temporary directory for committing files and then renames them. Since rename actually moves the data instead of just rewriting metadata, it is a very costly operation.
Netflix has come up with a solution for the same problem on S3: https://github.com/rdblue/s3committer
Is there anything like this for Google Cloud Storage?


Best Regards
Rishi

Patrick Clay

Jun 20, 2017, 2:36:18 PM
to Google Cloud Dataproc Discussions
Rename (actually Copy) in Google Cloud Storage is in fact a metadata rewrite (unless you are changing encryption or storage class, in which case it does require moving data on the backend; see https://cloud.google.com/storage/docs/json_api/v1/objects/copy).

In my experience, committing Parquet to Google Cloud Storage in Spark 2+ with the default ParquetOutputCommitter and the v2 FileOutputCommitter enabled is quite fast.
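For reference, the v2 commit algorithm is a single Hadoop setting; e.g., from inside a Spark 2.x job (or pass spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 via --properties at submit time):

  // Enable the v2 FileOutputCommitter commit algorithm (Spark 2.x).
  spark.sparkContext.hadoopConfiguration
    .set("mapreduce.fileoutputcommitter.algorithm.version", "2")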

Anbu Cheeralan

Jul 14, 2017, 1:54:09 PM
to Google Cloud Dataproc Discussions
Interesting. I have always seen "mv" actually doing a copy and delete in Google Storage.
I think that without the _SUCCESS marker, the v2 FileOutputCommitter is susceptible to executor loss.
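A cheap defensive check is to verify the marker yourself before consuming the output (sketch, Scala; assumes _SUCCESS files haven't been disabled, and the path is a placeholder):

  import org.apache.hadoop.fs.Path

  // Only treat the output as complete if the job-level commit finished.
  val out = new Path("gs://bucket/output.parquet")
  val fs = out.getFileSystem(spark.sparkContext.hadoopConfiguration)
  require(fs.exists(new Path(out, "_SUCCESS")), s"incomplete write at $out")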

Saumya Suhagiya

Jun 8, 2021, 9:17:21 AM
to Google Cloud Dataproc Discussions
Was there any conclusion here? 

I am facing a similar issue.
