Using Spark Streaming

Spark Streaming is an extension of core Spark that enables scalable, high-throughput, fault-tolerant processing of data streams. Spark Streaming receives input data streams and divides the data into batches, which are represented as a continuous series of RDDs called a discretized stream (DStream). DStreams can be created either from sources such as Kafka, Flume, and Kinesis, or by applying operations on other DStreams.

For detailed information on Spark Streaming, see the Spark Streaming Programming Guide in the Apache Spark documentation.

Spark Streaming and Dynamic Allocation

Starting with CDH 5.5, dynamic allocation is enabled by default, which means that executors are removed when idle. Dynamic allocation conflicts with Spark Streaming operations.

In Spark Streaming, data comes in batches, and executors run whenever data is available. If the executor idle timeout is less than the batch duration, executors are constantly added and removed. However, if the executor idle timeout is greater than the batch duration, executors are never removed. Therefore, Cloudera recommends that you disable dynamic allocation by setting spark.dynamicAllocation.enabled to false when running streaming applications.
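
For example, you can disable dynamic allocation for all applications in spark-defaults.conf, or pass the same property on the spark-submit command line as shown in the example later in this topic:

spark.dynamicAllocation.enabled=false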

Spark Streaming Example

This example uses Kafka to deliver a stream of words to a Python word count program.

  1. If you have not already done so, add a Kafka service using the instructions in Adding a Service.
  2. Create a Kafka topic wordcounttopic and pass in your ZooKeeper server:
    kafka-topics --create --zookeeper zookeeper_server:2181 --topic wordcounttopic --partitions 1 --replication-factor 1
  3. Create a Kafka word count Python program adapted from the Spark Streaming example kafka_wordcount.py. This version divides the input stream into batches of 10 seconds and counts the words in each batch:
    from __future__ import print_function
    
    import sys
    
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    
    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
            exit(-1)
    
        sc = SparkContext(appName="PythonStreamingKafkaWordCount")
        ssc = StreamingContext(sc, 10)   # 10-second batch interval

        zkQuorum, topic = sys.argv[1:]
        # Receiver-based Kafka stream of (key, message) pairs from the given topic
        kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
        lines = kvs.map(lambda x: x[1])
        # Split each line into words and count the occurrences of each word in the batch
        counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
        counts.pprint()
    
        ssc.start()
        ssc.awaitTermination()
  4. Submit the application using spark-submit with dynamic allocation disabled and specifying your ZooKeeper server and topic. To run locally, you must specify at least two worker threads: one to receive and one to process data:
    spark-submit --master local[2] --conf "spark.dynamicAllocation.enabled=false" --jars SPARK_HOME/lib/spark-examples.jar kafka_wordcount.py zookeeper_server:2181 wordcounttopic

    In a CDH deployment, SPARK_HOME defaults to /usr/lib/spark in package installations and /opt/cloudera/parcels/CDH/lib/spark in parcel installations. In a Cloudera Manager deployment, the shells are also available from /usr/bin.

    Alternatively, you can run on YARN as follows:

    spark-submit --master yarn --deploy-mode client --conf "spark.dynamicAllocation.enabled=false" --jars SPARK_HOME/lib/spark-examples.jar kafka_wordcount.py zookeeper_server:2181 wordcounttopic
  5. In another window, start a Kafka producer that publishes to wordcounttopic:
    kafka-console-producer --broker-list kafka_broker:9092 --topic wordcounttopic
  6. In the producer window, type the following:
    hello
    hello
    hello
    hello
    hello
    hello
    gb
    gb
    gb
    gb
    gb
    gb

    Depending on how fast you type, you will see output like the following in the Spark Streaming application window:

    -------------------------------------------
    Time: 2016-01-06 14:18:00
    -------------------------------------------
    (u'hello', 6)
    (u'gb', 2)
    
    -------------------------------------------
    Time: 2016-01-06 14:18:10
    -------------------------------------------
    (u'gb', 4)

Enabling Fault-Tolerant Processing in Spark Streaming

For long-running Spark Streaming jobs, make sure to configure the maximum allowed failures in a given time period. For example, to allow 3 failures per hour, set the following parameters (in spark-defaults.conf or when submitting the job):

spark.yarn.maxAppAttempts=3
spark.yarn.am.attemptFailuresValidityInterval=1h
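
For example, a submission that reuses the word count application and arguments from the example above might look like the following:

spark-submit --master yarn --deploy-mode client \
  --conf spark.yarn.maxAppAttempts=3 \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
  kafka_wordcount.py zookeeper_server:2181 wordcounttopic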

If the driver host for a Spark Streaming application fails, it can lose data that has been received but not yet processed. To ensure that no data is lost, you can use Spark Streaming recovery. Recovery uses a combination of a write-ahead log and checkpoints. Spark writes incoming data to HDFS as it is received and uses this data to recover state if a failure occurs.

To enable Spark Streaming recovery:

  1. Set the spark.streaming.receiver.writeAheadLog.enable parameter to true in the SparkConf object.
  2. Create a StreamingContext instance using this SparkConf, and specify a checkpoint directory.
  3. Use the getOrCreate method in StreamingContext to either create a new context or recover from an old context from the checkpoint directory:
    from __future__ import print_function
    
    import sys
    
    from pyspark import SparkContext, SparkConf
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    
    checkpoint = "hdfs://ns1/user/systest/checkpoint"
    
    # Function to create and setup a new StreamingContext
    def functionToCreateContext():
    
      sparkConf = SparkConf()
      sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
      sc = SparkContext(appName="PythonStreamingKafkaWordCount", conf=sparkConf)
      ssc = StreamingContext(sc, 10)
    
      zkQuorum, topic = sys.argv[1:]
      kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
      lines = kvs.map(lambda x: x[1])
      counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
      counts.pprint()
    
      ssc.checkpoint(checkpoint)   # set checkpoint directory
      return ssc
    
    if __name__ == "__main__":
      if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)
    
      ssc = StreamingContext.getOrCreate(checkpoint, lambda: functionToCreateContext())
      ssc.start()
      ssc.awaitTermination()

For more information, see Checkpointing in the Apache Spark documentation.

To prevent data loss if a receiver fails, receivers must be able to replay data from the original data sources if required.

  • The Kafka receiver automatically replays the data if the spark.streaming.receiver.writeAheadLog.enable parameter is set to true.
  • The receiverless Direct Kafka DStream does not require the spark.streaming.receiver.writeAheadLog.enable parameter and can function without data loss, even without Streaming recovery (see the sketch after this list).
  • Both Flume receivers packaged with Spark replay the data automatically on receiver failure.
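
The following is a minimal Python sketch of the receiverless direct approach, adapted from the word count example above; the broker address kafka_broker:9092 is a placeholder for your environment:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 10)   # 10-second batch interval

# Direct (receiverless) stream: reads from the Kafka brokers directly and tracks
# offsets itself, so it does not need a receiver or the write-ahead log.
# "kafka_broker:9092" is a placeholder for your broker address.
kvs = KafkaUtils.createDirectStream(ssc, ["wordcounttopic"], {"metadata.broker.list": "kafka_broker:9092"})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
ssc.awaitTermination()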

For more information, see Spark Streaming + Kafka Integration Guide, Spark Streaming + Flume Integration Guide, and Offset Management For Apache Kafka With Apache Spark Streaming.

Configuring Authentication for Long-Running Spark Streaming Jobs

Long-running applications such as Spark Streaming jobs must be able to write to HDFS, which means that they require HDFS delegation tokens that may need to be renewed beyond the default token lifetime. This workload type requires passing the Kerberos principal and keytab to the spark-submit script using the --principal and --keytab parameters. The keytab is copied to the host running the ApplicationMaster, and the Kerberos login is renewed periodically by using the principal and keytab to generate the delegation tokens required for HDFS.
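
For example, a submission of the word count application with Kerberos authentication might look like the following; the principal and keytab path are placeholders for your environment:

# the principal and keytab values below are placeholders
spark-submit --master yarn --deploy-mode client \
  --principal user@EXAMPLE.COM \
  --keytab /path/to/user.keytab \
  --conf spark.dynamicAllocation.enabled=false \
  kafka_wordcount.py zookeeper_server:2181 wordcounttopic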

Best Practices for Spark Streaming in the Cloud

When using Spark Streaming with a cloud service as the underlying storage layer, use ephemeral HDFS on the cluster to store the checkpoints, instead of a cloud store such as Amazon S3 or Microsoft ADLS.
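
For example, in the recovery program above, keep the checkpoint directory on the cluster's HDFS rather than pointing it at the cloud store; the s3a path shown in the comment is a hypothetical placeholder:

checkpoint = "hdfs://ns1/user/systest/checkpoint"   # ephemeral HDFS on the cluster
# avoid checkpointing directly to the cloud store, for example:
# checkpoint = "s3a://mybucket/checkpoint"
ssc.checkpoint(checkpoint)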

If you have enabled the write-ahead log with S3 (or any file system that does not support flushing), make sure to enable the following settings:

spark.streaming.driver.writeAheadLog.closeFileAfterWrite=true
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite=true