PySpark requires the same minor version of Python in both driver and workers. It uses the default Python version in PATH; you can specify which version of Python you want to use with the PYSPARK_PYTHON environment variable.
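For example, a sketch where python3.8 stands in for whichever interpreter is installed on both the driver and the workers, and my_app.py is a hypothetical script:

```bash
# Launch the PySpark shell with a specific interpreter
$ PYSPARK_PYTHON=python3.8 ./bin/pyspark

# The same variable also applies to applications submitted with spark-submit
$ PYSPARK_PYTHON=python3.8 ./bin/spark-submit my_app.py
```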
Spark 3.5.1 is built and distributed to work with Scala 2.12 by default. (Spark can be built to work with other versions of Scala, too.) To write applications in Scala, you will need to use a compatible Scala version (e.g. 2.12.X).
The first thing a Spark program must do is to create a SparkContext object, which tells Spark how to access a cluster. To create a SparkContext you first need to build a SparkConf object that contains information about your application.
The first thing a Spark program must do is to create a JavaSparkContext object, which tells Spark how to access a cluster. To create a SparkContext you first need to build a SparkConf object that contains information about your application.
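A minimal sketch in Python (the Scala and Java APIs are analogous); the application name and master URL below are placeholder values:

```python
from pyspark import SparkConf, SparkContext

# Describe the application: a name to show in the cluster UI and a master URL.
# "local[2]" is a placeholder that runs Spark locally with two worker threads;
# on a real cluster this would be a Spark, Mesos, YARN, or Kubernetes master URL.
conf = SparkConf().setAppName("MyApp").setMaster("local[2]")

# The SparkContext is the entry point for creating RDDs on that cluster.
sc = SparkContext(conf=conf)
```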
In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the variable called sc. Making your own SparkContext will not work. You can set which master the context connects to using the --master argument, and you can add Python .zip, .egg or .py files to the runtime path by passing a comma-separated list to --py-files. For third-party Python dependencies, see Python Package Management. You can also add dependencies (e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates to the --packages argument. Any additional repositories where dependencies might exist (e.g. Sonatype) can be passed to the --repositories argument. For example, to run bin/pyspark on exactly four cores, use:
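A sketch of such an invocation; code.py in the second command is just a placeholder for a file you want available for import:

```bash
$ ./bin/pyspark --master "local[4]"

# or, additionally adding code.py to the search path:
$ ./bin/pyspark --master "local[4]" --py-files code.py
```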
It is also possible to launch the PySpark shell in IPython, the enhanced Python interpreter. PySpark works with IPython 1.0.0 and later. To use IPython, set the PYSPARK_DRIVER_PYTHON variable to ipython when running bin/pyspark:
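A sketch, assuming ipython is on your PATH:

```bash
$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
```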
In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called sc. Making your own SparkContext will not work. You can set which master the context connects to using the --master argument, and you can add JARs to the classpath by passing a comma-separated list to the --jars argument. You can also add dependencies (e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates to the --packages argument. Any additional repositories where dependencies might exist (e.g. Sonatype) can be passed to the --repositories argument. For example, to run bin/spark-shell on exactly four cores, use:
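A sketch of such an invocation; code.jar in the second command is just a placeholder JAR:

```bash
$ ./bin/spark-shell --master "local[4]"

# or, additionally adding code.jar to the classpath:
$ ./bin/spark-shell --master "local[4]" --jars code.jar
```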
Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
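Parallelized collections are created by calling SparkContext's parallelize method on an existing collection in your driver program. For example, in Python (assuming sc is an existing SparkContext, as in the shell):

```python
data = [1, 2, 3, 4, 5]

# Copy the list into a distributed dataset that can be operated on in parallel.
distData = sc.parallelize(data)
```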
Once created, the distributed dataset (distData) can be operated on in parallel. For example, we can call distData.reduce(lambda a, b: a + b) to add up the elements of the list. We describe operations on distributed datasets later on.
Once created, the distributed dataset (distData) can be operated on in parallel. For example, we might call distData.reduce((a, b) -> a + b) to add up the elements of the list. We describe operations on distributed datasets later on.
One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.
PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
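For example, text file RDDs can be created with SparkContext's textFile method, which takes a file URI (a local path, or a URI such as hdfs://) and reads it as a collection of lines. A sketch, where data.txt is a placeholder path:

```python
# Each element of the RDD is one line of the file; nothing is read yet (lazy).
distFile = sc.textFile("data.txt")

# Add up the lengths of all lines; reduce is an action, so it triggers the read.
total_length = distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
```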
The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.
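A sketch in Python, with a placeholder directory:

```python
# Returns an RDD of (filename, content) pairs, one pair per file in the directory.
pairs = sc.wholeTextFiles("hdfs://namenode/my/directory")
```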
Note this feature is currently marked Experimental and is intended for advanced users. It may be replaced in future with read/write support based on Spark SQL, in which case Spark SQL is the preferred approach.
PySpark SequenceFile support loads an RDD of key-value pairs within Java, converts Writables to base Java types, and pickles the resulting Java objects using pickle. When saving an RDD of key-value pairs to SequenceFile, PySpark does the reverse. It unpickles Python objects into Java objects and then converts them to Writables. The following Writables are automatically converted:
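| Writable Type | Python Type |
| --- | --- |
| Text | str |
| IntWritable | int |
| FloatWritable | float |
| DoubleWritable | float |
| BooleanWritable | bool |
| BytesWritable | bytearray |
| NullWritable | None |
| MapWritable | dict |

As a sketch of the round trip (the path is a placeholder and the RDD contents are arbitrary sample data):

```python
# Write an RDD of (int, str) pairs as a SequenceFile of (IntWritable, Text).
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
rdd.saveAsSequenceFile("path/to/output")

# Read it back; Writables are converted to base types and returned as Python objects.
sorted(sc.sequenceFile("path/to/output").collect())
# -> [(1, 'a'), (2, 'aa'), (3, 'aaa')]
```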
Arrays are not handled out-of-the-box. Users need to specify custom ArrayWritable subtypes when reading or writing. When writing, users also need to specify custom converters that convert arrays to custom ArrayWritable subtypes. When reading, the default converter will convert custom ArrayWritable subtypes to Java Object[], which then get pickled to Python tuples. To get Python array.array for arrays of primitive types, users need to specify custom converters.
Note that, if the InputFormat simply depends on a Hadoop configuration and/or input path, and the key and value classes can easily be converted according to the above table, then this approach should work well for such cases.
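For instance, a hypothetical sketch of reading plain text through the new-API Hadoop interface; the class names are standard Hadoop ones, but the input path and the exact configuration key are assumptions you would adapt to your own setup:

```python
# Hadoop configuration passed as a plain Python dict (placeholder input path).
conf = {"mapreduce.input.fileinputformat.inputdir": "hdfs://namenode/path/to/input"}

rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",  # InputFormat class
    "org.apache.hadoop.io.LongWritable",                      # key class (byte offset)
    "org.apache.hadoop.io.Text",                               # value class (line text)
    conf=conf)
```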
Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file. Partitioning is determined by data locality which, in some cases, may result in too few partitions. For those cases, wholeTextFiles provides an optional second argument for controlling the minimal number of partitions.
RDD.saveAsObjectFile and SparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.
JavaSparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.
JavaRDD.saveAsObjectFile and JavaSparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.
RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).
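To illustrate, a sketch in Python (data.txt is a placeholder file):

```python
lines = sc.textFile("data.txt")                       # base RDD; nothing loaded yet
lineLengths = lines.map(lambda s: len(s))             # transformation: lazy
totalLength = lineLengths.reduce(lambda a, b: a + b)  # action: runs the computation
```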
All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.
By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
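Continuing the sketch above, if lineLengths were needed again later, it could be persisted before the first action runs:

```python
# Keep lineLengths in memory after it is first computed,
# so later actions reuse it instead of re-running the map.
lineLengths.persist()
```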