docs/configuration.md
Spark provides three locations to configure the system:

- Spark properties, set per application.
- Environment variables, set per machine through the conf/spark-env.sh script on each node.
- Logging, configured through log4j2.properties.

Spark properties control most application settings and are configured separately for each
application. These properties can be set directly on a
SparkConf passed to your
SparkContext. SparkConf allows you to configure some of the common properties
(e.g. master URL and application name), as well as arbitrary key-value pairs through the
set() method. For example, we could initialize an application with two threads as follows:
Note that we run with local[2], meaning two threads, which represents "minimal" parallelism and can help detect bugs that only exist when we run in a distributed context.
{% highlight scala %}
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("CountingSheep")
val sc = new SparkContext(conf)
{% endhighlight %}
Note that we can have more than 1 thread in local mode, and in cases like Spark Streaming, we may actually require more than 1 thread to prevent any sort of starvation issues.
Properties that specify some time duration should be configured with a unit of time. The following formats are accepted:
25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)
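For example, on a <code>SparkConf</code> (a minimal sketch; <code>spark.network.timeout</code> is documented later on this page):

{% highlight scala %}
val conf = new SparkConf()
// An explicit time unit avoids ambiguity about how the number is parsed.
conf.set("spark.network.timeout", "120s")
{% endhighlight %}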
Properties that specify a byte size should be configured with a unit of size. The following formats are accepted:
1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)
While numbers without units are generally interpreted as bytes, a few are interpreted as KiB or MiB. See documentation of individual configuration properties. Specifying units is desirable where possible.
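For example (a minimal sketch reusing size-valued properties shown elsewhere on this page):

{% highlight scala %}
val conf = new SparkConf()
conf.set("spark.executor.memory", "4g")                        // explicit unit: gibibytes
conf.set("spark.network.maxRemoteBlockSizeFetchToMem", "200m") // explicit unit: mebibytes
{% endhighlight %}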
In some cases, you may want to avoid hard-coding certain configurations in a SparkConf. For
instance, if you'd like to run the same application with different masters or different
amounts of memory, Spark allows you to simply create an empty conf:
{% highlight scala %}
val sc = new SparkContext(new SparkConf())
{% endhighlight %}
Then, you can supply configuration values at runtime:
{% highlight bash %}
./bin/spark-submit \
  --name "My app" \
  --master "local[4]" \
  --conf spark.eventLog.enabled=false \
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  myApp.jar
{% endhighlight %}
The Spark shell and spark-submit
tool support two ways to load configurations dynamically. The first is command line options,
such as --master, as shown above. spark-submit can accept any Spark property using the --conf/-c
flag, but uses special flags for properties that play a part in launching the Spark application.
Running ./bin/spark-submit --help will show the entire list of these options.
In addition to the command line flags, bin/spark-submit will also read configuration options
from conf/spark-defaults.conf, in which each line consists of a key and a value separated by
whitespace. For example:
spark.master spark://5.6.7.8:7077
spark.executor.memory 4g
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
In addition, a property file with Spark configurations can be passed to bin/spark-submit via
--properties-file parameter. When this is set, Spark will no longer load configurations from
conf/spark-defaults.conf unless another parameter --load-spark-defaults is provided.
Any values specified as flags or in the properties file will be passed on to the application
and merged with those specified through SparkConf. Properties set directly on the SparkConf
take the highest precedence, then those through --conf flags or --properties-file passed to
spark-submit or spark-shell, then options in the spark-defaults.conf file. A few
configuration keys have been renamed since earlier versions of Spark; in such cases, the older
key names are still accepted, but take lower precedence than any instance of the newer key.
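As a sketch of these precedence rules (hypothetical values; assume the application is launched with spark-submit):

{% highlight scala %}
// spark-defaults.conf contains:   spark.executor.memory 4g   (lowest precedence)
// spark-submit was passed:        --conf spark.executor.memory=6g
// The value set directly on SparkConf takes the highest precedence,
// so the executors end up with 8g:
val conf = new SparkConf().set("spark.executor.memory", "8g")
val sc = new SparkContext(conf)
{% endhighlight %}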
Spark properties can mainly be divided into two kinds. One kind is related to deployment, like
"spark.driver.memory" and "spark.executor.instances"; these properties may not take effect when
set programmatically through SparkConf at runtime, or their behavior depends on which cluster
manager and deploy mode you choose, so it is suggested to set them through the configuration
file or spark-submit command line options. The other kind is mainly related to Spark runtime
control, like "spark.task.maxFailures"; these properties can be set in either way.
The application web UI at http://<driver>:4040 lists Spark properties in the "Environment" tab.
This is a useful place to check to make sure that your properties have been set correctly. Note
that only values explicitly specified through spark-defaults.conf, SparkConf, or the command
line will appear. For all other configuration properties, you can assume the default value is used.
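The explicitly-set properties can also be inspected programmatically (a minimal sketch):

{% highlight scala %}
// Lists only explicitly-set properties, one per line; defaults do not
// appear, matching the behavior of the "Environment" tab.
println(sc.getConf.toDebugString)
{% endhighlight %}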
Most of the properties that control internal settings have reasonable default values. Some of the most common options to set are:
For example, <code>spark.driver.memory</code> (the amount of memory to use for the driver process) carries this caveat:
<em>Note:</em> In client mode, this config must not be set through the <code>SparkConf</code>
directly in your application, because the driver JVM has already started at that point.
Instead, please set this through the <code>--driver-memory</code> command line option
or in your default properties file.
For <code>spark.executor.pyspark.memory</code>:
<em>Note:</em> This feature depends on Python's <code>resource</code> module; therefore, its behaviors and
limitations are inherited. For instance, Windows does not support resource limiting, and
resources are not actually limited on macOS.
For <code>spark.executor.memoryOverhead</code>:
<em>Note:</em> Additional memory includes PySpark executor memory
(when <code>spark.executor.pyspark.memory</code> is not configured) and memory used by other
non-executor processes running in the same container. The maximum memory size of the container
running an executor is determined by the sum of <code>spark.executor.memoryOverhead</code>,
<code>spark.executor.memory</code>, <code>spark.memory.offHeap.size</code> and
<code>spark.executor.pyspark.memory</code>.
For <code>spark.local.dir</code>:
<em>Note:</em> This will be overridden by the SPARK_LOCAL_DIRS (Standalone) or
LOCAL_DIRS (YARN) environment variables set by the cluster manager.
Apart from these, the following properties are also available, and may be useful in some situations:
<table class="spark-config">
<thead><tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr></thead>
<tr>
  <td><code>spark.driver.extraClassPath</code></td>
  <td>(none)</td>
  <td>
    Extra classpath entries to prepend to the classpath of the driver.
    <em>Note:</em> In client mode, this config must not be set through the <code>SparkConf</code> directly in your application, because the driver JVM has already started at that point. Instead, please set this through the <code>--driver-class-path</code> command line option or in your default properties file.
  </td>
  <td>1.0.0</td>
</tr>
<tr>
  <td><code>spark.driver.defaultJavaOptions</code></td>
  <td>(none)</td>
  <td>
    A string of default JVM options to prepend to <code>spark.driver.extraJavaOptions</code>. This is intended to be set by administrators, for instance GC settings or other logging.
    Note that it is illegal to set maximum heap size (-Xmx) settings with this option. Maximum heap size settings can be set with <code>spark.driver.memory</code> in cluster mode and through the <code>--driver-memory</code> command line option in client mode.
    <em>Note:</em> In client mode, this config must not be set through the <code>SparkConf</code> directly in your application, because the driver JVM has already started at that point. Instead, please set this through the <code>--driver-java-options</code> command line option or in your default properties file.
  </td>
  <td>3.0.0</td>
</tr>
<tr>
  <td><code>spark.driver.extraJavaOptions</code></td>
  <td>(none)</td>
  <td>
    A string of extra JVM options to pass to the driver. This is intended to be set by users, for instance GC settings or other logging.
    Note that it is illegal to set maximum heap size (-Xmx) settings with this option. Maximum heap size settings can be set with <code>spark.driver.memory</code> in cluster mode and through the <code>--driver-memory</code> command line option in client mode.
    <em>Note:</em> In client mode, this config must not be set through the <code>SparkConf</code> directly in your application, because the driver JVM has already started at that point. Instead, please set this through the <code>--driver-java-options</code> command line option or in your default properties file.
    <code>spark.driver.defaultJavaOptions</code> will be prepended to this configuration.
  </td>
  <td>1.0.0</td>
</tr>
<tr>
  <td><code>spark.driver.extraLibraryPath</code></td>
  <td>(none)</td>
  <td>
    Set a special library path to use when launching the driver JVM.
    <em>Note:</em> In client mode, this config must not be set through the <code>SparkConf</code> directly in your application, because the driver JVM has already started at that point. Instead, please set this through the <code>--driver-library-path</code> command line option or in your default properties file.
  </td>
  <td>1.0.0</td>
</tr>
<tr>
  <td><code>spark.driver.userClassPathFirst</code></td>
  <td>false</td>
  <td>
    (Experimental) Whether to give user-added jars precedence over Spark's own jars when loading classes in the driver. This feature can be used to mitigate conflicts between Spark's dependencies and user dependencies. It is currently an experimental feature. This is used in cluster mode only.
  </td>
  <td>1.3.0</td>
</tr>
<tr>
  <td><code>spark.executor.defaultJavaOptions</code></td>
  <td>(none)</td>
  <td>
    A string of default JVM options to prepend to <code>spark.executor.extraJavaOptions</code>. This is intended to be set by administrators, for instance GC settings or other logging.
    Note that it is illegal to set Spark properties or maximum heap size (-Xmx) settings with this option. Spark properties should be set using a SparkConf object or the spark-defaults.conf file used with the spark-submit script. Maximum heap size settings can be set with spark.executor.memory.
    The following symbols, if present, will be interpolated: {{APP_ID}} will be replaced by the application ID and {{EXECUTOR_ID}} will be replaced by the executor ID. For example, to enable verbose GC logging to a file named for the executor ID of the app in /tmp, pass a 'value' of:
    <code>-verbose:gc -Xloggc:/tmp/{{APP_ID}}-{{EXECUTOR_ID}}.gc</code>
  </td>
  <td>3.0.0</td>
</tr>
<tr>
  <td><code>spark.executor.extraJavaOptions</code></td>
  <td>(none)</td>
  <td>
    A string of extra JVM options to pass to executors. This is intended to be set by users, for instance GC settings or other logging.
    Note that it is illegal to set Spark properties or maximum heap size (-Xmx) settings with this option. Spark properties should be set using a SparkConf object or the spark-defaults.conf file used with the spark-submit script. Maximum heap size settings can be set with spark.executor.memory.
    The following symbols, if present, will be interpolated: {{APP_ID}} will be replaced by the application ID and {{EXECUTOR_ID}} will be replaced by the executor ID. For example, to enable verbose GC logging to a file named for the executor ID of the app in /tmp, pass a 'value' of:
    <code>-verbose:gc -Xloggc:/tmp/{{APP_ID}}-{{EXECUTOR_ID}}.gc</code>
    <code>spark.executor.defaultJavaOptions</code> will be prepended to this configuration.
  </td>
  <td>1.0.0</td>
</tr>
</table>
For <code>spark.python.profile</code>: by default the <code>pyspark.profiler.BasicProfiler</code> will be used, but this can be overridden by
passing a profiler class in as a parameter to the <code>SparkContext</code> constructor.
This setting (<code>spark.ui.reverseProxyUrl</code>) affects link generation in the Spark UI, but the front-end reverse proxy is responsible for
<ul>
  <li>stripping a path prefix before forwarding the request,</li>
  <li>rewriting redirects which point directly to the Spark master,</li>
  <li>redirecting access from <code>http://mydomain.com/path/to/spark</code> to <code>http://mydomain.com/path/to/spark/</code> (trailing slash after path prefix); otherwise relative links on the master page do not work correctly.</li>
</ul>
This setting affects all the workers and application UIs running in the cluster and must be set identically on all the workers, drivers and masters. It is only effective when <code>spark.ui.reverseProxy</code> is turned on. This setting is not needed when the Spark master web UI is directly reachable.
Note that the value of the setting can't contain the keyword <code>proxy</code> or <code>history</code> after splitting by "/". Spark UI relies on both keywords for getting REST API endpoints from URIs.
<em>Note:</em> In the shell environment, the default value of <code>spark.ui.showConsoleProgress</code> is true.
Filter parameters can also be specified in the configuration, by setting config entries of the form <code>spark.<class name of filter>.param.<param name>=<value></code>
For example:
<code>spark.ui.filters=com.test.filter1</code>
<code>spark.com.test.filter1.param.name1=foo</code>
<code>spark.com.test.filter1.param.name2=bar</code>
<table class="spark-config">
<thead><tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr></thead>
<tr> <td><code>spark.ui.requestHeaderSize</code></td> <td>8k</td> <td> The maximum allowed size for an HTTP request header, in bytes unless otherwise specified. This setting applies to the Spark History Server too. </td> <td>2.2.3</td> </tr>
<tr> <td><code>spark.ui.timelineEnabled</code></td> <td>true</td> <td> Whether to display event timeline data on UI pages. </td> <td>3.4.0</td> </tr>
<tr> <td><code>spark.ui.timeline.executors.maximum</code></td> <td>250</td> <td> The maximum number of executors shown in the event timeline. </td> <td>3.2.0</td> </tr>
<tr> <td><code>spark.ui.timeline.jobs.maximum</code></td> <td>500</td> <td> The maximum number of jobs shown in the event timeline. </td> <td>3.2.0</td> </tr>
<tr> <td><code>spark.ui.timeline.stages.maximum</code></td> <td>500</td> <td> The maximum number of stages shown in the event timeline. </td> <td>3.2.0</td> </tr>
<tr> <td><code>spark.ui.timeline.tasks.maximum</code></td> <td>1000</td> <td> The maximum number of tasks shown in the event timeline. </td> <td>1.4.0</td> </tr>
</table>

For <code>spark.cleaner.periodicGC.interval</code>: this context cleaner triggers cleanups only when weak references are garbage collected.
In long-running applications with large driver JVMs, where there is little memory pressure
on the driver, this may happen very occasionally or not at all. Not cleaning at all may
lead to executors running out of disk space after a while.
<em>Note:</em> The metrics are polled (collected) and sent in the executor heartbeat,
and this is always done; this configuration is only to determine if aggregated metric peaks
are written to the event log.
<em>Note:</em> The process tree metrics are collected only if the /proc filesystem
exists.
If 0, the polling is done on executor heartbeats (thus at the heartbeat interval,
specified by <code>spark.executor.heartbeatInterval</code>).
If positive, the polling is done at this interval.
For <code>spark.driver.bindAddress</code> (default: the value of <code>spark.driver.host</code>, since 2.1.0): Hostname or IP address where to bind listening sockets. This config overrides the SPARK_LOCAL_IP environment variable. It also allows a different address from the local one to be advertised to executors or external systems. This is useful, for example, when running containers with bridged networking. For this to properly work, the different ports used by the driver (RPC, block manager and UI) need to be forwarded from the container's host.

<table class="spark-config">
<thead><tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr></thead>
<tr> <td><code>spark.driver.host</code></td> <td>(local hostname)</td> <td> Hostname or IP address for the driver. This is used for communicating with the executors and the standalone Master. </td> <td>0.7.0</td> </tr>
<tr> <td><code>spark.driver.port</code></td> <td>(random)</td> <td> Port for the driver to listen on. This is used for communicating with the executors and the standalone Master. </td> <td>0.7.0</td> </tr>
<tr> <td><code>spark.driver.metrics.pollingInterval</code></td> <td>10s</td> <td> How often to collect driver metrics (in milliseconds). If unset, the polling is done at the executor heartbeat interval. If set, the polling is done at this interval. </td> <td>4.1.0</td> </tr>
<tr> <td><code>spark.io.mode.default</code></td> <td>AUTO</td> <td> The default IO mode for Netty transports. One of <code>NIO</code>, <code>EPOLL</code>, <code>KQUEUE</code>, or <code>AUTO</code>. The default value is <code>AUTO</code>, which means native Netty libraries are used if available: on Linux, <code>EPOLL</code> is used if available before falling back to <code>NIO</code>; on MacOS/BSD, <code>KQUEUE</code> is used if available before falling back to <code>NIO</code>. </td> <td>4.1.0</td> </tr>
<tr> <td><code>spark.rpc.io.backLog</code></td> <td>64</td> <td> Length of the accept queue for the RPC server. For large applications, this value may need to be increased so that incoming connections are not dropped when a large number of connections arrives in a short period of time. </td> <td>3.0.0</td> </tr>
<tr> <td><code>spark.network.timeout</code></td> <td>120s</td> <td> Default timeout for all network interactions. This config will be used in place of <code>spark.storage.blockManagerHeartbeatTimeoutMs</code>, <code>spark.shuffle.io.connectionTimeout</code>, <code>spark.rpc.askTimeout</code> or <code>spark.rpc.lookupTimeout</code> if they are not configured. </td> <td>1.3.0</td> </tr>
<tr> <td><code>spark.network.timeoutInterval</code></td> <td>60s</td> <td> Interval for the driver to check and expire dead executors. </td> <td>1.3.2</td> </tr>
<tr> <td><code>spark.network.io.preferDirectBufs</code></td> <td>true</td> <td> If enabled, off-heap buffer allocations are preferred by the shared allocators. Off-heap buffers are used to reduce garbage collection during shuffle and cache block transfer. For environments where off-heap memory is tightly limited, users may wish to turn this off to force all allocations to be on-heap. </td> <td>3.0.0</td> </tr>
<tr> <td><code>spark.port.maxRetries</code></td> <td>16</td> <td> Maximum number of retries when binding to a port before giving up. When a port is given a specific value (non 0), each subsequent retry will increment the port used in the previous attempt by 1 before retrying. This essentially allows it to try a range of ports from the start port specified to port + maxRetries. </td> <td>1.1.1</td> </tr>
<tr> <td><code>spark.rpc.askTimeout</code></td> <td><code>spark.network.timeout</code></td> <td> Duration for an RPC ask operation to wait before timing out. </td> <td>1.4.0</td> </tr>
<tr> <td><code>spark.rpc.lookupTimeout</code></td> <td>120s</td> <td> Duration for an RPC remote endpoint lookup operation to wait before timing out. </td> <td>1.4.0</td> </tr>
<tr> <td><code>spark.network.maxRemoteBlockSizeFetchToMem</code></td> <td>200m</td> <td> Remote blocks will be fetched to disk when their size is above this threshold in bytes. This avoids a giant request taking too much memory. Note this configuration will affect both shuffle fetch and block manager remote block fetch. For users who have enabled the external shuffle service, this feature can only work when the external shuffle service is at least 2.3.0. </td> <td>3.0.0</td> </tr>
<tr> <td><code>spark.rpc.io.connectionTimeout</code></td> <td>value of <code>spark.network.timeout</code></td> <td> Timeout for established connections between RPC peers to be marked as idled and closed if there are outstanding RPC requests but no traffic on the channel for at least <code>connectionTimeout</code>. </td> <td>1.2.0</td> </tr>
<tr> <td><code>spark.rpc.io.connectionCreationTimeout</code></td> <td>value of <code>spark.rpc.io.connectionTimeout</code></td> <td> Timeout for establishing a connection between RPC peers. </td> <td>3.2.0</td> </tr>
</table>

Enabling dynamic allocation (<code>spark.dynamicAllocation.enabled</code>) requires one of the following conditions:
1) enabling external shuffle service through <code>spark.shuffle.service.enabled</code>, or
2) enabling shuffle tracking through <code>spark.dynamicAllocation.shuffleTracking.enabled</code>, or
3) enabling shuffle blocks decommission through <code>spark.decommission.enabled</code> and <code>spark.storage.decommission.shuffleBlocks.enabled</code>, or
4) (Experimental) configuring <code>spark.shuffle.sort.io.plugin.class</code> to use a custom <code>ShuffleDataIO</code> whose <code>ShuffleDriverComponents</code> supports reliable storage.
The following configurations are also relevant:
<code>spark.dynamicAllocation.minExecutors</code>,
<code>spark.dynamicAllocation.maxExecutors</code>,
<code>spark.dynamicAllocation.initialExecutors</code>, and
<code>spark.dynamicAllocation.executorAllocationRatio</code>.
If <code>--num-executors</code> (or <code>spark.executor.instances</code>) is set and larger than this value, it will
be used as the initial number of executors.
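For example, a minimal spark-defaults.conf sketch that enables dynamic allocation via shuffle tracking (condition 2 above; the executor counts are illustrative):

spark.dynamicAllocation.enabled true
spark.dynamicAllocation.shuffleTracking.enabled true
spark.dynamicAllocation.minExecutors 1
spark.dynamicAllocation.maxExecutors 10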
Depending on jobs and cluster configurations, we can set the number of threads in several places in Spark to utilize available resources efficiently and get better performance. Prior to Spark 3.0, these thread configurations applied to all roles of Spark, such as driver, executor, worker and master. From Spark 3.0, we can configure threads at a finer granularity, starting with the driver and executor. Take the RPC module as an example, shown in the table below. For other modules, like shuffle, just replace "rpc" with "shuffle" in the property names, except for <code>spark.{driver|executor}.rpc.netty.dispatcher.numThreads</code>, which is only for the RPC module.
<table class="spark-config"> <thead><tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr></thead> <tr> <td><code>spark.{driver|executor}.rpc.io.serverThreads</code></td> <td> Fall back on <code>spark.rpc.io.serverThreads</code> </td> <td>Number of threads used in the server thread pool</td> <td>1.6.0</td> </tr> <tr> <td><code>spark.{driver|executor}.rpc.io.clientThreads</code></td> <td> Fall back on <code>spark.rpc.io.clientThreads</code> </td> <td>Number of threads used in the client thread pool</td> <td>1.6.0</td> </tr> <tr> <td><code>spark.{driver|executor}.rpc.netty.dispatcher.numThreads</code></td> <td> Fall back on <code>spark.rpc.netty.dispatcher.numThreads</code> </td> <td>Number of threads used in RPC message dispatcher thread pool</td> <td>3.0.0</td> </tr> </table>The default value for number of thread-related config keys is the minimum of the number of cores requested for the driver or executor, or, in the absence of that value, the number of cores available for the JVM (with a hardcoded upper limit of 8).
Server configurations are set in Spark Connect server, for example, when you start the Spark Connect server with ./sbin/start-connect-server.sh.
They are typically set via the config file and command-line options with --conf/-c.
Please refer to the Security page for available options on how to secure different Spark subsystems.
Runtime SQL configurations are per-session, mutable Spark SQL configurations. They can be set with initial values via the config file
and command-line options prefixed with --conf/-c, or via the SparkConf used to create the SparkSession.
They can also be set and queried with SET commands and reset to their initial values with the RESET command,
or via SparkSession.conf's setter and getter methods at runtime.
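For example, from a running session (a minimal sketch using the well-known runtime conf spark.sql.shuffle.partitions):

{% highlight scala %}
// Set and read back a per-session, mutable SQL configuration.
spark.conf.set("spark.sql.shuffle.partitions", "200")
println(spark.conf.get("spark.sql.shuffle.partitions"))

// The SQL equivalents:
spark.sql("SET spark.sql.shuffle.partitions=100")
spark.sql("RESET spark.sql.shuffle.partitions")  // back to its initial value
{% endhighlight %}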
{% include_api_gen generated-runtime-sql-config-table.html %}
Static SQL configurations are cross-session, immutable Spark SQL configurations. They can be set with final values via the config file
and command-line options prefixed with --conf/-c, or via the SparkConf used to create the SparkSession.
External users can query the static SQL config values via SparkSession.conf or via the SET command, e.g. SET spark.sql.extensions;, but cannot set or unset them.
{% include_api_gen generated-static-sql-config-table.html %}
Each cluster manager in Spark has additional configuration options. Configurations can be found on the pages for each mode:

- [YARN](running-on-yarn.html)
- [Kubernetes](running-on-kubernetes.html)
- [Standalone Mode](spark-standalone.html)
Certain Spark settings can be configured through environment variables, which are read from the
conf/spark-env.sh script in the directory where Spark is installed (or conf/spark-env.cmd on
Windows). In Standalone mode, this file can give machine specific information such as
hostnames. It is also sourced when running local Spark applications or submission scripts.
Note that conf/spark-env.sh does not exist by default when Spark is installed. However, you can
copy conf/spark-env.sh.template to create it. Make sure you make the copy executable.
The following variables can be set in spark-env.sh:
In addition to the above, there are also options for setting up the Spark standalone cluster scripts, such as number of cores to use on each machine and maximum memory.
Since spark-env.sh is a shell script, some of these can be set programmatically -- for example, you might
compute SPARK_LOCAL_IP by looking up the IP of a specific network interface.
Note: When running Spark on YARN in cluster mode, environment variables need to be set using the spark.yarn.appMasterEnv.[EnvironmentVariableName] property in your conf/spark-defaults.conf file. Environment variables that are set in spark-env.sh will not be reflected in the YARN Application Master process in cluster mode. See the YARN-related Spark Properties for more information.
Spark uses log4j for logging. You can configure it by adding a log4j2.properties file in the conf directory. To get started, copy one of the provided templates: log4j2.properties.template (for plain text logging) or log4j2-json-layout.properties.template (for structured logging).
The default logging format is plain text, using Log4j's Pattern Layout.
MDC (Mapped Diagnostic Context) information is not included by default in plain text logs. To include it, update the PatternLayout configuration in the log4j2.properties file. For example, add %X{task_name} to include the task name in logs. Additionally, use spark.sparkContext.setLocalProperty("key", "value") to add custom data to the MDC.
Starting with version 4.0.0, spark-submit supports optional structured logging using the JSON Template Layout. This format enables efficient querying of logs with Spark SQL using the JSON data source and includes all MDC information for improved searchability and debugging.
To enable structured logging and include MDC information, set the configuration spark.log.structuredLogging.enabled to true (default is false). For additional customization, copy log4j2-json-layout.properties.template to conf/log4j2.properties and adjust as needed.
To query structured logs in JSON format, use the following code snippet:
Python:

{% highlight python %}
from pyspark.logger import SPARK_LOG_SCHEMA

logDf = spark.read.schema(SPARK_LOG_SCHEMA).json("path/to/logs")
{% endhighlight %}

Scala:

{% highlight scala %}
import org.apache.spark.util.LogUtils.SPARK_LOG_SCHEMA

val logDf = spark.read.schema(SPARK_LOG_SCHEMA).json("path/to/logs")
{% endhighlight %}
Note: If you're using the interactive shell (pyspark shell or spark-shell), you can omit the import statement in the code because SPARK_LOG_SCHEMA is already available in the shell's context.
To specify a different configuration directory other than the default "SPARK_HOME/conf", you can set SPARK_CONF_DIR. Spark will use the configuration files (spark-defaults.conf, spark-env.sh, log4j2.properties, etc) from this directory.
If you plan to read and write from HDFS using Spark, there are two Hadoop configuration files that should be included on Spark's classpath:
- hdfs-site.xml, which provides default behaviors for the HDFS client.
- core-site.xml, which sets the default filesystem name.

The location of these configuration files varies across Hadoop versions, but
a common location is inside of /etc/hadoop/conf. Some tools create
configurations on-the-fly, but offer a mechanism to download copies of them.
To make these files visible to Spark, set HADOOP_CONF_DIR in $SPARK_HOME/conf/spark-env.sh
to a location containing the configuration files.
If your Spark application is interacting with Hadoop, Hive, or both, there are probably Hadoop/Hive configuration files in Spark's classpath.
Multiple running applications might require different Hadoop/Hive client side configurations.
You can copy and modify hdfs-site.xml, core-site.xml, yarn-site.xml, hive-site.xml in
Spark's classpath for each application. In a Spark cluster running on YARN, these configuration
files are set cluster-wide, and cannot safely be changed by the application.
The better choice is to use Spark Hadoop properties in the form of spark.hadoop.*, and
Spark Hive properties in the form of spark.hive.*.
For example, adding the configuration "spark.hadoop.abc.def=xyz" represents adding the Hadoop property "abc.def=xyz",
and adding the configuration "spark.hive.abc=xyz" represents adding the Hive property "hive.abc=xyz".
They can be considered the same as normal Spark properties, which can be set in $SPARK_HOME/conf/spark-defaults.conf.
In some cases, you may want to avoid hard-coding certain configurations in a SparkConf. For
instance, Spark allows you to simply create an empty conf and set Spark, Spark Hadoop, and Spark Hive properties there:
{% highlight scala %}
val conf = new SparkConf().set("spark.hadoop.abc.def", "xyz")
val sc = new SparkContext(conf)
{% endhighlight %}
Also, you can modify or add configurations at runtime:
{% highlight bash %}
./bin/spark-submit \
  --name "My app" \
  --master "local[4]" \
  --conf spark.eventLog.enabled=false \
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  --conf spark.hadoop.abc.def=xyz \
  --conf spark.hive.abc=xyz \
  myApp.jar
{% endhighlight %}
GPUs and other accelerators have been widely used for accelerating special workloads, e.g., deep learning and signal processing. Spark now supports requesting and scheduling generic resources, such as GPUs, with a few caveats. The current implementation requires that the resource have addresses that can be allocated by the scheduler. It requires your cluster manager to support and be properly configured with the resources.
There are configurations available to request resources for the driver: spark.driver.resource.{resourceName}.amount, request resources for the executor(s): spark.executor.resource.{resourceName}.amount and specify the requirements for each task: spark.task.resource.{resourceName}.amount. The spark.driver.resource.{resourceName}.discoveryScript config is required on YARN, Kubernetes and a client side Driver on Spark Standalone. spark.executor.resource.{resourceName}.discoveryScript config is required for YARN and Kubernetes. Kubernetes also requires spark.driver.resource.{resourceName}.vendor and/or spark.executor.resource.{resourceName}.vendor. See the config descriptions above for more information on each.
Spark will use the configurations specified to first request containers with the corresponding resources from the cluster manager. Once it gets the container, Spark launches an Executor in that container which will discover what resources the container has and the addresses associated with each resource. The Executor will register with the Driver and report back the resources available to that Executor. The Spark scheduler can then schedule tasks to each Executor and assign specific resource addresses based on the resource requirements the user specified. The user can see the resources assigned to a task using the TaskContext.get().resources api. On the driver, the user can see the resources assigned with the SparkContext resources call. It's then up to the user to use the assigned addresses to do the processing they want or pass those into the ML/AI framework they are using.
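For example, inside a task (a minimal sketch assuming the application requested a resource named "gpu" through the configs above):

{% highlight scala %}
import org.apache.spark.TaskContext

sc.parallelize(1 to 2).foreachPartition { _ =>
  // Addresses of the "gpu" resources the scheduler assigned to this task.
  val gpuAddresses = TaskContext.get().resources()("gpu").addresses
  // ... hand gpuAddresses to the processing or ML/AI framework in use ...
}

// On the driver, the driver's assigned resources are visible as well:
sc.resources.get("gpu").foreach(info => println(info.addresses.mkString(",")))
{% endhighlight %}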
See your cluster manager specific page for requirements and details: YARN, Kubernetes and Standalone Mode. It is currently not available in local mode. Please also note that local-cluster mode with multiple workers is not supported (see the Standalone documentation).
The stage level scheduling feature allows users to specify task and executor resource requirements at the stage level. This allows different stages to run with executors that have different resources. A prime example of this is one ETL stage running with CPU-only executors, while the next stage is an ML stage that needs GPUs. Stage level scheduling allows the user to request different executors that have GPUs when the ML stage runs, rather than having to acquire executors with GPUs at the start of the application and have them sit idle while the ETL stage is being run. This is only available for the RDD API in Scala, Java, and Python. It is available on YARN, Kubernetes and Standalone when dynamic allocation is enabled. When dynamic allocation is disabled, it allows users to specify different task resource requirements at the stage level; this is currently supported on YARN, Kubernetes and Standalone clusters. See the YARN page or Kubernetes page or Standalone page for more implementation details.
See the RDD.withResources and ResourceProfileBuilder APIs for using this feature. When dynamic allocation is disabled, tasks with different task resource requirements will share executors with the DEFAULT_RESOURCE_PROFILE. When dynamic allocation is enabled, the current implementation acquires new executors for each ResourceProfile created, and the profile currently has to be an exact match: Spark does not try to fit tasks into an executor that requires a different ResourceProfile than the executor was created with. Executors that are not in use will idle timeout with the dynamic allocation logic. The default configuration for this feature is to only allow one ResourceProfile per stage. If the user associates more than one ResourceProfile with an RDD, Spark will throw an exception by default. See the config spark.scheduler.resource.profileMergeConflicts to control that behavior. When spark.scheduler.resource.profileMergeConflicts is enabled, the merge strategy Spark implements is a simple max of each resource within the conflicting ResourceProfiles: Spark will create a new ResourceProfile with the max of each of the resources.
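A minimal sketch of the API (resource amounts are illustrative; assumes a "gpu" resource is configured and dynamic allocation is enabled):

{% highlight scala %}
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// ETL stage: runs with the application's default (CPU-only) executors.
val etl = sc.parallelize(1 to 1000).map(_ * 2)

// ML stage: request executors with 4 cores and 1 GPU each, and 1 GPU per task.
val execReqs = new ExecutorResourceRequests().cores(4).resource("gpu", 1)
val taskReqs = new TaskResourceRequests().resource("gpu", 1.0)
val mlProfile = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build()

val ml = etl.withResources(mlProfile).mapPartitions { iter =>
  // ... GPU-accelerated work here ...
  iter
}
{% endhighlight %}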
Push-based shuffle helps improve the reliability and performance of Spark shuffle. It takes a best-effort approach to push the shuffle blocks generated by the map tasks to remote external shuffle services, to be merged per shuffle partition. Reduce tasks fetch a combination of merged shuffle partitions and original shuffle blocks as their input data, which converts the small random disk reads performed by external shuffle services into large sequential reads. The possibility of better data locality for reduce tasks additionally helps to minimize network IO. Push-based shuffle takes priority over batch fetch in some scenarios, like partition coalescing when merged output is available.
Push-based shuffle improves performance for long-running jobs/queries which involve large disk I/O during shuffle. Currently it is not well suited for jobs/queries which run quickly and deal with smaller amounts of shuffle data. This will be further improved in future releases.

**Currently push-based shuffle is only supported for Spark on YARN with external shuffle service.**
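On a cluster that meets that requirement, a minimal spark-defaults.conf sketch for turning the feature on could look like this (the server side of the YARN shuffle service must also be configured for merging; see the YARN page):

spark.shuffle.service.enabled true
spark.shuffle.push.enabled true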