Hi Karthik,
First of all, thank you for your quick response.
First, I tried using the newAPIHadoopRDD API to fetch data from BigQuery:
def extract_data_from_big_query(spark, bq_project, dataset, table):
    # Reuse the staging bucket and project already configured on the Dataproc cluster.
    bucket = spark._jsc.hadoopConfiguration().get("fs.gs.system.bucket")
    project = spark._jsc.hadoopConfiguration().get("fs.gs.project.id")
    bq_conf = {
        "mapred.bq.project.id": project,
        "mapred.bq.gcs.bucket": bucket,
        "mapred.bq.input.project.id": bq_project,
        "mapred.bq.input.dataset.id": dataset,
        "mapred.bq.input.table.id": table
    }
    # Read the table as (LongWritable, JSON string) pairs via the BigQuery Hadoop connector.
    bq_data_rdd = spark.sparkContext.newAPIHadoopRDD(
        "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "com.google.gson.JsonObject",
        conf=bq_conf)
    # Keep only the JSON payload and let Spark infer the schema.
    raw_data_rdd = bq_data_rdd.map(lambda x: x[1])
    raw_data_df = spark.read.json(raw_data_rdd)
    return raw_data_df
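For context, with this approach I can only filter after the load, something like the lines below (the project/dataset/table names are placeholders on my side; as far as I can tell there is no pushdown of the WHERE clause here):
# Sketch only: the job_id filter runs inside Spark, after the whole table has been read.
raw_df = extract_data_from_big_query(spark, "my-bq-project", "my_dataset", "my_table")
filtered_df = raw_df.where('job_id = "some-job-id"')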
Later, when I needed to run the read with a WHERE clause, I moved to the bq command-line tool:
import json
import subprocess

def extract_data_from_big_query(spark, bq_project, dataset, table, column_selected, job_id):
    # Count the matching rows first so the second query can ask bq for all of them via -n.
    query_count = ('SELECT COUNT(*) FROM [{project}.{dataset_id}.{table_id}] WHERE job_id = "{jobid}"'.format(
        project=bq_project, dataset_id=dataset, table_id=table, jobid=job_id))
    subprocess_result = subprocess.check_output(
        ['bq', '-q', '--headless', '--format', 'json', 'query', query_count])
    str_subprocess_result = str(subprocess_result)
    count_result = str_subprocess_result.split(":")[1].split('"')[1]
    # Fetch only the selected columns for the given job_id and build a DataFrame from the JSON rows.
    query = ('SELECT {columns} FROM [{project}.{dataset_id}.{table_id}] WHERE job_id = "{jobid}"'.format(
        columns=column_selected, project=bq_project, dataset_id=dataset, table_id=table, jobid=job_id))
    result = subprocess.check_output(
        ['bq', '-q', '--headless', '--format', 'json', 'query', '-n', count_result, query])
    results = json.loads(result)
    raw_data_df = spark.createDataFrame(results)
    return raw_data_df
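One part I am not comfortable with is extracting the count by splitting the raw string; this is a small sketch of what I might replace it with (placeholder table and filter, and it assumes that bq with --format json prints a JSON array containing a single one-column row for the COUNT(*) query):
import json
import subprocess

# Sketch only: parse bq's JSON output instead of splitting the raw string.
count_query = 'SELECT COUNT(*) FROM [my-bq-project.my_dataset.my_table] WHERE job_id = "some-job-id"'
count_output = subprocess.check_output(
    ['bq', '-q', '--headless', '--format', 'json', 'query', count_query])
rows = json.loads(count_output)
count_result = str(list(rows[0].values())[0])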
I didn't try spark.read.bigquery(), considering that it is still in beta. Do you suggest that I go ahead and use
spark.read.bigquery("publicdata:samples.shakespeare")
  .select("word")
  .where("word = 'Hamlet' or word = 'Claudius'")
  .collect()
while working in PySpark as well?
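For reference, this is what I imagine the PySpark equivalent would look like (just a sketch on my side; the "bigquery" format name and the "table" option are my assumption about the connector's DataFrame source API, and they assume the connector jar is on the cluster's classpath):
# Sketch only, not verified: read the table through the connector's DataFrame source,
# then select and filter in Spark.
shakespeare_df = (spark.read
    .format("bigquery")
    .option("table", "publicdata:samples.shakespeare")
    .load())
result = (shakespeare_df
    .select("word")
    .where("word = 'Hamlet' or word = 'Claudius'")
    .collect())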
I would also like to know the difference between these three approaches.
* I have read somewhere that Dataproc copies data from BigQuery to the HDFS cluster before processing it.
If that is not the case, then I am still wondering why my Spark jobs are resulting in disk usage. If this is shuffle data, do I need to take any action to delete it? I assume it should be deleted automatically once the Spark job completes its execution.
Regards,
Bhupesh