This is the documentation for Cloudera 5.4.x. Documentation for other versions is available at Cloudera Documentation.

Configuring and Running Spark

Configuring Spark in Standalone Mode

  Note:

As of CDH 5, Cloudera recommends running Spark applications on YARN rather than in standalone mode. Cloudera does not support running Spark applications on Mesos.

Before you can run Spark in standalone mode, you must do the following on every host in the cluster:
  • Edit the following portion of /etc/spark/conf/spark-env.sh to point to the host where the Spark Master runs:
    ###
    ### === IMPORTANT ===
    ### Change the following to specify a real cluster's Master host
    ###
    export STANDALONE_SPARK_MASTER_HOST=`hostname`
    Replace `hostname` in the last line with the actual hostname of the host where the Spark Master will run.
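Because the same edit has to be made on every host, it can be scripted. The following is a minimal sketch that assumes GNU sed (for `sed -i`). The helper name set_spark_master_host and the host master.example.com are hypothetical placeholders, not part of CDH.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of CDH): point STANDALONE_SPARK_MASTER_HOST in a
# spark-env.sh file at a fixed host instead of `hostname`.
# Assumes GNU sed; the host name must not contain the "|" delimiter used below.
set_spark_master_host() {
  local env_file="$1" master_host="$2"
  sed -i "s|^[[:space:]]*export STANDALONE_SPARK_MASTER_HOST=.*|export STANDALONE_SPARK_MASTER_HOST=${master_host}|" "$env_file"
}

# Usage on each host, as root (master.example.com is a placeholder):
#   set_spark_master_host /etc/spark/conf/spark-env.sh master.example.com
```

Only the STANDALONE_SPARK_MASTER_HOST line is rewritten. All other lines of spark-env.sh are left as they are.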

You can change other elements of the default configuration by modifying /etc/spark/conf/spark-env.sh, including the following settings:

  • SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
  • SPARK_WORKER_CORES, to set the number of cores to use on this machine
  • SPARK_WORKER_MEMORY, to set how much memory to use (for example 1000m, 2g)
  • SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default worker ports
  • SPARK_WORKER_INSTANCES, to set the number of worker processes per node
  • SPARK_WORKER_DIR, to set the working directory of worker processes
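As an illustration, a worker host that should run two worker processes might carry settings like the following in spark-env.sh. The values and the directory are hypothetical examples, not CDH defaults. When SPARK_WORKER_INSTANCES is greater than 1, also set SPARK_WORKER_CORES explicitly. Otherwise each worker process tries to use all the cores on the machine.

```shell
# Illustrative spark-env.sh settings for one worker host (example values only)
export SPARK_WORKER_INSTANCES=2              # run two worker processes on this host
export SPARK_WORKER_CORES=4                  # cores each worker process offers to applications
export SPARK_WORKER_MEMORY=8g                # memory each worker process offers to applications
export SPARK_WORKER_DIR=/var/run/spark/work  # working directory (scratch space and logs)
```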

Configuring the Spark History Server

Before you can run the Spark History Server, you must create the /user/spark/applicationHistory/ directory in HDFS and set ownership and permissions as follows:
$ sudo -u hdfs hadoop fs -mkdir /user/spark
$ sudo -u hdfs hadoop fs -mkdir /user/spark/applicationHistory 
$ sudo -u hdfs hadoop fs -chown -R spark:spark /user/spark
$ sudo -u hdfs hadoop fs -chmod 1777 /user/spark/applicationHistory
On Spark clients (systems from which you intend to launch Spark jobs), do the following:
  1. Create /etc/spark/conf/spark-defaults.conf on the Spark client:
    cp /etc/spark/conf/spark-defaults.conf.template /etc/spark/conf/spark-defaults.conf
  2. Add the following to /etc/spark/conf/spark-defaults.conf:
    spark.eventLog.dir=/user/spark/applicationHistory
    spark.eventLog.enabled=true
This causes Spark applications running on this client to write their history to the directory that the history server reads.
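If you manage several client hosts, you can make step 2 repeatable with a small helper that appends a property only when the file does not already define it. This is a sketch. ensure_spark_default is a hypothetical name, and the helper deliberately leaves existing values untouched rather than overwriting them. Commented-out examples from the template, such as "# spark.eventLog.enabled true", do not count as existing settings.

```shell
#!/usr/bin/env bash
# Hypothetical helper: append "key=value" to a spark-defaults.conf file unless
# an uncommented line already sets that key ("key=value" or "key value").
ensure_spark_default() {
  local conf_file="$1" key="$2" value="$3"
  # Dots in the key act as regex wildcards here, which is harmless for Spark property names.
  if ! grep -q "^${key}[[:space:]=]" "$conf_file" 2>/dev/null; then
    printf '%s=%s\n' "$key" "$value" >> "$conf_file"
  fi
}

# Usage on a client host, as root:
#   ensure_spark_default /etc/spark/conf/spark-defaults.conf spark.eventLog.dir /user/spark/applicationHistory
#   ensure_spark_default /etc/spark/conf/spark-defaults.conf spark.eventLog.enabled true
```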

In addition, if you want the YARN ResourceManager to link directly to the Spark History Server, you can set the spark.yarn.historyServer.address property in /etc/spark/conf/spark-defaults.conf:

spark.yarn.historyServer.address=http://HISTORY_HOST:HISTORY_PORT

For instructions for configuring the History Server to use Kerberos, see Spark Authentication.

Starting, Stopping, and Running Spark in Standalone Mode

This section provides instructions for running Spark in standalone mode; to run Spark applications on YARN, see Running Spark Applications on YARN.

  • To start Spark, proceed as follows:
    • On one node in the cluster, start the Spark Master:
      $ sudo service spark-master start
    • On all the other nodes, start the workers:
      $ sudo service spark-worker start
    • On one node, start the History Server:
      $ sudo service spark-history-server start
  • To stop Spark, use the following commands on the appropriate hosts:
    $ sudo service spark-worker stop
    $ sudo service spark-master stop
    $ sudo service spark-history-server stop
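On a small cluster, you can drive the start and stop sequences from one host over SSH. The sketch below assumes passwordless SSH and passwordless sudo on every host. The host names and function names are hypothetical placeholders. When DRY_RUN is set, the script prints the commands instead of running them. It starts the Master before the workers and stops the workers before the Master, following the order of the commands above.

```shell
#!/usr/bin/env bash
# Hypothetical host layout; replace with your own hosts.
SPARK_MASTER_HOST=master.example.com
SPARK_WORKER_HOSTS="worker1.example.com worker2.example.com"
SPARK_HISTORY_HOST=master.example.com

# Run "sudo service <service> <action>" on a remote host, or only print the
# command when DRY_RUN is set.
remote_service() {
  local host="$1" service="$2" action="$3"
  ${DRY_RUN:+echo} ssh "$host" sudo service "$service" "$action"
}

# Start or stop the standalone cluster: Master first on start, workers first on stop.
spark_cluster() {
  local action="$1" host
  case "$action" in
    start)
      remote_service "$SPARK_MASTER_HOST" spark-master start
      for host in $SPARK_WORKER_HOSTS; do remote_service "$host" spark-worker start; done
      remote_service "$SPARK_HISTORY_HOST" spark-history-server start
      ;;
    stop)
      for host in $SPARK_WORKER_HOSTS; do remote_service "$host" spark-worker stop; done
      remote_service "$SPARK_MASTER_HOST" spark-master stop
      remote_service "$SPARK_HISTORY_HOST" spark-history-server stop
      ;;
    *)
      echo "usage: spark_cluster start|stop" >&2
      return 1
      ;;
  esac
}

# Usage: DRY_RUN=1 spark_cluster start   (preview), then: spark_cluster start
```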

Service logs are stored in /var/log/spark.

The Spark Master web UI is available at http://<master_host>:18080.

Testing the Spark Service

To test the Spark service, start spark-shell on one of the hosts. You can, for example, run a word count application:

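// Read the input file from HDFS
val file = sc.textFile("hdfs://namenode:8020/path/to/input")
// Split lines into words, pair each word with 1, and sum the counts per word.
// Kept on one line because spark-shell evaluates each complete line as it is entered.
val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
// Write the (word, count) pairs back to HDFS
counts.saveAsTextFile("hdfs://namenode:8020/output")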

To see the Spark shell application, its executors, and its logs, open the Spark Master UI, by default at http://spark-master:18080.
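To inspect the result, you can print the output files with `hadoop fs -cat hdfs://namenode:8020/output/part-*`. Each line has the form (word,count). For a small test file, a local pipeline gives counts you can compare against. This is only a rough cross-check. It assumes words are separated by single spaces, with no leading, trailing, or repeated spaces, because Spark's split(" ") handles those cases differently from the tr-based split below. local_word_count is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Rough local cross-check for the spark-shell word count on a small input file.
# Assumes words are separated by single spaces (no leading, trailing, or repeated spaces).
local_word_count() {
  local input="$1"
  # One word per line, count identical words, print "word count" sorted by word.
  tr ' ' '\n' < "$input" | LC_ALL=C sort | uniq -c | awk '{ print $2, $1 }'
}

# Usage:
#   hadoop fs -cat hdfs://namenode:8020/path/to/input > input.txt
#   local_word_count input.txt
```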

You can also use the standard SparkPi examples to test your deployment; see Running Spark Applications.

Running Spark Applications

For instructions for running Spark applications in the YARN Client and Cluster modes, see Running Spark Applications and Running Spark Applications on YARN.

Enabling Fault-Tolerant Processing in Spark Streaming

If the driver node for a Spark Streaming application fails, it can lose data that has been received but not yet processed. To ensure that no data is lost, Spark can write incoming data to HDFS as it is received and use this data to recover state in the event of a failure. This feature, called Spark Streaming recovery, was introduced in CDH 5.3 as a Beta feature; it is not supported for production use in CDH 5.3.

  1. To enable Spark Streaming recovery, set the spark.streaming.receiver.writeAheadLog.enable parameter to true in the SparkConf object used to instantiate the StreamingContext:
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
  2. Next, create a StreamingContext instance using this SparkConf, and specify a checkpoint directory.
  3. Finally, use the getOrCreate method in StreamingContext to either create a new context or recover an old context from the checkpoint directory. The following example shows steps 2 and 3 of this procedure:
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Function to create and set up a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
        val sparkConf = new SparkConf()
        sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
        val ssc = new StreamingContext(sparkConf, ...)   // new context
        val kafkaStream = KafkaUtils.createStream(...)
        // Do some transformations on the stream... and write it out, etc.
        ssc.checkpoint(checkpointDirectory)    // set checkpoint directory
        ssc
    }

    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

    // Start processing; a recovered context resumes from its checkpoint
    context.start()
    context.awaitTermination()
To prevent data loss if a receiver fails, the receivers used must be able to replay data from the original data sources if required.
  • The Kafka receiver will automatically replay if the spark.streaming.receiver.writeAheadLog.enable parameter is set to true.
  • The receiver-less Direct Kafka DStream does not require the spark.streaming.receiver.writeAheadLog.enable parameter, and can function without data loss even without Streaming recovery.
  • Both Flume receivers packaged with Spark also replay data automatically on receiver failure.
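Because `new SparkConf()` also picks up spark.* properties passed to spark-submit, the spark.streaming.receiver.writeAheadLog.enable parameter can alternatively be supplied at submit time instead of being hard-coded. A value set explicitly with sparkConf.set in the application takes precedence over one passed this way. The following is a minimal sketch. The function name, class name, and JAR name are hypothetical, and yarn-cluster is only an example master. Setting DRY_RUN prints the command instead of running it.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper: submit a Spark Streaming application with receiver
# write-ahead logs enabled through spark-submit rather than in application code.
submit_with_wal() {
  local app_class="$1" app_jar="$2"
  shift 2   # any remaining arguments are passed to the application
  ${DRY_RUN:+echo} spark-submit \
    --class "$app_class" \
    --master yarn-cluster \
    --conf spark.streaming.receiver.writeAheadLog.enable=true \
    "$app_jar" "$@"
}

# Usage (com.example.StreamingApp and streaming-app.jar are placeholders):
#   submit_with_wal com.example.StreamingApp streaming-app.jar
```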