Help with Simple pyspark example on Dataproc


Richard Holowczak

May 2, 2020, 7:24:50 AM
to Google Cloud Dataproc Discussions
Folks:
I am attempting to follow a relatively simple tutorial (at least initially) using pyspark on Dataproc.
While it was written for AWS, I was hoping the pyspark code would run without issues on Dataproc.
I believe I do not need to do the initial setup parts of the tutorial, since Dataproc already has everything installed and configured when I launch a cluster. I would like to start at the section titled "Machine Learning with Spark".

I seem to be missing some key piece of information, however, with regard to how and where files are stored in HDFS from the perspective of the master node versus the cluster as a whole.

I launch a default Dataproc cluster, log in with SSH, and run pyspark. Then I issue the following code:


from pyspark.sql import SQLContext
from pyspark import SparkFiles

sc.setLogLevel("ERROR")

url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
sc.addFile(url)

sqlContext = SQLContext(sc)

SparkFiles.getRootDirectory()
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema=True)

At this point I receive errors that the file does not exist:


>>> from pyspark.sql import SQLContext
>>> sc.setLogLevel("ERROR")
>>> from pyspark import SparkFiles
>>> sc.addFile(url)
>>> sqlContext = SQLContext(sc)
>>> SparkFiles.getRootDirectory()
u'/hadoop/spark/tmp/spark-d399fded-add3-419d-8132-cac56a242d87/userFiles-d3f2092d-519c-4466-ab40-0217466d2140'
>>> df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 441, in csv
    return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Path does not exist: hdfs://cluster-de5c-m/hadoop/spark/tmp/spark-d399fded-add3-419d-8132-cac56a242d87/userFiles-d3f2092d-519c-4466-ab40-0217466d2140/adult_data.csv;'


If I prefix the SparkFiles.get() path with "file://", I get errors from the workers.

>>> df = sqlContext.read.csv("file://"+SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
20/05/02 11:18:36 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 4, cluster-de5c-w-0.us-central1-a.c.handy-bonbon-142723.internal, executor 2): java.io.FileNotFoundException: File file:/hadoop/spark/tmp/spark-d399fded-add3-419d-8132-cac56a242d87/userFiles-d3f2092d-519c-4466-ab40-0217466d2140/adult_data.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:135)

... plus many hundreds more lines of errors.


I appreciate that there are advantages to storing files in Google Cloud Storage, but I am just trying to follow the most basic example, only on Dataproc instead of AWS.

Any suggestions on a preferred but simple way to use HDFS with pyspark?


Moses Sundheep

May 2, 2020, 8:01:05 AM
to Google Cloud Dataproc Discussions
Can you prefix the path with hdfs:// ?

Richard Holowczak

May 2, 2020, 8:04:39 AM
to Google Cloud Dataproc Discussions
Hi, thanks for the suggestion.
I am fairly certain hdfs:// is the default, as seen in the error message:
pyspark.sql.utils.AnalysisException: u'Path does not exist: hdfs://cluster-de5c-m/.....

However I will try it a little later today.

Richard Holowczak

May 2, 2020, 2:54:15 PM
to Google Cloud Dataproc Discussions
Here is one more try with "hdfs://" in the read.csv call:
Using Python version 2.7.17 (default, Nov  7 2019 10:07:09)
SparkSession available as 'spark'.
>>> from pyspark.sql import SQLContext
>>> from pyspark import SparkFiles
>>> sc.setLogLevel("ERROR")
>>> sc.addFile(url)
>>> sqlContext = SQLContext(sc)
>>> SparkFiles.getRootDirectory()
u'/hadoop/spark/tmp/spark-5f134470-758e-413c-9aee-9fc6814f50da/userFiles-b5415bba-4645-45de-abff-6c22e84d121f'
>>> df = sqlContext.read.csv("hdfs://"+SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 476, in csv
    return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Path does not exist: hdfs://cluster-6ef9-m/hadoop/spark/tmp/spark-5f134470-758e-413c-9aee-9fc6814f50da/userFiles-b5415bba-4645-45de-abff-6c22e84d121f/adult_data.csv;'



I also attempted to first put the file to HDFS:

user@cluster-6ef9-m:~$ wget https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv
user@cluster-6ef9-m:~$ hdfs dfs -put adult_data.csv
user@cluster-6ef9-m:~$ hdfs dfs -ls
Found 2 items
drwxrwxrwt   - user hadoop          0 2020-05-02 18:38 .sparkStaging
-rw-r--r--   2 user hadoop    5608318 2020-05-02 18:39 adult_data.csv



But pyspark also fails to find this file.

Dennis Huo

May 2, 2020, 3:04:46 PM
to Google Cloud Dataproc Discussions
It appears this isn't a Dataproc-specific issue, but rather some poor documentation on Spark's part along with a tutorial that only works in a single-node Spark configuration.

Basically, SparkContext.addFile is intended specifically for creation of *local* files that can be accessed with non-Spark-specific local file APIs, as opposed to "Hadoop Filesystem" APIs. In contrast, SQLContext.read is explicitly for "Hadoop Filesystem" paths, even if you end up using "file:///" to specify a local filesystem path that is really available on all nodes.

What is really happening here is best illustrated by running the following two commands after the failed commands you saw:

# The file path on the master node
> SparkFiles.get("adult_data.csv")
u'/hadoop/spark/tmp/spark-f85f2436-4d81-498d-9484-7541ac9bfc76/userFiles-519dfbbb-0e91-46d4-847e-f6ad20e177e2/adult_data.csv'

# The file path on worker nodes
> sc.parallelize(range(0, 2)).map(lambda x: SparkFiles.get("adult_data.csv")).collect()
[u'/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1588442719844_0001/container_1588442719844_0001_01_000002/adult_data.csv',
 u'/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1588442719844_0001/container_1588442719844_0001_01_000002/adult_data.csv']

Basically, what the Spark documentation failed to emphasize is that SparkFiles.get(String) must be run *independently* on each worker node to find out the local tmpdir that happened to be chosen for the file on that worker, rather than resolving it a single time on the master node and assuming that the path will be the same on all the workers.

If you look at the code in SparkContext, it never intends to place the added file into any distributed filesystem; it only uses the raw Java/Scala "File" API rather than a Hadoop FileSystem API.


In contrast, SQLContext.read does not even support, in its interface, a notion of specifying a list of different node-specific local file paths for each node to read separately; it assumes a single "distributed filesystem" directory that can be split uniformly across all the workers.

This discrepancy makes sense in the more usual case for Spark where SQLContext.read is expected to be reading a directory with thousands/millions of files with total sizes of many terabytes, whereas SparkContext.addFile is fundamentally for "single" small files that can really fit on a single machine's local filesystem for local access.

In short, SparkContext.addFile was never intended for staging the actual data being processed onto a Spark cluster's local filesystems, which is why it is incompatible with SQLContext.read, SparkContext.textFile, etc.
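
To make the intended usage concrete, here is a minimal sketch (mine, not from the tutorial, assuming the default pyspark shell where sc is already defined) of how an addFile'd file is normally consumed: each task calls SparkFiles.get itself and opens the resulting local path with plain Python I/O, never through sqlContext.read:

from pyspark import SparkFiles

sc.addFile("https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv")

def count_lines(_):
    # SparkFiles.get() is resolved on the executor, so this opens that
    # executor's own local copy of the downloaded file.
    with open(SparkFiles.get("adult_data.csv")) as f:
        return sum(1 for _ in f)

# Each of the two tasks counts the lines in its own local copy.
print(sc.parallelize(range(0, 2), 2).map(count_lines).collect())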

The tutorial just happens to work exclusively in non-distributed local-runner-only modes where the following conditions hold:

1. Only a single node is used, with no distributed workers.
2. fs.defaultFS must be file:///, since SparkFiles.get returns a schemeless path; on a real production cluster that path gets resolved by SQLContext.read into an hdfs:/// path even though the file was only downloaded locally (see the quick check sketched below).
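
A quick way to confirm which default filesystem a cluster is using (a small sketch that relies on the semi-private _jsc handle exposed in the pyspark shell):

# On a Dataproc cluster this prints something like hdfs://<cluster-name>-m,
# not file:///, which is why the schemeless SparkFiles path ends up being
# resolved to a nonexistent HDFS location.
print(sc._jsc.hadoopConfiguration().get("fs.defaultFS"))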

Your approach of simply putting the file into HDFS first is the easiest. You just have to make sure you specify the right HDFS path in your job; it is best to use absolute paths instead of relative paths, since your "working directory" may differ between the local shell, the Spark job, and Jupyter:

hdfs dfs -mkdir hdfs:///mydata
hdfs dfs -put adult_data.csv hdfs:///mydata/

And in the job:

sqlContext.read.csv("hdfs:///mydata")

Richard Holowczak

May 2, 2020, 4:24:40 PM
to Google Cloud Dataproc Discussions
Thank you so much for the explanation!  I doubt I ever would have figured that out on my own.

For those playing along at home, here is the complete solution:
At the shell prompt:
hdfs dfs -mkdir hdfs:///mydata
hdfs dfs -put adult_data.csv hdfs:///mydata/


Then in pyspark:

from pyspark.sql import SQLContext
from pyspark import SparkFiles

sc.setLogLevel("ERROR")

url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"

sqlContext = SQLContext(sc)

df = sqlContext.read.csv("hdfs:///mydata/adult_data.csv", header=True, inferSchema=True)

Output:

Using Python version 2.7.13 (default, Sep 26 2018 18:42:22)
SparkSession available as 'spark'.

>>> from pyspark.sql import SQLContext
>>> from pyspark import SparkFiles
>>> sc.setLogLevel("ERROR")
>>> url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
>>> sqlContext = SQLContext(sc)
>>> df = sqlContext.read.csv("hdfs:///mydata/adult_data.csv", header=True, inferSchema= True)
ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
>>>
>>> df.head(5)
[Row(x=1, age=25, workclass=u'Private', fnlwgt=226802, education=u'11th', educational-num=7, marital-status=u'Never-married', occupation=u'Machine-op-inspct', relationship=u'Own-child', race=u'Black', gender=u'Male', capital-gain=0, capital-loss=0, hours-per-week=40, native-country=u'United-States', income=u'<=50K'), Row(x=2, age=38, workclass=u'Private', fnlwgt=89814, education=u'HS-grad', educational-num=9, marital-status=u'Married-civ-spouse', occupation=u'Farming-fishing', relationship=u'Husband', race=u'White', gender=u'Male', capital-gain=0, capital-loss=0, hours-per-week=50, native-country=u'United-States', income=u'<=50K'), Row(x=3, age=28, workclass=u'Local-gov', fnlwgt=336951, education=u'Assoc-acdm', educational-num=12, marital-status=u'Married-civ-spouse', occupation=u'Protective-serv', relationship=u'Husband', race=u'White', gender=u'Male', capital-gain=0, capital-loss=0, hours-per-week=40, native-country=u'United-States', income=u'>50K'), Row(x=4, age=44, workclass=u'Private', fnlwgt=160323, education=u'Some-college', educational-num=10, marital-status=u'Married-civ-spouse', occupation=u'Machine-op-inspct', relationship=u'Husband', race=u'Black', gender=u'Male', capital-gain=7688, capital-loss=0, hours-per-week=40, native-country=u'United-States', income=u'>50K'), Row(x=5, age=18, workclass=u'?', fnlwgt=103497, education=u'Some-college', educational-num=10, marital-status=u'Never-married', occupation=u'?', relationship=u'Own-child', race=u'White', gender=u'Female', capital-gain=0, capital-loss=0, hours-per-week=30, native-country=u'United-States', income=u'<=50K')]
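
A side note, since Google Cloud Storage came up earlier: Dataproc clusters ship with the Cloud Storage connector, so the same read should also work against a gs:// path if you would rather skip HDFS entirely. The bucket name below is just a placeholder:

# Hypothetical bucket; copy the file there first, e.g. with
# gsutil cp adult_data.csv gs://your-bucket/
df = sqlContext.read.csv("gs://your-bucket/adult_data.csv", header=True, inferSchema=True)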




