Dataproc slow using the BigQuery connector

poiuytrez

Oct 27, 2015, 10:52:05 AM
to Google Cloud Dataproc Discussions
Hello, 

I created a cluster with 5 workers. I ran a modified version of the BigQuery connector example [1] (modified because I have no output table), and the Spark BigQuery connector does not seem to be well configured. My table is approximately 500 MB and 3,000,000 rows. I can see in my logs:
15/10/27 14:33:44 WARN com.google.cloud.hadoop.io.bigquery.ShardedExportToCloudStorage: Estimated max shards < desired num maps (2 < 20); clipping to 2.
15/10/27 14:33:44 INFO com.google.cloud.hadoop.io.bigquery.ShardedExportToCloudStorage: Computed '2' shards for sharded BigQuery export.

Using only 2 shards seems suboptimal, as I have more than 20 cores available. It takes more than 10 minutes to count all the rows. Is there a way to make the BigQuery connector smarter? My goal is to run Spark jobs on terabytes of data (currently stored in BigQuery).

Thank you for your help,
poiuytrez


The modified Scala code:

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import com.google.cloud.hadoop.io.bigquery.mapred.BigQueryMapredInputFormat
import com.google.gson.JsonObject
import org.apache.hadoop.io.LongWritable

val projectId = "xxx"
val fullyQualifiedInputTableId = "xxx:yyy.zzz"
val jobName = "wordcount"

val conf = sc.hadoopConfiguration

// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)

// Use the systemBucket for temporary BigQuery export data used by the InputFormat.
val systemBucket = conf.get("fs.gs.system.bucket")
conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, systemBucket)

// Configure input for BigQuery access (no output table is used here).
BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId)

val fieldName = "word"

val tableData = sc.newAPIHadoopRDD(conf,
    classOf[GsonBigQueryInputFormat], classOf[LongWritable], classOf[JsonObject])
tableData.count()

James Malone

Oct 29, 2015, 3:32:14 PM
to Google Cloud Dataproc Discussions
Hello!

This GitHub issue is probably relevant to your question. The BigQuery connector assumes it is running in Hadoop MapReduce and is presently tuned for that use case. Additionally, there is no good way for you to force the number of shards. In that thread, Dennis summarizes a current workaround:

TL;DR: If you have more executors than the computed number of BigQuery shards, and you know the work is heavyweight enough that you really want to do sub-shard splitting, just use Unsharded export and possibly adjust fs.gs.block.size if you need the sub-shard splits to be smaller than the default block size of 64MB.
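As a rough sketch of what that workaround might look like in the spark-shell session from the original post (where `sc` already exists): the property key `mapred.bq.input.sharded.export.enable` is an assumption about the connector version in use and should be checked against its `BigQueryConfiguration` class, and the table size and split count below are illustrative values taken from the question, not measured ones. Both settings would need to be applied before the RDD is created.

```scala
// Sketch only: run in spark-shell on Dataproc, where `sc` is predefined.
val conf = sc.hadoopConfiguration

// Assumption: this key switches the connector to unsharded export, so input
// splits follow file blocks rather than the computed BigQuery shards.
conf.setBoolean("mapred.bq.input.sharded.export.enable", false)

// Illustrative sizing: aim for about one split per core.
// ~500 MB table / 20 splits = 25 MB per split (versus the 64 MB default).
val tableBytes = 500L * 1024 * 1024
val desiredSplits = 20
val blockSizeBytes = math.max(1L, tableBytes / desiredSplits)

// fs.gs.block.size is the property named in the workaround above.
conf.setLong("fs.gs.block.size", blockSizeBytes)
```

With the default 64 MB block size, a 500 MB export would yield only about 8 splits, so lowering the block size is what lets all 20 cores take part.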

Hope this helps!

James

poiuytrez

Oct 30, 2015, 5:14:10 AM
to Google Cloud Dataproc Discussions
Thank you James.