
Running Spark Applications

You can run Spark applications using an interactive shell or by submitting an application. You can run Spark applications locally or distributed across a cluster.

To run applications distributed across a cluster, Spark requires a cluster manager. Cloudera supports two cluster managers: Spark Standalone and YARN; Cloudera does not support running Spark applications on Mesos. On Spark Standalone, Spark application processes run on the Spark Master and Worker roles; on YARN, they run on the YARN ResourceManager and NodeManager roles.

As of CDH 5, Cloudera recommends running Spark applications on YARN rather than on the Spark Standalone cluster manager. Using YARN as the cluster manager confers several benefits:
  • YARN allows you to dynamically share and centrally configure the same pool of cluster resources between all frameworks that run on YARN.
  • You can take advantage of all the features of YARN schedulers for categorizing, isolating, and prioritizing workloads.
  • You choose the number of executors to use (see the sizing example after this list); in contrast, Spark Standalone requires each application to run an executor on every host in the cluster.
  • YARN is the only Spark cluster manager that supports security. With YARN, Spark can run against Kerberized Hadoop clusters and uses secure authentication between its processes.
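For example, on YARN you can size the executor pool explicitly at submission time. The numeric values below are arbitrary illustrations; --num-executors, --executor-cores, and --executor-memory are the spark-submit options involved:

# Request four executors, each with two cores and 2 GB of memory
# (values are illustrative)
./bin/spark-submit \
  --master yarn-client \
  --num-executors 4 \
  --executor-cores 2 \
  --executor-memory 2g \
  ... # other options, application JAR, and arguments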

For information on monitoring Spark applications, see Monitoring Spark Applications.

Running Spark Applications Interactively

You can run a Spark application interactively using the Scala spark-shell or the Python pyspark shell:
  • Scala - spark-shell
  • Python - pyspark
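Assuming the Spark bin directory is on your PATH, each shell accepts the same master URLs as spark-submit (see Table 2 below); for example, to start each shell against YARN in client mode:

spark-shell --master yarn-client
pyspark --master yarn-client
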
Once the shell is running, run a word count application using the following code:
  • Scala
    // Read the input file from HDFS
    val file = sc.textFile("hdfs://namenode:8020/path/to/input")
    // Split each line into words, map each word to a count of 1, and sum the counts per word
    val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    // Write the (word, count) pairs back to HDFS
    counts.saveAsTextFile("hdfs://namenode:8020/path/to/output")
  • Python
    # Read the input file from HDFS
    file = sc.textFile("hdfs://namenode:8020/path/to/input")
    # Split each line into words, map each word to a count of 1, and sum the counts per word
    counts = file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda v1, v2: v1 + v2)
    # Write the (word, count) pairs back to HDFS
    counts.saveAsTextFile("hdfs://namenode:8020/path/to/output")
  Note: Some applications with nested definitions that are run in an interactive shell may encounter a Task not serializable exception, because of a limitation in the way Scala compiles code. Cloudera recommends submitting such applications with spark-submit instead.
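
A closely related and easy-to-reproduce case is a closure that captures a non-serializable object. The following Scala sketch (the Tokenizer class is illustrative) fails in spark-shell with org.apache.spark.SparkException: Task not serializable, because referencing tok inside flatMap forces Spark to serialize it:

    // Tokenizer does not extend Serializable
    class Tokenizer {
      def split(line: String): Array[String] = line.split(" ")
    }
    val tok = new Tokenizer
    // Capturing tok in the closure requires serializing it, which fails:
    val words = sc.textFile("hdfs://namenode:8020/path/to/input").flatMap(tok.split)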

Submitting Spark Applications

spark-submit

You submit compiled Spark applications with the spark-submit script. spark-submit does not support command-line options of the form --key=value; use --key value instead (that is, separate the key and value with a space rather than an equals sign):

./bin/spark-submit \
  --class main-class \
  --master master-url \
  --deploy-mode deploy-mode \
  --conf key=value \
  ... # other options
  application-jar \
  [application-arguments]

Table 1. spark-submit Options

  Option                 Description
  --class                The entry point of your application (for example, org.apache.spark.examples.SparkPi).
  --master               The master URL of the cluster (see Table 2).
  --deploy-mode          Whether to launch the driver on the worker hosts (cluster) or locally as an external client (client). Default: client.
  --conf                 A Spark configuration property in key=value format. Wrap values that contain spaces in quotes, as in --conf "key=value".
  application-jar        Path to a JAR that bundles your application and all its dependencies. The URL must be globally visible inside your cluster: for example, an hdfs:// path, or a file:// path that is present on all hosts.
  application-arguments  Arguments passed to the main method of your main class, if any.

Table 2. Master URL Values

  Master URL         Description
  local              Run Spark locally with one worker thread (that is, no parallelism).
  local[K]           Run Spark locally with K worker threads (ideally, set K to the number of cores on your host).
  local[*]           Run Spark locally with as many worker threads as your host has logical cores.
  spark://host:port  Connect to the specified Spark Standalone Master. The port must be the one your Master is configured to use, 7077 by default.
  yarn-client        Connect to a YARN cluster in client mode. The cluster location is determined by the HADOOP_CONF_DIR environment variable.
  yarn-cluster       Connect to a YARN cluster in cluster mode. The cluster location is determined by the HADOOP_CONF_DIR environment variable.

Example: Running SparkPi on YARN and Example: Running SparkPi on Spark Standalone use SparkPi, a sample application packaged with Spark that computes an approximation to the value of Pi.
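
As a quick illustration, a client-mode YARN submission of SparkPi might look like the following; the JAR location is an assumption that depends on whether CDH was installed from packages or parcels:

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn-client \
  /usr/lib/spark/lib/spark-examples.jar \
  10

The final argument (10) is the number of partitions SparkPi uses to estimate Pi; larger values distribute the computation across more tasks.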

Cluster Overview

In Spark, the cluster manager is responsible for starting the executor processes.

MapReduce runs each task in its own process; when a task completes, the process exits. In Spark, many tasks can run concurrently within a single executor, and the executor persists for the lifetime of the Spark application, even when no jobs are running.

The advantage of this model is speed: tasks can start quickly and process in-memory data. The disadvantage is coarser-grained resource management: because the number of executors for an application is typically fixed and each executor has a fixed allotment of resources, an application occupies the same amount of resources for the full duration that it runs. When running on YARN, however, you can dynamically increase and decrease the number of executors, as sketched below.
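
A minimal sketch of what that looks like at submission time follows. spark.dynamicAllocation.* and spark.shuffle.service.enabled are the Spark properties involved (dynamic allocation requires the external shuffle service on each NodeManager); the numeric bounds are arbitrary examples:

./bin/spark-submit \
  --master yarn-client \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=2 \
  --conf spark.dynamicAllocation.maxExecutors=20 \
  --conf spark.shuffle.service.enabled=true \
  ... # other options, application JAR, and arguments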

Here are the steps that occur when you run a Spark application on a cluster:
  1. Submit an application using spark-submit.
  2. spark-submit launches the driver program and invokes the main() method in the Spark application.
  3. The driver program contacts the cluster manager to ask for resources to launch executors.
  4. The cluster manager launches executors on behalf of the driver program.
  5. The driver process runs through the user application. Based on the RDD actions and transformations in the program, the driver sends work to executors in the form of tasks.
  6. Tasks are run on executor processes to compute and save results.
  7. If the driver's main() method exits, or it calls SparkContext.stop(), the driver terminates the executors and releases resources from the cluster manager.
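
Seen from the application side, a minimal Scala driver that exercises steps 2 through 7 might look like the following sketch (the WordCount name and the use of command-line arguments for paths are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Step 2: spark-submit invokes main() in the driver program
    val conf = new SparkConf().setAppName("WordCount")
    // Steps 3 and 4: creating the SparkContext requests executors from the cluster manager
    val sc = new SparkContext(conf)
    // Step 5: transformations describe the work; the driver turns them into tasks
    val counts = sc.textFile(args(0))
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    // Step 6: the saveAsTextFile action runs tasks on the executors and saves results
    counts.saveAsTextFile(args(1))
    // Step 7: stop() terminates the executors and releases cluster resources
    sc.stop()
  }
}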

Table 3. Spark Runtime Summary

  Mode                       YARN Client Mode                       YARN Cluster Mode                      Spark Standalone
  Driver runs in             Client                                 ApplicationMaster                      Client
  Requests resources         ApplicationMaster                      ApplicationMaster                      Client
  Starts executor processes  YARN NodeManager                       YARN NodeManager                       Spark Worker
  Persistent services        YARN ResourceManager and NodeManagers  YARN ResourceManager and NodeManagers  Spark Master and Workers
  Supports Spark Shell       Yes                                    No                                     Yes

For further information, see Cluster Mode Overview.
