Hdfs Configuration Properties

1 view
Skip to first unread message

Zareen Zapata

unread,
Aug 5, 2024, 1:50:32 PM8/5/24
to popssistuli
Sparkproperties control most application settings and are configured separately for eachapplication. These properties can be set directly on aSparkConf passed to yourSparkContext. SparkConf allows you to configure some of the common properties(e.g. master URL and application name), as well as arbitrary key-value pairs through theset() method. For example, we could initialize an application with two threads as follows:

While numbers without units are generally interpreted as bytes, a few are interpreted as KiB or MiB.See documentation of individual configuration properties. Specifying units is desirable wherepossible.


The Spark shell and spark-submittool support two ways to load configurations dynamically. The first is command line options,such as --master, as shown above. spark-submit can accept any Spark property using the --conf/-cflag, but uses special flags for properties that play a part in launching the Spark application.Running ./bin/spark-submit --help will show the entire list of these options.


Any values specified as flags or in the properties file will be passed on to the applicationand merged with those specified through SparkConf. Properties set directly on the SparkConftake highest precedence, then flags passed to spark-submit or spark-shell, then optionsin the spark-defaults.conf file. A few configuration keys have been renamed since earlierversions of Spark; in such cases, the older key names are still accepted, but take lowerprecedence than any instance of the newer key.


The default value for number of thread-related config keys is the minimum of the number of cores requested forthe driver or executor, or, in the absence of that value, the number of cores available for the JVM (with a hardcoded upper limit of 8).


Server configurations are set in Spark Connect server, for example, when you start the Spark Connect server with ./sbin/start-connect-server.sh.They are typically set via the config file and command-line options with --conf/-c.


The advisory size in bytes of the shuffle partition during adaptive optimization (when spark.sql.adaptive.enabled is true). It takes effect when Spark coalesces small shuffle partitions or splits skewed shuffle partition.


Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. The default value is same with spark.sql.autoBroadcastJoinThreshold. Note that, this config is used only in adaptive framework.


When true and 'spark.sql.adaptive.enabled' is true, Spark will coalesce contiguous shuffle partitions according to the target size (specified by 'spark.sql.adaptive.advisoryPartitionSizeInBytes'), to avoid too many small tasks.


The initial number of shuffle partitions before coalescing. If not set, it equals to spark.sql.shuffle.partitions. This configuration only has an effect when 'spark.sql.adaptive.enabled' and 'spark.sql.adaptive.coalescePartitions.enabled' are both true.


When true, Spark does not respect the target size specified by 'spark.sql.adaptive.advisoryPartitionSizeInBytes' (default 64MB) when coalescing contiguous shuffle partitions, but adaptively calculate the target size according to the default parallelism of the Spark cluster. The calculated size is usually smaller than the configured target size. This is to maximize the parallelism and avoid performance regression when enabling adaptive query execution. It's recommended to set this config to false and respect the configured target size.


When true and 'spark.sql.adaptive.enabled' is true, Spark tries to use local shuffle reader to read the shuffle data when the shuffle partitioning is not needed, for example, after converting sort-merge join to broadcast-hash join.


Configures the maximum size in bytes per partition that can be allowed to build local hash map. If this value is not smaller than spark.sql.adaptive.advisoryPartitionSizeInBytes and all the partition size are not larger than this config, join selection prefer to use shuffled hash join instead of sort merge join regardless of the value of spark.sql.join.preferSortMergeJoin.


When true and 'spark.sql.adaptive.enabled' is true, Spark will optimize the skewed shuffle partitions in RebalancePartitions and split them to smaller ones according to the target size (specified by 'spark.sql.adaptive.advisoryPartitionSizeInBytes'), to avoid data skew.


Configures a list of rules to be disabled in the adaptive optimizer, in which the rules are specified by their rule names and separated by comma. The optimizer will log the rules that have indeed been excluded.


A partition is considered as skewed if its size in bytes is larger than this threshold and also larger than 'spark.sql.adaptive.skewJoin.skewedPartitionFactor' multiplying the median partition size. Ideally this config should be set larger than 'spark.sql.adaptive.advisoryPartitionSizeInBytes'.


