Running Spark Applications

You can run Spark applications locally or distributed across a cluster, either by using an interactive shell or by submitting an application.

To run applications distributed across a cluster, Spark requires a cluster manager. Cloudera supports two cluster managers: Spark Standalone and YARN. Cloudera does not support running Spark applications on Mesos. On Spark Standalone, Spark application processes run on Spark Master and Worker roles. On YARN, Spark application processes run on YARN ResourceManager and NodeManager roles.

In CDH 5, Cloudera recommends running Spark applications on a YARN cluster manager instead of on a Spark Standalone cluster manager, for the following benefits:
  • You can dynamically share and centrally configure the same pool of cluster resources among all frameworks that run on YARN.
  • You can use all the features of YARN schedulers for categorizing, isolating, and prioritizing workloads.
  • You choose the number of executors to use; in contrast, Spark Standalone requires each application to run an executor on every host in the cluster.
  • Spark can run against Kerberos enabled Hadoop clusters and use secure authentication between its processes.

For information on monitoring Spark applications, see Monitoring Spark Applications.

Running Spark Applications Interactively

You can run a Spark application interactively using the Scala spark-shell or Python pyspark shell. For a complete list of options, run spark-shell or pyspark with the -h flag.
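The following commands start each shell on a YARN cluster; this is a minimal sketch, and --master yarn assumes that HADOOP_CONF_DIR or YARN_CONF_DIR points to your cluster configuration:
SPARK_HOME/bin/spark-shell --master yarn
SPARK_HOME/bin/pyspark --master yarn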

For example, while the shell is running, you can run a word-count application using the following code:
  • Scala
    val file = sc.textFile("hdfs://namenode:8020/path/to/input")
    val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    counts.saveAsTextFile("hdfs://namenode:8020/user/hdfs/output")
  • Python
    file = sc.textFile("hdfs://namenode:8020/path/to/input")
    counts = file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda v1,v2: v1 + v2)
    counts.saveAsTextFile("hdfs://namenode:8020/user/hdfs/output")

Submitting Spark Applications

Using the spark-submit Script

You submit compiled Spark applications with the spark-submit script.
SPARK_HOME/bin/spark-submit \
  --option value \
  application jar | python file \
  [application arguments]
Example: Running SparkPi on YARN and Example: Running SparkPi on Spark Standalone demonstrate how to run a sample application, SparkPi, which is packaged with Spark. It computes an approximation to the value of Pi.
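For example, a SparkPi run on YARN in cluster mode might be submitted as in the following sketch; the location of the examples JAR and the final argument (the number of partitions SparkPi uses) are assumptions that you should adjust for your installation:
SPARK_HOME/bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \
  SPARK_HOME/lib/spark-examples.jar 10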
spark-submit Options
Option Description
application jar Path to a bundled JAR file including your application and all dependencies. For the client deployment mode, the path must point to a local file. For the cluster deployment mode, the URL must be globally visible inside your cluster; for example, an hdfs:// path or a file:// path that exists on all nodes.
python file Path to a Python file containing your application. For the client deployment mode, the path must point to a local file. For the cluster deployment mode, the URL must be globally visible inside your cluster; for example, an hdfs:// path or a file:// path that exists on all nodes.
--py-files python files Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps.
--files files Comma-separated list of files to be placed in the working directory of each executor.
application arguments Arguments passed to the main method of your main class, if any.
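For example, a Python application that depends on a helper module and a lookup file might be submitted as in the following sketch; the file names and application arguments are placeholders, not files shipped with Spark:
SPARK_HOME/bin/spark-submit \
  --master yarn \
  --py-files utils.zip \
  --files lookup.csv \
  my_app.py /path/to/input /path/to/output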

Command-Line Options

You specify command-line options using the form --option value instead of --option=value. (Use a space instead of an equals sign.)
Options
Option Description
--class The fully qualified name of the class containing the main method of the application; for example, org.apache.spark.examples.SparkPi.
--conf Spark configuration property in key=value format. For values that contain spaces, surround the key=value pair with quotes (see the example after this table).
--deploy-mode The deployment mode: cluster or client. In cluster mode, the driver runs on worker hosts. In client mode, the driver runs locally as an external client. Broadly, cluster mode should be used for production jobs, while client mode is more appropriate for interactive and debugging uses, where you want to see your application's output immediately. For the effect of the deployment mode when running on YARN, see Deployment Modes. Default: client.
--driver-cores Number of cores used by the driver, only in cluster mode (Default: 1).
--driver-memory The maximum heap size (represented as a JVM string; for example 1024m, 2g, and so on) to allocate to the driver. Alternatively, you can use the spark.driver.memory configuration parameter.
--jars Additional JARs to be loaded into the classpath of drivers and executors in cluster mode or into the executor classpath in client mode. These JARs can be in the HDFS file system; otherwise they must be available locally on each executor. The path to the JARs on HDFS must be specified as hdfs://nameservice:8020/path/to/jar.
--master The location to run the application. See the Master Values table below.
--packages Comma-separated list of Maven coordinates of JARs to include on the driver and executor classpaths. The local Maven repo, then Maven central, and any additional remote repositories specified in --repositories are searched in that order. The format for the coordinates should be groupId:artifactId:version.
--repositories Comma-separated list of additional remote repositories to search for the Maven coordinates given with --packages.
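Putting several of these options together, a submission might look like the following sketch; the class name, JAR names, and configuration values are placeholders, and the --conf value is quoted because it contains spaces:
SPARK_HOME/bin/spark-submit \
  --class com.example.MyApp \
  --master yarn \
  --deploy-mode cluster \
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  --jars hdfs://nameservice:8020/path/to/dependency.jar \
  my-app.jar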
Master Values
Master Description
local Run Spark locally with one worker thread (that is, no parallelism).
local[K] Run Spark locally with K worker threads (ideally, set this to the number of cores on your host).
local[*] Run Spark locally with as many worker threads as logical cores on your host.
spark://host:port Run on the Spark Standalone Master on the specified host. The port must be the one your Master is configured to use (7077 by default).
yarn Run on a YARN cluster. The cluster location is determined by the HADOOP_CONF_DIR or YARN_CONF_DIR variable.
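For example, the same application can be tested locally and then submitted to YARN by changing only the --master value; the class and JAR names are placeholders:
# Local run using as many worker threads as logical cores
SPARK_HOME/bin/spark-submit --class com.example.MyApp --master "local[*]" my-app.jar
# YARN run; HADOOP_CONF_DIR or YARN_CONF_DIR must point to the cluster configuration
SPARK_HOME/bin/spark-submit --class com.example.MyApp --master yarn my-app.jar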

Cluster Overview

MapReduce runs each task in its own process. When a task completes, the process terminates. In Spark, many tasks can run concurrently in an executor, and the executor exists for the lifetime of the Spark application, even when no jobs are running. A cluster manager starts the executor processes.

In this model, tasks can start very quickly and process in-memory data. However, you have less control of resource management. Because the number of executors for an application is typically fixed, and each executor has a fixed allotment of resources, an application uses the same amount of resources for the full duration that it runs.

When running on YARN, you can dynamically increase and decrease the number of executors.
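This is done through Spark dynamic allocation, and a minimal sketch of enabling it at submission time follows; the executor limits are placeholder values, and dynamic allocation also requires the Spark external shuffle service to be configured on the YARN NodeManagers, which is not covered here:
SPARK_HOME/bin/spark-submit \
  --master yarn \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=1 \
  --conf spark.dynamicAllocation.maxExecutors=20 \
  my-app.jar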

Following are the steps that occur when you submit a Spark application to a cluster:
  1. spark-submit launches the driver program and invokes the main method in the Spark application.
  2. The driver program requests resources from the cluster manager to launch executors.
  3. The cluster manager launches executors on behalf of the driver program.
  4. The driver process runs the user application. Based on the resilient distributed dataset (RDD) actions and transformations in the program, the driver sends tasks to executors.
  5. Tasks are run on executor processes to compute and save results.
  6. If the driver's main method exits or calls SparkContext.stop, it terminates the executors and releases resources from the cluster manager.
Spark Runtime Summary
  • YARN client mode: the driver runs in the client, the ApplicationMaster requests resources, executor processes are started by the YARN NodeManagers, the persistent services are the YARN ResourceManager and NodeManagers, and the Spark shell is supported.
  • YARN cluster mode: the driver runs in the ApplicationMaster, the ApplicationMaster requests resources, executor processes are started by the YARN NodeManagers, the persistent services are the YARN ResourceManager and NodeManagers, and the Spark shell is not supported.
  • Spark Standalone: the driver runs in the client, the client requests resources, executor processes are started by the Spark Workers, the persistent services are the Spark Master and Workers, and the Spark shell is supported.

For more information, see Cluster Mode Overview.