Running Spark Applications

You can run Spark applications locally or distributed across a cluster, either by using an interactive shell or by submitting an application. Running Spark applications interactively is commonly performed during the data-exploration phase and for ad hoc analysis.

Because of a limitation in the way Scala compiles code, some applications with nested definitions running in an interactive shell may encounter a Task not serializable exception. Cloudera recommends submitting these applications.

To run applications distributed across a cluster, Spark requires a cluster manager. In CDH 6, Cloudera supports only the YARN cluster manager. When run on YARN, Spark application processes are managed by the YARN ResourceManager and NodeManager roles. Spark Standalone is no longer supported.

For information on monitoring Spark applications, see Monitoring Spark Applications.

Submitting Spark Applications

To submit an application consisting of a Python file or a compiled and packaged Java or Scala JAR, use the spark-submit script.

spark-submit Syntax

spark-submit --option value \
  application jar | python file [application arguments]

Example: Running SparkPi on YARN demonstrates how to run one of the sample applications, SparkPi, packaged with Spark. It computes an approximation to the value of pi.
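
As a rough illustration of the syntax above, the following Python sketch assembles a spark-submit command line as an argument list, writing each option as --option followed by its value. The helper name build_spark_submit and the JAR path /path/to/spark-examples.jar are hypothetical, chosen only for this example. The sketch builds and prints the command without running it.

```python
import shlex


def build_spark_submit(application, options=None, app_args=None):
    """Return a spark-submit argument list (hypothetical helper, not part of Spark)."""
    argv = ["spark-submit"]
    # Options are (name, value) pairs so that an option such as conf can repeat.
    for name, value in options or []:
        # Each option becomes two tokens: "--name" and "value".
        argv += ["--" + name, str(value)]
    # The application JAR or Python file follows the options.
    argv.append(application)
    # Anything after the application is passed to the application itself.
    argv += [str(arg) for arg in app_args or []]
    return argv


command = build_spark_submit(
    "/path/to/spark-examples.jar",  # hypothetical path; substitute your own JAR
    options=[
        ("class", "org.apache.spark.examples.SparkPi"),
        ("master", "yarn"),
        ("deploy-mode", "cluster"),
    ],
    app_args=[10],
)
print(shlex.join(command))
```

Keeping the command as a list avoids shell quoting problems if it is later passed to something like subprocess.run; shlex.join is used here only to display it.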

spark-submit Arguments
| Argument | Description |
| --- | --- |
| application jar | Path to a JAR file containing a Spark application. For the client deployment mode, the path must point to a local file. For the cluster deployment mode, the path can be either a local file or a URL globally visible inside your cluster; see Advanced Dependency Management. |
| python file | Path to a Python file containing a Spark application. For the client deployment mode, the path must point to a local file. For the cluster deployment mode, the path can be either a local file or a URL globally visible inside your cluster; see Advanced Dependency Management. |
| application arguments | Arguments to pass to the main method of your application. |

spark-submit Options

You specify spark-submit options using the form --option value instead of --option=value. (Use a space instead of an equals sign.)

| Option | Description |
| --- | --- |
| class | For Java and Scala applications, the fully qualified classname of the class containing the main method of the application. For example, org.apache.spark.examples.SparkPi. |
| conf | Spark configuration property in key=value format. For values that contain spaces, surround "key=value" with quotes (as shown). |
| deploy-mode | Deployment mode: cluster or client. In cluster mode, the driver runs on a worker host. In client mode, the driver runs locally as an external client. Use cluster mode for production jobs; client mode is more appropriate for interactive and debugging uses, where you want to see your application output immediately. To see the effect of the deployment mode when running on YARN, see Deployment Modes. Default: client. |
| driver-class-path | Configuration and classpath entries to pass to the driver. JARs added with --jars are automatically included in the classpath. |
| driver-cores | Number of cores used by the driver in cluster mode. Default: 1. |
| driver-memory | Maximum heap size (represented as a JVM string; for example 1024m, 2g, and so on) to allocate to the driver. Alternatively, you can use the spark.driver.memory property. |
| files | Comma-separated list of files to be placed in the working directory of each executor. For the client deployment mode, the path must point to a local file. For the cluster deployment mode, the path can be either a local file or a URL globally visible inside your cluster; see Advanced Dependency Management. |
| jars | Additional JARs to be loaded in the classpath of drivers and executors in cluster mode or in the executor classpath in client mode. For the client deployment mode, the path must point to a local file. For the cluster deployment mode, the path can be either a local file or a URL globally visible inside your cluster; see Advanced Dependency Management. |
| master | The location to run the application. See Master Values. |
| packages | Comma-separated list of Maven coordinates of JARs to include on the driver and executor classpaths. The local Maven, Maven central, and remote repositories specified in repositories are searched in that order. The format for the coordinates is groupId:artifactId:version. |
| py-files | Comma-separated list of .zip, .egg, or .py files to place on PYTHONPATH. For the client deployment mode, the path must point to a local file. For the cluster deployment mode, the path can be either a local file or a URL globally visible inside your cluster; see Advanced Dependency Management. |
| repositories | Comma-separated list of remote repositories to search for the Maven coordinates specified in packages. |

Master Values

| Master | Description |
| --- | --- |
| local | Run Spark locally with one worker thread (that is, no parallelism). |
| local[K] | Run Spark locally with K worker threads. (Ideally, set this to the number of cores on your host.) |
| local[*] | Run Spark locally with as many worker threads as logical cores on your host. |
| yarn | Run using a YARN cluster manager. The cluster location is determined by HADOOP_CONF_DIR or YARN_CONF_DIR. See Configuring the Environment. |
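
To make the master values concrete, the following Python sketch maps each value listed above to the number of local worker threads it implies. It is an illustration only, not Spark's own parsing logic: the function name local_worker_threads is hypothetical, and any value other than the four forms listed here is rejected.

```python
import os
import re


def local_worker_threads(master):
    """Return the local worker thread count for a master value, or None for yarn.

    Hypothetical helper covering only the master values listed in this section.
    """
    if master == "yarn":
        # On YARN, the cluster manager decides where executors run.
        return None
    if master == "local":
        # One worker thread, that is, no parallelism.
        return 1
    match = re.fullmatch(r"local\[(\*|[0-9]+)\]", master)
    if match is None:
        raise ValueError("unsupported master value: " + master)
    if match.group(1) == "*":
        # As many worker threads as logical cores on this host.
        return os.cpu_count() or 1
    threads = int(match.group(1))
    if threads < 1:
        raise ValueError("K must be at least 1: " + master)
    return threads


for value in ("local", "local[4]", "local[*]", "yarn"):
    print(value, local_worker_threads(value))
```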

Cluster Execution Overview

Spark orchestrates its operations through the driver program. When the driver program is run, the Spark framework initializes executor processes on the cluster hosts; these executors process your data. The following occurs when you submit a Spark application to a cluster:

  1. The driver is launched and invokes the main method in the Spark application.
  2. The driver requests resources from the cluster manager to launch executors.
  3. The cluster manager launches executors on behalf of the driver program.
  4. The driver runs the application. Based on the transformations and actions in the application, the driver sends tasks to executors.
  5. Tasks are run on executors to compute and save results.
  6. If dynamic allocation is enabled, after executors are idle for a specified period, they are released.
  7. When the driver's main method exits or calls SparkContext.stop, the driver terminates any outstanding executors and releases resources from the cluster manager.
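
The sequence above can be pictured with a toy analogy in plain Python. This is not Spark code and does not use any Spark API: a thread pool stands in for the executors, the function run_toy_driver stands in for the driver, and all names are hypothetical. It only shows the shape of the flow: acquire workers, split the data into tasks, run the tasks, collect the results, and release the workers.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(items, num_partitions):
    """Split items into num_partitions contiguous chunks of near-equal size."""
    size, extra = divmod(len(items), num_partitions)
    partitions, start = [], 0
    for index in range(num_partitions):
        end = start + size + (1 if index < extra else 0)
        partitions.append(items[start:end])
        start = end
    return partitions


def upper_case_task(partition):
    """A task: the work applied to one partition (step 5)."""
    return [word.upper() for word in partition]


def run_toy_driver(words, num_executors=2):
    """Toy stand-in for a driver program (steps 1 to 7, greatly simplified)."""
    partitions = split_into_partitions(words, num_executors)
    # Steps 2-3: obtain a pool of workers standing in for executors.
    with ThreadPoolExecutor(max_workers=num_executors) as executors:
        # Step 4: send one task per partition to the workers.
        futures = [executors.submit(upper_case_task, p) for p in partitions]
        # Step 5: wait for the tasks and gather their results in order.
        results = [future.result() for future in futures]
    # Step 7: leaving the with block shuts the pool down and frees the workers.
    return [word for partition in results for word in partition]


result = run_toy_driver(["one", "two", "three"])
print(result)
```

The chunking used here is arbitrary and is not meant to match how Spark assigns elements to partitions.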

The Spark 2 Job Commands

The CDS Powered By Apache Spark parcel used slightly different command names than Spark 1, so that both versions of Spark could coexist on a CDH 5 cluster. The built-in Spark 2 in CDH 6 uses the original command names: pyspark (not pyspark2) and spark-submit (not spark2-submit).

Canary Test for pyspark Command

The following example shows a simple pyspark session that refers to the SparkContext, calls the collect() function, which runs a Spark 2 job, and writes data to HDFS. This sequence of operations helps you check whether there are obvious configuration issues that prevent Spark jobs from working at all. For the HDFS path of the output directory, substitute a path that exists on your own system.

$ hdfs dfs -mkdir /user/systest/spark
$ pyspark
...
SparkSession available as 'spark'.
>>> strings = ["one","two","three"]
>>> s2 = sc.parallelize(strings)
>>> s3 = s2.map(lambda word: word.upper())
>>> s3.collect()
['ONE', 'TWO', 'THREE']
>>> s3.saveAsTextFile('hdfs:///user/systest/spark/canary_test')
>>> quit()
$ hdfs dfs -ls /user/systest/spark
Found 1 items
drwxr-xr-x   - systest supergroup          0 2016-08-26 14:41 /user/systest/spark/canary_test
$ hdfs dfs -ls /user/systest/spark/canary_test
Found 3 items
-rw-r--r--   3 systest supergroup          0 2016-08-26 14:41 /user/systest/spark/canary_test/_SUCCESS
-rw-r--r--   3 systest supergroup          4 2016-08-26 14:41 /user/systest/spark/canary_test/part-00000
-rw-r--r--   3 systest supergroup         10 2016-08-26 14:41 /user/systest/spark/canary_test/part-00001
$ hdfs dfs -cat /user/systest/spark/canary_test/part-00000
ONE
$ hdfs dfs -cat /user/systest/spark/canary_test/part-00001
TWO
THREE

Fetching Spark 2 Maven Dependencies

The Maven coordinates are a combination of groupId, artifactId and version. The groupId and artifactId are the same as for the upstream Apache Spark project. For example, for spark-core, groupId is org.apache.spark, and artifactId is spark-core_2.11, both the same as the upstream project. The version is different for the Cloudera packaging: see CDH 6 Packaging Information for the exact name depending on which release you are using.
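
As a small illustration of the groupId:artifactId:version format, the following Python sketch parses and formats coordinates and joins several of them into the comma-separated form expected by the packages option. The class and function names are hypothetical, and VERSION is a placeholder, not a real release string; take the actual value from CDH 6 Packaging Information.

```python
from typing import NamedTuple


class MavenCoordinate(NamedTuple):
    """One Maven coordinate (hypothetical helper type for illustration)."""
    group_id: str
    artifact_id: str
    version: str

    def __str__(self):
        # Render as groupId:artifactId:version.
        return ":".join(self)


def parse_coordinate(text):
    """Parse a groupId:artifactId:version string into a MavenCoordinate."""
    parts = text.split(":")
    if len(parts) != 3 or not all(parts):
        raise ValueError("expected groupId:artifactId:version, got: " + text)
    return MavenCoordinate(*parts)


# VERSION is a placeholder; the Cloudera version string depends on your release.
spark_core = MavenCoordinate("org.apache.spark", "spark-core_2.11", "VERSION")
spark_sql = parse_coordinate("org.apache.spark:spark-sql_2.11:VERSION")

# Comma-separated list in the form accepted by the packages option.
packages_value = ",".join(str(c) for c in (spark_core, spark_sql))
print(packages_value)
```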

Accessing the Spark 2 History Server

In CDH 6, the Spark 2 history server is available on port 18088, the same port used by the Spark 1 history server in CDH 5. This is a change from port 18089 that was formerly used for the history server with the separate Spark 2 parcel.

If you formerly had both Spark 1.6 and CDS Powered By Apache Spark coexisting on the same cluster, the original CDS Spark 2 service remains on port 18089, but new jobs use the history server of the built-in Spark for CDH 6 on port 18088.
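
The port difference can be captured in a short Python sketch that builds the URL of the history server's REST endpoint for listing applications (/api/v1/applications in upstream Apache Spark). The host name, the service labels, and the function name are hypothetical, and plain http is an assumption; a cluster with TLS enabled would differ. The sketch only constructs the URL and does not contact any server.

```python
from urllib.parse import urlunsplit

# Ports as described in this section; the dictionary keys are hypothetical labels.
HISTORY_SERVER_PORTS = {
    "cdh6-built-in": 18088,   # built-in Spark 2 in CDH 6
    "cds-parcel": 18089,      # separate CDS Powered By Apache Spark parcel
}


def history_server_applications_url(host, service="cdh6-built-in"):
    """Build the applications listing URL for a history server (assumes http)."""
    port = HISTORY_SERVER_PORTS[service]
    return urlunsplit(("http", f"{host}:{port}", "/api/v1/applications", "", ""))


# historyserver.example.com is a placeholder host name.
print(history_server_applications_url("historyserver.example.com"))
print(history_server_applications_url("historyserver.example.com", "cds-parcel"))
```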