Running Spark Applications

You can run Spark applications locally or distributed across a cluster, either by using an interactive shell or by submitting an application. Running Spark applications interactively is commonly performed during the data-exploration phase and for ad-hoc analysis.

Because of a limitation in the way Scala compiles code, some applications with nested definitions may encounter a Task not serializable exception when run in an interactive shell. Cloudera recommends submitting these applications with spark-submit instead of running them in a shell.

To run applications distributed across a cluster, Spark requires a cluster manager. Cloudera supports two cluster managers: YARN and Spark Standalone. When run on YARN, Spark application processes are managed by the YARN ResourceManager and NodeManager roles. When run on Spark Standalone, Spark application processes are managed by Spark Master and Worker roles.

In CDH 5, Cloudera recommends running Spark applications on a YARN cluster manager instead of on a Spark Standalone cluster manager, because YARN provides the following benefits:
  • You can dynamically share and centrally configure the same pool of cluster resources among all frameworks that run on YARN.
  • You can use all the features of YARN schedulers for categorizing, isolating, and prioritizing workloads.
  • You choose the number of executors to use; in contrast, Spark Standalone requires each application to run an executor on every host in the cluster.
  • Spark can run against Kerberos-enabled Hadoop clusters and use secure authentication between its processes.

For information on monitoring Spark applications, see Monitoring Spark Applications.

Submitting Spark Applications

To submit an application consisting of a Python file or a compiled and packaged Java or Scala JAR, use the spark-submit script.

spark-submit Syntax

spark-submit --option value \
  application jar | python file [application arguments]

Example: Running SparkPi on YARN demonstrates how to run one of the sample applications, SparkPi, packaged with Spark. It computes an approximation to the value of pi.

spark-submit Arguments

Argument | Description
--- | ---
application jar | Path to a JAR file containing a Spark application and all dependencies. The path must be globally visible inside your cluster; see Advanced Dependency Management.
python file | Path to a Python file containing a Spark application. The path must be globally visible inside your cluster; see Advanced Dependency Management.
application arguments | Arguments to pass to the main method of your main class.
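
The ordering described by the syntax (options first, then the application, then the application's own arguments) can be sketched with a small Python helper that assembles the argument list. The helper name build_spark_submit_args, the JAR path, and the application argument 10 are hypothetical and are used only for illustration; the helper is not part of Spark.

```python
def build_spark_submit_args(application, options=(), app_args=()):
    """Return an argv list: spark-submit, options, application, application arguments."""
    argv = ["spark-submit"]
    for name, value in options:
        # Each option becomes two tokens: --name value (space, not an equals sign).
        argv.extend(["--" + name, str(value)])
    # The application JAR or Python file follows all spark-submit options.
    argv.append(application)
    # Anything after the application is passed through to the application itself.
    argv.extend(str(arg) for arg in app_args)
    return argv


argv = build_spark_submit_args(
    "/path/to/spark-examples.jar",  # hypothetical path to an application JAR
    options=[
        ("class", "org.apache.spark.examples.SparkPi"),
        ("master", "yarn"),
        ("deploy-mode", "cluster"),
    ],
    app_args=[10],  # hypothetical application argument
)
print(" ".join(argv))
```

A list built this way could be handed to subprocess.run on a host where spark-submit is on the PATH; that step is left out here because it depends on your environment.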

spark-submit Options

You specify spark-submit options using the form --option value instead of --option=value. (Use a space instead of an equals sign.)

Option | Description
--- | ---
class | For Java and Scala applications, the fully qualified classname of the class containing the main method of the application. For example, org.apache.spark.examples.SparkPi.
conf | Spark configuration property in key=value format. For values that contain spaces, surround the whole "key=value" pair with quotes.
deploy-mode | Deployment mode: cluster or client. In cluster mode, the driver runs on a worker host. In client mode, the driver runs locally as an external client. Use cluster mode with production jobs; client mode is more appropriate for interactive and debugging uses, where you want to see your application output immediately. To see the effect of the deployment mode when running on YARN, see Deployment Modes. Default: client.
driver-class-path | Configuration and classpath entries to pass to the driver. JARs added with --jars are automatically included in the classpath.
driver-cores | Number of cores used by the driver in cluster mode. Default: 1.
driver-memory | Maximum heap size (represented as a JVM string; for example 1024m, 2g, and so on) to allocate to the driver. Alternatively, you can use the spark.driver.memory property.
files | Comma-separated list of files to be placed in the working directory of each executor. The path must be globally visible inside your cluster; see Advanced Dependency Management.
jars | Additional JARs to be loaded in the classpath of drivers and executors in cluster mode or in the executor classpath in client mode. The path must be globally visible inside your cluster; see Advanced Dependency Management.
master | The location to run the application. For the accepted values, see Master Values.
packages | Comma-separated list of Maven coordinates of JARs to include on the driver and executor classpaths. The local Maven repository, Maven Central, and any remote repositories specified in repositories are searched in that order. The format for the coordinates is groupId:artifactId:version.
py-files | Comma-separated list of .zip, .egg, or .py files to place on PYTHONPATH. The path must be globally visible inside your cluster; see Advanced Dependency Management.
repositories | Comma-separated list of remote repositories to search for the Maven coordinates specified in packages.
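
The quoting rule for conf values that contain spaces can be illustrated with Python's standard shlex module, which renders an argument list as a shell command line and quotes only the tokens that need it. This is a minimal sketch: the file name app.py is hypothetical, spark.executor.extraJavaOptions is used only as an example of a property whose value contains a space, and shlex.join requires Python 3.8 or later.

```python
import shlex

argv = [
    "spark-submit",
    "--master", "yarn",
    # One key=value token; the value itself contains a space.
    "--conf", "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
    "app.py",  # hypothetical application file
]

# shlex.join quotes the whole key=value token because it contains a space.
command_line = shlex.join(argv)
print(command_line)
```

Splitting the printed line again with shlex.split returns the original list, which is a quick way to check that a hand-written command keeps the whole key=value pair as a single argument.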

Master Values

Master | Description
--- | ---
local | Run Spark locally with one worker thread (that is, no parallelism).
local[K] | Run Spark locally with K worker threads. (Ideally, set this to the number of cores on your host.)
local[*] | Run Spark locally with as many worker threads as logical cores on your host.
spark://host:port | Run using the Spark Standalone cluster manager with the Spark Master on the specified host and port (7077 by default).
yarn | Run using a YARN cluster manager. The cluster location is determined by HADOOP_CONF_DIR or YARN_CONF_DIR. See Configuring the Environment.
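
To make the shapes of these master values concrete, the following Python sketch classifies a master string into one of the forms listed here. The function describe_master is hypothetical and is not part of Spark; it recognizes only the values in this table, requires an explicit port for spark:// URLs, and rejects everything else, even though Spark itself may accept other forms.

```python
import re


def describe_master(master):
    """Classify a master value as (kind, detail), covering only the forms listed above."""
    if master == "yarn":
        return ("yarn", None)
    if master == "local":
        return ("local", 1)  # a single worker thread
    match = re.fullmatch(r"local\[(\*|\d+)\]", master)
    if match:
        threads = match.group(1)
        # "*" means one worker thread per logical core on the host.
        return ("local", "all logical cores" if threads == "*" else int(threads))
    match = re.fullmatch(r"spark://([^:/]+):(\d+)", master)
    if match:
        # Spark Standalone: host and port of the Spark Master.
        return ("standalone", (match.group(1), int(match.group(2))))
    raise ValueError(f"unrecognized master value: {master!r}")


for value in ["local", "local[4]", "local[*]", "spark://master.example.com:7077", "yarn"]:
    print(value, "->", describe_master(value))
```

The host name master.example.com is a placeholder.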

Cluster Execution Overview

Spark orchestrates its operations through the driver program. When the driver program runs, the Spark framework starts executor processes on the cluster hosts, and these executors process your data. The following occurs when you submit a Spark application to a cluster:
  1. The driver is launched and invokes the main method in the Spark application.
  2. The driver requests resources from the cluster manager to launch executors.
  3. The cluster manager launches executors on behalf of the driver program.
  4. The driver runs the application. Based on the transformations and actions in the application, the driver sends tasks to executors.
  5. Tasks are run on executors to compute and save results.
  6. If dynamic allocation is enabled, executors that remain idle for a specified period are released.
  7. When the driver's main method exits or calls SparkContext.stop, the driver terminates any outstanding executors and releases resources from the cluster manager.