
Running Spark Applications

Spark applications are similar to MapReduce “jobs.” Each application is a self-contained computation that runs user-supplied code to compute a result. As with MapReduce jobs, Spark applications can make use of the resources of multiple nodes. Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are currently two types of RDDs: parallelized collections, which take an existing Scala collection and run functions on it in parallel, and Hadoop datasets, which run functions on each record of a file in the Hadoop Distributed File System (HDFS) or any other storage system supported by Hadoop. Both types of RDDs can be operated on through the same methods.
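
As a quick illustration, the following minimal sketch shows both kinds of RDD; it assumes an existing SparkContext named sc, and the HDFS path is a placeholder.
// Minimal sketch: the two RDD types described above, created from an existing SparkContext `sc`.
val parallelized = sc.parallelize(Seq(1, 2, 3, 4))            // parallelized Scala collection
val hadoopData   = sc.textFile("hdfs:///user/me/input.txt")   // Hadoop dataset (placeholder path)

// Both types support the same operations, such as map and count:
println(parallelized.map(_ * 2).count())
println(hadoopData.map(_.length).count())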

Each application has a driver process which coordinates its execution. This process can run in the foreground (client mode) or in the background (cluster mode). Client mode is a little simpler, but cluster mode allows you to easily log out after starting a Spark application without terminating the application.

Spark starts executors to perform computations. There may be many executors, distributed across the cluster, depending on the size of the job. After launching the executors, Spark attempts to match tasks to executors.

CDH 5.3 introduces a performance optimization (via SPARK-1767) that causes Spark to prefer scheduling tasks on nodes where the HDFS blocks they read are already cached. This preference is important enough that Spark will wait a short time for executors near these caches to become free.

Note, however, that Spark does not take this cached data into account when it starts executors, and once the executors have started there is no further chance to select those nodes during the task-matching phase. This is not a problem for most workloads, since most workloads start executors on most or all nodes in the cluster. However, if you do have problems with the optimization, an alternate DeveloperApi constructor for SparkContext lets you explicitly spell out the preferred locations on which to start executors. See the following snippet, as well as examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala, for a working example of using this API.
val sparkConf = new SparkConf().setAppName("SparkHdfsLR")
val inputPath = args(0)
val conf = SparkHadoopUtil.get.newConfiguration()
val sc = new SparkContext(sparkConf,
    InputFormatInfo.computePreferredLocations(
      Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))))
The constructor used here is marked as a DeveloperApi in SparkContext.scala:
/**
 * :: DeveloperApi ::
 * Alternative constructor for setting preferred locations where Spark will create executors.
 *
 * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on.
 * Can be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
 * from a list of input files or InputFormats for the application.
 */
@DeveloperApi
def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
  this(config)
  this.preferredNodeLocationData = preferredNodeLocationData
}

Spark can run in two modes:
  • Standalone mode:

    In standalone mode, Spark uses a Master daemon which coordinates the efforts of the Workers, which run the executors. Standalone mode is the default, but it cannot be used on secure clusters.

  • YARN mode:

    In YARN mode, the YARN ResourceManager performs the functions of the Spark Master. The functions of the Workers are performed by the YARN NodeManager daemons, which run the executors. YARN mode is slightly more complex to set up, but it supports security, and provides better integration with YARN’s cluster-wide resource management policies.

As of CDH 5, Cloudera recommends running Spark applications on a YARN cluster manager, rather than on a Spark Standalone cluster manager. Using YARN as the Spark cluster manager confers several benefits:
  • YARN allows you to dynamically share and centrally configure the same pool of cluster resources between all frameworks that run on YARN.
  • You can take advantage of all the features of YARN schedulers for categorizing, isolating, and prioritizing workloads.
  • You choose the number of executors to use; in contrast, Spark Standalone requires each application to run an executor on every host in the cluster.
  • YARN is the only Spark cluster manager that supports security. With YARN, Spark can run against Kerberized Hadoop clusters and uses secure authentication between its processes.

Multiple Spark applications can run at once. If you decide to run Spark on YARN, you can decide on an application-by-application basis whether to run in YARN client mode or cluster mode. When you run Spark in client mode, the driver process runs locally; in cluster mode, it runs remotely on an ApplicationMaster.


Some applications that have nested definitions and are run in the Spark shell may encounter a Task not serializable exception, because of a limitation in the way Scala compiles code. Cloudera recommends running such applications as a Spark job instead.
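
The following spark-shell sketch is a hypothetical illustration of the kind of code that can hit this limitation; whether the exception actually appears depends on the Scala and Spark versions in use.
// Hypothetical spark-shell session; `sc` is the shell's SparkContext.
// The nested definition can cause the closure passed to map() to capture its enclosing
// REPL wrapper object, which is not serializable, so the task cannot be shipped to executors.
def describe(rdd: org.apache.spark.rdd.RDD[Int]): Array[String] = {
  def label(x: Int): String = "value=" + x   // nested definition
  rdd.map(label).collect()                   // may throw org.apache.spark.SparkException: Task not serializable
}
describe(sc.parallelize(1 to 10))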

The following sections use a sample application, SparkPi, which is packaged with Spark and computes the value of Pi, to illustrate the three modes.

Configuring Spark Using the Command Line


To use Cloudera Manager to configure Spark, see Using Cloudera Manager to Configure Spark to Run on YARN.

The easiest way to configure Spark using the command line is to use $SPARK_HOME/conf/spark-defaults.conf.

This file contains lines in the form: “key value”. You can create a comment by putting a hash mark ( # ) at the beginning of a line.
  Note: You cannot add comments to the end or middle of a line.
Here is an example of a spark-defaults.conf file:
spark.master     spark://
spark.eventLog.enabled    true
spark.eventLog.dir        hdfs:///user/spark/eventlog
# Set spark executor memory
spark.executor.memory     2g
spark.logConf             true
It is a good idea to put configuration keys that you want to use for every application into spark-defaults.conf. See the Apache Spark configuration documentation for more information.
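
As a quick check, an application can confirm that spark-submit merged those defaults into its configuration. The following is a minimal sketch, assuming the spark-defaults.conf shown above; the application name ConfCheck is a placeholder.
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: spark-submit merges spark-defaults.conf into the application's SparkConf.
// Values set explicitly on the SparkConf (or via --conf) take precedence over the file.
val conf = new SparkConf().setAppName("ConfCheck")
val sc = new SparkContext(conf)
println(sc.getConf.get("spark.executor.memory", "not set"))  // "2g" with the file above
println(sc.getConf.get("spark.eventLog.dir", "not set"))     // "hdfs:///user/spark/eventlog"
sc.stop()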

The spark-submit Script

You can start Spark applications with the spark-submit script, which is installed in your path when you install the spark-core package.
  Note: Spark cannot handle command line options of the form --key=value; use --key value instead. (That is, use a space instead of an equals sign; for example, --executor-memory 2g rather than --executor-memory=2g.)

To run spark-submit, you need a compiled Spark application JAR. The following sections use the SparkPi sample application, which is packaged with Spark in the spark-examples JAR and computes an approximation to the value of Pi.

See also Submitting an Application to YARN.

Running SparkPi in Standalone Mode

Supply the --master and --deploy-mode client arguments to run SparkPi in standalone mode:
spark-submit \
--class org.apache.spark.examples.SparkPi \
--deploy-mode client \
--master spark://<spark_master_host>:7077 \
$SPARK_HOME/examples/lib/spark-examples_version.jar 10
where version is, for example, 2.10-1.1.0-cdh5.2.0, and <spark_master_host> is the host on which the Spark Master daemon runs (7077 is the default Master port).

Arguments that come after the JAR name are supplied to the application. In this case, the argument is the number of slices (parallel tasks) SparkPi uses to sample random points; a larger value produces a better approximation of Pi.
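
For reference, the core of SparkPi is a simple Monte Carlo estimate. The following is a simplified sketch of what examples/src/main/scala/org/apache/spark/examples/SparkPi.scala does with that argument; constants and structure are approximate, and `sc` is the application's SparkContext.
// Simplified sketch of SparkPi's Monte Carlo estimate; `slices` is the trailing
// command-line argument (10 in the commands in this section).
val slices = 10
val n = 100000 * slices
val count = sc.parallelize(1 to n, slices).map { _ =>
  val x = math.random * 2 - 1      // random point in the square [-1, 1] x [-1, 1]
  val y = math.random * 2 - 1
  if (x * x + y * y < 1) 1 else 0  // 1 if the point falls inside the unit circle
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / n)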

Running SparkPi in YARN Client Mode

In this case, the command to run SparkPi is as follows:
spark-submit  \
--class org.apache.spark.examples.SparkPi \
--deploy-mode client \
--master yarn-client \
$SPARK_HOME/examples/lib/spark-examples_version.jar 10
where version is, for example, 2.10-1.1.0-cdh5.2.0.

Running SparkPi in YARN Cluster Mode

In this case, the command to run SparkPi is as follows:
spark-submit  \
--class org.apache.spark.examples.SparkPi \
--deploy-mode cluster \
--master yarn-cluster \
$SPARK_HOME/examples/lib/spark-examples_version.jar 10
where version is, for example, 2.10-1.1.0-cdh5.2.0.

The command will continue to print out status until the job finishes, or you press control-C. Terminating the spark-submit process in cluster mode does not terminate the Spark application as it does in client mode. To monitor the status of the running application, run yarn application -list.

Building Spark Applications

Best practices when compiling your Spark applications include:
  • Building a single assembly JAR that includes all the dependencies, except those for Spark and Hadoop.
  • Excluding any Spark and Hadoop classes from the assembly JAR, because they are already on the cluster, and part of the runtime classpath. In Maven, you can mark the Spark and Hadoop dependencies as provided; an equivalent sbt configuration is sketched after this list.
  • Always building against the same version of Spark that you are running against, to avoid compatibility issues.
    For example, do not assume that applications compiled against Spark 0.9 will run on Spark 1.0 without recompiling. In addition, some applications compiled under Spark 0.9 or earlier will need changes to their source code to compile under Spark 1.0. Applications that compile under Spark 1.0 should compile under all future versions.

    Fat JARs must always be built against the version of Spark you intend to run (see Apache Spark Known Issues).
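
If you build with sbt rather than Maven, the same provided approach applies. The following build.sbt is a minimal sketch: the artifact names are real, but the project name and version numbers are placeholders that you should replace with the versions your CDH cluster actually ships.
// build.sbt (sketch): mark Spark and Hadoop as "provided" so an assembly (fat) JAR
// built with a plugin such as sbt-assembly leaves them out of the final artifact.
name := "my-spark-app"                 // placeholder project name

scalaVersion := "2.10.4"               // match the Scala version of your Spark build

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"    % "1.3.0" % "provided",  // placeholder version
  "org.apache.hadoop" %  "hadoop-client" % "2.6.0" % "provided"   // placeholder version
)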

Using Spark with HBase

A common use case is to use Spark to process data which is destined for HBase, or which has been extracted from HBase. See Importing Data Into HBase.
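
As a brief illustration (a minimal sketch, not the only approach), an application can read an HBase table into an RDD through the standard TableInputFormat; my_table is a placeholder table name, `sc` is an existing SparkContext, and the HBase client classes and configuration must already be on the application's classpath.
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

// Minimal sketch: read an HBase table as an RDD of (row key, Result) pairs.
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableInputFormat.INPUT_TABLE, "my_table")   // placeholder table name

val hbaseRdd = sc.newAPIHadoopRDD(
  hbaseConf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])

println("Rows read: " + hbaseRdd.count())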