When true, Spark SQL uses an ANSI compliant dialect instead of being Hive compliant. For example, Spark will throw an exception at runtime instead of returning null results when the inputs to a SQL operator/function are invalid.For full details of this dialect, you can find them in the section "ANSI Compliance" of Spark's documentation. Some ANSI dialect features may be not from the ANSI SQL standard directly, but their behaviors align with ANSI SQL's style


When true and 'spark.sql.ansi.enabled' is true, the Spark SQL parser enforces the ANSI reserved keywords and forbids SQL queries that use reserved keywords as alias names and/or identifiers for table, view, function, etc.


Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE COMPUTE STATISTICS noscan has been run, and file-based data source tables where the statistics are computed directly on the files of data.


Compression level for the deflate codec used in writing of AVRO files. Valid value must be in the range of from 1 to 9 inclusive or -1. The default value is -1 which corresponds to 6 level in the current implementation.


When true, if two bucketed tables with the different number of buckets are joined, the side with a bigger number of buckets will be coalesced to have the same number of buckets as the other side. Bigger number of buckets is divisible by the smaller number of buckets. Bucket coalescing is applied to sort-merge joins and shuffled hash join. Note: Coalescing bucketed table can avoid unnecessary shuffling in join, but it also reduces parallelism and could possibly cause OOM for shuffled hash join.


The ratio of the number of two buckets being coalesced should be less than or equal to this value for bucket coalescing to be applied. This configuration only has an effect when 'spark.sql.bucketing.coalesceBucketsInJoin.enabled' is set to true.


A catalog implementation that will be used as the v2 interface to Spark's built-in v1 catalog: spark_catalog. This catalog shares its identifier namespace with the spark_catalog and must be consistent with it; for example, if a table can be loaded by the spark_catalog, this catalog must also return the table metadata. To delegate operations to the spark_catalog, implementations can extend 'CatalogExtension'.


When true, Spark replaces CHAR type with VARCHAR type in CREATE/REPLACE/ALTER TABLE commands, so that newly created/updated tables will not have CHAR type columns/fields. Existing tables with CHAR type columns/fields are not affected by this config.


If the configuration property is set to true, java.time.Instant and java.time.LocalDate classes of Java 8 API are used as external types for Catalyst's TimestampType and DateType. If it is set to false, java.sql.Timestamp and java.sql.Date are used for the same purpose.


When PRETTY, the error message consists of textual representation of error class, message and query context. The MINIMAL and STANDARD formats are pretty JSON formats where STANDARD includes an additional JSON field message. This configuration property influences on error messages of Thrift Server and SQL CLI while running queries.


When converting Arrow batches to Spark DataFrame, local collections are used in the driver side if the byte size of Arrow batches is smaller than this threshold. Otherwise, the Arrow batches are sent and deserialized to Spark internal rows in the executors.


When true, make use of Apache Arrow for columnar data transfers in PySpark. This optimization applies to: 1. pyspark.sql.DataFrame.toPandas. 2. pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame or a NumPy ndarray. The following data type is unsupported: ArrayType of TimestampType.


(Experimental) When true, make use of Apache Arrow's self-destruct and split-blocks options for columnar data transfers in PySpark, when converting from Arrow to Pandas. This reduces memory usage at the cost of some CPU time. This optimization applies to: pyspark.sql.DataFrame.toPandas when 'spark.sql.execution.arrow.pyspark.enabled' is set.


When true, make use of Apache Arrow for columnar data transfers in SparkR. This optimization applies to: 1. createDataFrame when its input is an R DataFrame 2. collect 3. dapply 4. gapply The following data types are unsupported: FloatType, BinaryType, ArrayType, StructType and MapType.


The conversion mode of struct type when creating pandas DataFrame. When "legacy",1. when Arrow optimization is disabled, convert to Row object, 2. when Arrow optimization is enabled, convert to dict or raise an Exception if there are duplicated nested field names. When "row", convert to Row object regardless of Arrow optimization. When "dict", convert to dict and use suffixed key names, e.g., a_0, a_1, if there are duplicated nested field names, regardless of Arrow optimization.


Same as spark.buffer.size but only applies to Pandas UDF executions. If it is not set, the fallback is spark.buffer.size. Note that Pandas execution requires more than 4 bytes. Lowering this value could make small Pandas UDF batch iterated and pipelined; however, it might degrade performance. See SPARK-27870.

3a8082e126
Reply all
Reply to author
Forward
0 new messages