How to handle increasing data size on DataProc Cluster's datanode


Chauhan B

Oct 16, 2019, 6:23:52 AM
to Google Cloud Dataproc Discussions
Hi,

I am new to Dataproc/GCP. For one of my automation projects I have created a few pipelines in which:

1) When a button is clicked in the UI, data is fetched from the source system and stored in Google Sheets and BigQuery.
2) For each UI action, I maintain a job ID with its metadata in Cloud SQL [MySQL].
3) A Cloud Composer job runs every 10 minutes, reads new job IDs from Cloud SQL, and processes the corresponding datasets [in Google Sheets and BigQuery] using PySpark jobs on a Dataproc cluster to generate the final output.

This whole flow works properly, but I have now realized that the Dataproc cluster actually copies the data [from Sheets and BigQuery] to the underlying HDFS cluster before running the PySpark job. This increases disk usage, and after some time the PySpark jobs start failing because there is no space left to copy the data to HDFS.

My questions are:
1) Does a Dataproc/Spark cluster permanently copy the data to the respective datanodes?

2) Is there any way to schedule a data deletion policy that runs after the respective job completes? If yes, how can this be achieved? Do I need to write a shell script for this, or is there a data curation plugin available in Dataproc?

3) What is the best way to use a Dataproc cluster if I don't want to create and delete the cluster for every job run? I want my cluster to run forever; is there any drawback to this approach?

Please note - I don't want to auto-scale my cluster when data size increases.

Any quick references would be appreciated.

Regards,
Bhupesh 

Karthik Palaniappan

Oct 17, 2019, 2:24:18 AM
to Google Cloud Dataproc Discussions
Dataproc's connectors do *not* copy to HDFS. If you are using the older BQ connector, it will copy to GCS, but not to HDFS.

Can you share with us your code to read from BQ or Cloud SQL? E.g. `spark.read.bigquery("publicdata.samples.shakespeare")`. How are you determining that data is being copied to HDFS?

To answer your questions:
1) No, it does not, unless your application code is copying to HDFS. If you're seeing increased disk usage, it's likely ephemeral shuffle data that should be deleted once the job completes.
2) You could delete the data at the end of your job.
3) You can use workflow templates to automate creating a cluster, running job(s), and then deleting it: https://cloud.google.com/dataproc/docs/concepts/workflows/overview. You can alternatively use scheduled deletion to only delete clusters after a certain time or after they've been idle for a certain period of time: https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/scheduled-deletion. You can use long-running clusters, but the main drawback is cost. If you're processing relatively little data, consider using single node clusters. Otherwise, consider using autoscaling.
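
As an illustration of point 2, here is a minimal sketch of deleting a job's staging data once the job finishes. It assumes the data lives under a single directory that the job itself chose (for example a `gs://` path used by the older BQ connector, or an `hdfs://` path written by application code). The helper names `delete_path` and `run_with_cleanup` and the `job` callback are hypothetical; the Hadoop `Path` and `FileSystem.delete` calls are reached through PySpark's internal `_jvm` and `_jsc` handles, which are not a public API.

```python
def delete_path(spark, path):
    """Recursively delete `path` through Hadoop's FileSystem API.

    Works for any scheme the cluster's Hadoop configuration knows about
    (e.g. hdfs:// or gs://). Returns whatever FileSystem.delete returns.
    """
    jvm_path = spark._jvm.org.apache.hadoop.fs.Path(path)
    fs = jvm_path.getFileSystem(spark._jsc.hadoopConfiguration())
    return fs.delete(jvm_path, True)  # True means recursive


def run_with_cleanup(spark, staging_dir, job):
    """Run job(spark, staging_dir), then remove staging_dir even if the job fails.

    `job` is a hypothetical callable supplied by the caller; it is expected
    to keep all of its temporary data under `staging_dir`.
    """
    try:
        return job(spark, staging_dir)
    finally:
        delete_path(spark, staging_dir)
```

Putting the cleanup in `finally` means a failed run does not leave its staging data behind, which matters on a long-running cluster where leftovers accumulate across runs.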

Chauhan B

Oct 17, 2019, 5:08:46 AM
to Google Cloud Dataproc Discussions
Hi Karthik,

First of all thank you for your quick response.

First I tried using newAPIHadoopRDD to fetch data from BigQuery:

def extract_data_from_big_query(spark, bq_project, dataset, table):
    # Reuse the bucket and project already set in the cluster's Hadoop configuration.
    bucket = spark._jsc.hadoopConfiguration().get("fs.gs.system.bucket")
    project = spark._jsc.hadoopConfiguration().get("fs.gs.project.id")

    bq_conf = {
        "mapred.bq.project.id": project,
        "mapred.bq.gcs.bucket": bucket,
        "mapred.bq.input.project.id": bq_project,
        "mapred.bq.input.dataset.id": dataset,
        "mapred.bq.input.table.id": table
    }
    bq_data_rdd = spark.sparkContext.newAPIHadoopRDD(
        "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "com.google.gson.JsonObject",
        conf=bq_conf)

    # Each record is a (key, json_string) pair; keep only the JSON string.
    raw_data_rdd = bq_data_rdd.map(lambda x: x[1])
    raw_data_df = spark.read.json(raw_data_rdd)

    return raw_data_df

Later, when I needed to read the data with a WHERE clause, I moved to:

import json
import subprocess

def extract_data_from_big_query(spark, bq_project, dataset, table, column_selected, job_id):
    # Count the matching rows first so the second query can request all of them via -n.
    query_count = (
        'SELECT COUNT(*) FROM [{project}.{dataset_id}.{table_id}] where job_id = "{jobid}"'.format(
            project=bq_project, dataset_id=dataset, table_id=table, jobid=job_id))

    subprocess_result = subprocess.check_output([
        'bq', '-q', '--headless', '--format', 'json', 'query', query_count])
    # Pull the count value out of the JSON text printed by bq.
    str_subprocess_result = str(subprocess_result)
    count_result = str_subprocess_result.split(":")[1].split('"')[1]

    query = (
        'SELECT {columns} FROM [{project}.{dataset_id}.{table_id}] where job_id = "{jobid}"'.format(
            columns=column_selected, project=bq_project, dataset_id=dataset,
            table_id=table, jobid=job_id))

    # Fetch the selected columns for this job ID through the bq CLI.
    result = subprocess.check_output([
        'bq', '-q', '--headless', '--format', 'json', 'query', '-n', count_result, query])
    results = json.loads(result)
    raw_data_df = spark.createDataFrame(results)

    return raw_data_df

I didn't try spark.read.bigquery() because it is still in beta. Do you suggest that I go ahead and use
spark.read.bigquery("publicdata:samples.shakespeare")
  .select("word")
  .where("word = 'Hamlet' or word = 'Claudius'")
  .collect()

in PySpark as well?

I would also like to know the difference between these three approaches.

* I have read somewhere that Dataproc copies data from BigQuery to the HDFS cluster before processing it.

If that is not the case, then I am still wondering why my Spark jobs result in disk usage. If this is shuffle data, do I need to take any action to delete it? I assumed it would be deleted automatically once the Spark job finishes.

Regards,
Bhupesh

Karthik Palaniappan

Oct 17, 2019, 12:32:01 PM
to Google Cloud Dataproc Discussions
Yeah, go ahead and use the new connector -- that will simplify your code quite a bit, and will avoid copying to GCS. The old connector exports data to GCS: https://cloud.google.com/bigquery/docs/exporting-data#exporting_data_into_one_or_more_files, while the new connector reads directly from BigQuery's Storage API: https://cloud.google.com/bigquery/docs/reference/storage/. The new connector is also based on DataFrames / Data Sources, so it is far easier to work with.

Out of curiosity, how much data are you reading from BQ, and how much disk space is there in the cluster? I suspect .collect() causes a shuffle (all data written to disk), with the result then transferred to the driver. So you need more disk space in the cluster than the size of your dataset. Also note that PD performance scales with how much disk space you buy: https://cloud.google.com/compute/docs/disks/performance.

Chauhan B

Nov 4, 2019, 3:41:11 AM
to Google Cloud Dataproc Discussions
Thanks for your response so far Karthik.

Sorry for my delayed response. I got occupied with something else and could not find time to look into this.
I will get back to you if I run into any issues.

Regards,
Bhupesh

Chauhan B

Nov 7, 2019, 11:36:35 PM
to Google Cloud Dataproc Discussions
Hi Karthik,

As you suggested, I tried the given statements on my Dataproc cluster running Spark 2.4.4, but I am getting errors. Am I using the right Spark version? Which Dataproc version supports this feature?

>>> spark.read.format('bigquery').option('table', '<my_dataset>.<my_table>').select('my_col_name').where("Job_ID = '5996'").show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'DataFrameReader' object has no attribute 'select'

>>> spark.read.bigquery('<mydomain>.com:<my_project>').option('table', 
 '<my_dataset>.<my_table>').select('my_col_name').where("Job_ID = '5996'").show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'DataFrameReader' object has no attribute 'bigquery'

The other two methods I mentioned work fine, but this one does not. A quick response would be appreciated.

Regards,
Bhupesh

Karthik Palaniappan

Nov 8, 2019, 1:12:26 PM
to Google Cloud Dataproc Discussions
I think you need to call load() after option(...). Spark's documentation: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#manually-specifying-options

There are examples of using the BQ connector here, as well as how to install it via --jars or pom.xml: https://github.com/GoogleCloudPlatform/spark-bigquery-connector.
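
For the PySpark case, a minimal sketch of the corrected call chain might look like the following. It assumes the connector jar is already on the cluster's classpath (for example via --jars, as the README describes). The function name `read_job_rows` and its parameters are hypothetical, and the table, column, and Job_ID values are the placeholders from the messages above. `select` and `where` belong to the DataFrame returned by `load()`, not to the `DataFrameReader`, which would explain the first AttributeError. The second error suggests `spark.read.bigquery(...)` is not exposed in PySpark; it appears to be a Scala-side convenience.

```python
def read_job_rows(spark, table, columns, job_id):
    """Read only the requested columns for one job ID through the BigQuery data source.

    `spark` is an existing SparkSession; `table` is a "dataset.table" string.
    """
    reader = spark.read.format("bigquery").option("table", table)
    df = reader.load()  # load() turns the DataFrameReader into a DataFrame
    # The filter is written as a SQL expression string, as in the messages above.
    return df.select(*columns).where("Job_ID = '{}'".format(job_id))
```

Usage would be along the lines of `read_job_rows(spark, "<my_dataset>.<my_table>", ["my_col_name"], "5996").show()`. Ending with `show()` or a write rather than `collect()` keeps the full result from being pulled onto the driver.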