Using Spark Streaming

Spark Streaming is an extension of core Spark that enables scalable, high-throughput, fault-tolerant processing of data streams. Spark Streaming receives input data streams and divides the data into batches. The resulting sequence of batches is represented as a discretized stream, or DStream. DStreams can be created either from sources such as Kafka, Flume, and Kinesis, or by applying operations on other DStreams. Every receiver-based input DStream is associated with a Receiver, which receives the data from a source and stores it in executor memory.
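
As a rough illustration of the batching idea, the following pure-Python sketch groups timestamped records into fixed-length intervals. It does not use Spark. The function name split_into_batches and the sample records are hypothetical, chosen only for this example, and Spark Streaming's actual receivers and scheduler are considerably more involved.

```python
from collections import defaultdict


def split_into_batches(records, batch_duration):
    """Group (timestamp_seconds, value) records into fixed-length batches.

    Hypothetical helper for illustration only; this is not a Spark API.
    Returns a list of (batch_start_time, [values]) sorted by start time.
    Intervals that received no records are omitted.
    """
    batches = defaultdict(list)
    for timestamp, value in records:
        # Every record belongs to the interval that contains its timestamp.
        batch_start = (timestamp // batch_duration) * batch_duration
        batches[batch_start].append(value)
    return sorted(batches.items())


# Made-up records: (arrival time in seconds, word).
records = [(1, "hello"), (4, "hello"), (9, "gb"), (12, "gb"), (25, "hello")]
batches = split_into_batches(records, batch_duration=10)
for start, values in batches:
    print(start, values)
```

With a batch duration of 10 seconds, the five sample records fall into three batches that start at 0, 10, and 20 seconds.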

For detailed information on Spark Streaming, see the Spark Streaming Programming Guide.

Spark Streaming and Dynamic Allocation

Starting with CDH 5.5, dynamic allocation is enabled by default, which means that executors are removed when idle. Dynamic allocation is not effective in Spark Streaming, however, because data arrives in every batch and executors run whenever data is available. If the executor idle timeout is less than the batch duration, executors are constantly added and removed. If the executor idle timeout is greater than the batch duration, executors are never removed. Cloudera therefore recommends that you disable dynamic allocation by setting spark.dynamicAllocation.enabled to false when running streaming applications.
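
The rule described above can be written as a small pure-Python sketch. The function executor_behavior and the sample values are hypothetical and are not part of Spark. The sketch only restates the simplified comparison between the idle timeout and the batch duration, and it ignores how long each batch takes to process.

```python
def executor_behavior(idle_timeout_s, batch_duration_s):
    """Classify executor behavior under dynamic allocation.

    Hypothetical helper that encodes only the simplified rule from the
    text: compare the executor idle timeout with the batch duration.
    """
    if idle_timeout_s < batch_duration_s:
        # Executors go idle between batches and are requested again.
        return "constantly added and removed"
    if idle_timeout_s > batch_duration_s:
        # A new batch always arrives before the idle timeout expires.
        return "never removed"
    # The text does not describe the case where both values are equal.
    return "not covered by the rule"


# Made-up values: idle timeout and batch duration, both in seconds.
print(executor_behavior(5, 10))
print(executor_behavior(60, 10))
```

In neither case does dynamic allocation release resources in a useful way, which is the reasoning behind the recommendation to disable it.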

Spark Streaming Example

This example uses Kafka to deliver a stream of words to a Python word count program.

  1. Install Kafka and create a Kafka service.
  2. Create a Kafka topic wordcounttopic and pass in your ZooKeeper server:
    $ kafka-topics --create --zookeeper zookeeper_server:2181 --topic wordcounttopic \
    --partitions 1 --replication-factor 1
  3. Create a Kafka word count Python program adapted from the Spark Streaming example kafka_wordcount.py. This version divides the input stream into batches of 10 seconds and counts the words in each batch:
    from __future__ import print_function
    
    import sys
    
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    
    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
            sys.exit(-1)
    
        sc = SparkContext(appName="PythonStreamingKafkaWordCount")
        ssc = StreamingContext(sc, 10)
    
        zkQuorum, topic = sys.argv[1:]
        kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
        lines = kvs.map(lambda x: x[1])
        counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
        counts.pprint()
    
        ssc.start()
        ssc.awaitTermination()
  4. Submit the application using spark-submit with dynamic allocation disabled and pass in your ZooKeeper server and topic wordcounttopic. To run locally, you must specify at least two worker threads: one to receive and one to process data.
    $ spark-submit --master local[2] --conf "spark.dynamicAllocation.enabled=false" \
    --jars SPARK_HOME/lib/spark-examples.jar kafka_wordcount.py \
    zookeeper_server:2181 wordcounttopic

    In a CDH deployment, SPARK_HOME defaults to /usr/lib/spark in package installations and /opt/cloudera/parcels/CDH/lib/spark in parcel installations. In a Cloudera Manager deployment, the Spark scripts, including spark-submit, are also available from /usr/bin.

    Alternatively, you can run on YARN as follows:
    $ spark-submit --master yarn --deploy-mode client --conf "spark.dynamicAllocation.enabled=false" \
    --jars SPARK_HOME/lib/spark-examples.jar kafka_wordcount.py \
    zookeeper_server:2181 wordcounttopic
  5. In another window, start a Kafka producer that publishes to wordcounttopic:
    $ kafka-console-producer --broker-list kafka_broker:9092 --topic wordcounttopic
  6. In the producer window, type the following:
    hello
    hello
    hello
    hello
    hello
    hello
    gb
    gb
    gb
    gb
    gb
    gb
    Depending on how fast you type, the Spark Streaming application window shows output similar to the following:
    -------------------------------------------
    Time: 2016-01-06 14:18:00
    -------------------------------------------
    (u'hello', 6)
    (u'gb', 2)
    
    -------------------------------------------
    Time: 2016-01-06 14:18:10
    -------------------------------------------
    (u'gb', 4)
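
To see how the per-batch counts in the sample output arise, the following pure-Python sketch mirrors the flatMap, map, and reduceByKey steps of the word count program without using Spark. The function count_words is hypothetical, and the split of the typed lines across the two batches is an assumption that matches the sample output. The actual split depends on typing speed.

```python
from collections import Counter


def count_words(lines):
    """Count the words in one batch of lines.

    Hypothetical helper that mirrors the flatMap, map, and reduceByKey
    chain of the word count program using plain Python.
    """
    counts = Counter()
    for line in lines:
        for word in line.split(" "):  # flatMap: one line yields many words
            counts[word] += 1         # map to (word, 1), then sum by key
    return dict(counts)


# Assumed split of the twelve typed lines across two 10-second batches.
first_batch = ["hello"] * 6 + ["gb"] * 2
second_batch = ["gb"] * 4

print(count_words(first_batch))
print(count_words(second_batch))
```

Each batch is counted independently, which is why gb appears with a count of 2 in the first batch and 4 in the second rather than a running total of 6.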

Enabling Fault-Tolerant Processing in Spark Streaming

If the driver host for a Spark Streaming application fails, the application can lose data that has been received but not yet processed. To ensure that no data is lost, you can use Spark Streaming recovery. With recovery enabled, Spark writes incoming data to HDFS as it is received and uses this data to recover state if a failure occurs.

To enable Spark Streaming recovery:

  1. Set the spark.streaming.receiver.writeAheadLog.enable parameter to true in the SparkConf object.
  2. Create a StreamingContext instance using this SparkConf, and specify a checkpoint directory.
  3. Use the getOrCreate method in StreamingContext to either create a new context or recover from an old context from the checkpoint directory:
    from __future__ import print_function
    
    import sys
    
    from pyspark import SparkContext, SparkConf
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    
    checkpoint = "hdfs://ns1/user/systest/checkpoint"
    
    # Function to create and set up a new StreamingContext
    def functionToCreateContext():
    
      sparkConf = SparkConf()
      sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
      sc = SparkContext(appName="PythonStreamingKafkaWordCount", conf=sparkConf)
      ssc = StreamingContext(sc, 10)
    
      zkQuorum, topic = sys.argv[1:]
      kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
      lines = kvs.map(lambda x: x[1])
      counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
      counts.pprint()
    
      ssc.checkpoint(checkpoint)   # set checkpoint directory
      return ssc
    
    if __name__ == "__main__":
      if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        sys.exit(-1)
    
      # Recover the context from the checkpoint if one exists; otherwise create it
      ssc = StreamingContext.getOrCreate(checkpoint, functionToCreateContext)
      ssc.start()
      ssc.awaitTermination()
    
    

For more information, see Checkpointing.
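
The create-or-recover pattern behind getOrCreate can be illustrated with a simplified pure-Python stand-in. This is a sketch of the pattern only, not Spark's implementation: the function get_or_create, the state.json file name, and the state dictionary are all hypothetical, and a temporary local directory stands in for the HDFS checkpoint directory.

```python
import json
import os
import tempfile


def get_or_create(checkpoint_dir, create_fn):
    """Return (state, recovered), reusing saved state if it exists.

    Hypothetical stand-in for the create-or-recover pattern. It stores
    a small JSON file where Spark would store checkpoint data.
    """
    state_file = os.path.join(checkpoint_dir, "state.json")
    if os.path.exists(state_file):
        # A checkpoint exists: recover it and skip the setup function.
        with open(state_file) as f:
            return json.load(f), True
    # No checkpoint yet: build fresh state and save it for later restarts.
    state = create_fn()
    with open(state_file, "w") as f:
        json.dump(state, f)
    return state, False


# A temporary local directory stands in for the checkpoint directory.
checkpoint_dir = tempfile.mkdtemp()
first = get_or_create(checkpoint_dir, lambda: {"batch_interval": 10})
second = get_or_create(checkpoint_dir, lambda: {"batch_interval": 99})
print(first)
print(second)
```

The second call returns the state saved by the first call and never runs its own setup function. Spark behaves the same way in this respect: when a context is recovered from a checkpoint, the setup function is not called, so changes made to that function take effect only with a fresh checkpoint directory.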

To prevent data loss if a receiver fails, receivers must be able to replay data from the original data sources if required.

  • The Kafka receiver automatically replays data if the spark.streaming.receiver.writeAheadLog.enable parameter is set to true.
  • The receiverless Direct Kafka DStream does not require the spark.streaming.receiver.writeAheadLog.enable parameter and can function without data loss, even without Streaming recovery.
  • Both Flume receivers packaged with Spark replay the data automatically on receiver failure.
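
For the receiverless approach, a minimal sketch of the word count built on a direct Kafka stream might look like the following. It assumes that the installed Spark version provides KafkaUtils.createDirectStream in pyspark.streaming.kafka and that the Kafka integration JAR is on the classpath. The helper names build_kafka_params and create_direct_word_counts are hypothetical, and the sketch has not been run against a cluster.

```python
def build_kafka_params(broker_list):
    """Return the Kafka parameters for a direct stream.

    The direct approach connects to the Kafka brokers themselves
    rather than to ZooKeeper. Hypothetical helper for this example.
    """
    return {"metadata.broker.list": broker_list}


def create_direct_word_counts(ssc, topic, broker_list):
    """Build a word count DStream on a receiverless direct Kafka stream."""
    # Imported here so that this module loads even where the Kafka
    # integration package is not on the Python path.
    from pyspark.streaming.kafka import KafkaUtils

    kvs = KafkaUtils.createDirectStream(
        ssc, [topic], build_kafka_params(broker_list))
    lines = kvs.map(lambda x: x[1])  # keep the message value, drop the key
    return (lines.flatMap(lambda line: line.split(" "))
                 .map(lambda word: (word, 1))
                 .reduceByKey(lambda a, b: a + b))
```

In an application, create_direct_word_counts(ssc, "wordcounttopic", "kafka_broker:9092") would take the place of the createStream call, and the broker list replaces the ZooKeeper quorum argument.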

For more information, see the Spark Streaming + Kafka Integration Guide and the Spark Streaming + Flume Integration Guide.

Configuring Authentication for Long-Running Spark Streaming Jobs

If you are using authenticated Spark communication, you must perform additional configuration steps for long-running Spark Streaming jobs. See Configuring Spark on YARN for Long-Running Applications.

Best Practices for Spark Streaming in the Cloud

When using Spark Streaming with a cloud service as the underlying storage layer, store the checkpoints in ephemeral HDFS on the cluster instead of in a cloud store such as Amazon S3 or Microsoft ADLS.
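
One way to guard against an accidental cloud checkpoint location is to check the URI scheme before passing the directory to ssc.checkpoint. The following is a minimal sketch: the function is_cloud_store and the scheme list are assumptions for illustration, the bucket name is made up, and the list should be adjusted to the connectors used in your deployment.

```python
# Assumed URI schemes for Amazon S3 and Microsoft ADLS connectors.
CLOUD_SCHEMES = {"s3", "s3a", "s3n", "adl"}


def is_cloud_store(uri):
    """Return True if the URI scheme is in the assumed cloud scheme list.

    Hypothetical helper. Paths without a scheme are treated as
    non-cloud paths.
    """
    if "://" not in uri:
        return False
    scheme = uri.split("://", 1)[0].lower()
    return scheme in CLOUD_SCHEMES


print(is_cloud_store("hdfs://ns1/user/systest/checkpoint"))
print(is_cloud_store("s3a://example-bucket/checkpoint"))
```

An application could call this check at startup and refuse to start, or log a warning, when the checkpoint directory points at a cloud store.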