Developing and Running a Spark WordCount Application

This tutorial describes how to write, compile, and run a simple Spark word count application in two of the languages supported by Spark: Scala and Python. The Scala code was originally developed for a Cloudera tutorial written by Sandy Ryza.

Writing the Application

The example application is an enhanced version of WordCount, the canonical MapReduce example. In this version of WordCount, the goal is to learn the distribution of letters in the most popular words in a corpus. The application:

  1. Creates a SparkConf and SparkContext. A Spark application corresponds to an instance of the SparkContext class. When running a shell, the SparkContext is created for you.
  2. Gets a word frequency threshold.
  3. Reads an input set of text documents.
  4. Counts the number of times each word appears.
  5. Filters out all words that appear fewer times than the threshold.
  6. For the remaining words, counts the number of times each letter occurs.
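
Steps 2 through 6 can be sketched without Spark to make the data flow concrete. The following is a minimal single-machine illustration in plain Python, not the tutorial's code; the function name letter_distribution and the sample documents are hypothetical.

```python
from collections import Counter


def letter_distribution(documents, threshold):
    """Count letters across the words that occur at least `threshold` times.

    `documents` is an iterable of strings, one document per string
    (hypothetical input; the Spark versions read these from a text file).
    """
    # Steps 3-4: split each document into words and count each word,
    # skipping empty tokens, which contribute no letters anyway
    word_counts = Counter(
        word for doc in documents for word in doc.split(" ") if word
    )
    # Step 5: keep only the words that meet the threshold
    popular = [word for word, n in word_counts.items() if n >= threshold]
    # Step 6: count each letter once per surviving word
    return Counter(ch for word in popular for ch in word)


if __name__ == "__main__":
    docs = ["the cat sat", "the dog sat", "a cat ran"]
    # prints [('a', 2), ('c', 1), ('e', 1), ('h', 1), ('s', 1), ('t', 3)]
    print(sorted(letter_distribution(docs, 2).items()))
```

In the sample documents only "the", "cat", and "sat" reach the threshold. Each surviving word contributes its letters once, regardless of how often the word occurred, which mirrors the flatMap over the filtered words in the Spark versions.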

In MapReduce, this task requires two applications, with the intermediate data persisted to HDFS between them. In Spark, the same task requires about 90 percent fewer lines of code than one developed using the MapReduce API.

Here are two versions of the program:

Scala WordCount

package com.cloudera.sparkwordcount

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkWordCount {
  def main(args: Array[String]): Unit = {
    // create Spark context with Spark configuration
    val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))

    // get threshold
    val threshold = args(1).toInt

    // read in text file and split each document into words
    val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))

    // count the occurrence of each word
    val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)

    // filter out words with fewer than threshold occurrences
    val filtered = wordCounts.filter(_._2 >= threshold)

    // count characters
    val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)

    System.out.println(charCounts.collect().mkString(", "))
  }
}

Python WordCount

import sys

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":

  # create Spark context with Spark configuration
  conf = SparkConf().setAppName("Spark Count")
  sc = SparkContext(conf=conf)

  # get threshold
  threshold = int(sys.argv[2])

  # read in text file and split each document into words
  tokenized = sc.textFile(sys.argv[1]).flatMap(lambda line: line.split(" "))

  # count the occurrence of each word
  wordCounts = tokenized.map(lambda word: (word, 1)).reduceByKey(lambda v1, v2: v1 + v2)

  # filter out words with fewer than threshold occurrences
  filtered = wordCounts.filter(lambda pair: pair[1] >= threshold)

  # count characters
  charCounts = filtered.flatMap(lambda pair: pair[0]).map(lambda c: (c, 1)).reduceByKey(lambda v1, v2: v1 + v2)

  # collect the results to the driver and print them without the enclosing brackets
  counts = charCounts.collect()
  print(repr(counts)[1:-1])

Compiling and Packaging Scala Applications

The tutorial uses Maven to compile and package the programs. Excerpts of the tutorial pom.xml are included below. For best practices using Maven to build Spark applications, see Building Spark Applications.

To compile Scala, include the Scala tools plug-in:

<plugin>
  <groupId>org.scala-tools</groupId>
  <artifactId>maven-scala-plugin</artifactId>
  <executions>
    <execution>
      <goals>
        <goal>compile</goal>
        <goal>testCompile</goal>
      </goals>
    </execution>
  </executions>
</plugin>

This plug-in requires the scala-tools plug-in repository:

<pluginRepositories>
  <pluginRepository>
    <id>scala-tools.org</id>
    <name>Scala-tools Maven2 Repository</name>
    <url>http://scala-tools.org/repo-releases</url>
  </pluginRepository>
</pluginRepositories>

Also, include Scala and Spark as dependencies:

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.12</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0-cdh6.0.0-beta1</version>
    <scope>provided</scope>
  </dependency>
</dependencies>

To generate the application JAR, run:

$ mvn package

This command creates sparkwordcount-1.0-SNAPSHOT-jar-with-dependencies.jar in the target directory.

Running the Application

  1. The input to the application is a large text file in which each line contains all the words in a document, stripped of punctuation. Put an input file in a directory on HDFS. You can use the tutorial example input file:
    $ wget --no-check-certificate .../inputfile.txt
    $ hdfs dfs -put inputfile.txt
  2. Run one of the applications using spark-submit:
    • Scala - Run in a local process with threshold 2:
      $ spark-submit --class com.cloudera.sparkwordcount.SparkWordCount \
        --master local --deploy-mode client --executor-memory 1g \
        --name wordcount --conf "spark.app.id=wordcount" \
        sparkwordcount-1.0-SNAPSHOT-jar-with-dependencies.jar \
        hdfs://namenode_host:8020/path/to/inputfile.txt 2

      If you use the example input file, the output should look something like:

      (e,6), (p,2), (a,4), (t,2), (i,1), (b,1), (u,1), (h,1), (o,2), (n,4), (f,1), (v,1), (r,2), (l,1), (c,1)
    • Python - Run on YARN with threshold 2:
      $ spark-submit --master yarn --deploy-mode client --executor-memory 1g \
        --name wordcount --conf "spark.app.id=wordcount" wordcount.py \
        hdfs://namenode_host:8020/path/to/inputfile.txt 2

      In this case, the output should look something like:

      (u'a', 4), (u'c', 1), (u'e', 6), (u'i', 1), (u'o', 2), (u'u', 1), (u'b', 1), (u'f', 1), (u'h', 1), (u'l', 1), (u'n', 4), (u'p', 2), (u'r', 2), (u't', 2), (u'v', 1)
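
The two sample outputs list the same pairs in a different order; the order in which collect() returns the reduced pairs should not be relied on. As a minimal sketch, assuming each run's output has been captured as a string, the hypothetical helper parse_counts below turns either format into a dictionary so that two runs can be compared.

```python
import re


def parse_counts(output):
    """Parse "(e,6), (p,2)" or "(u'a', 4), (u'c', 1)" into {letter: count}."""
    # Accept an optional u prefix and optional quotes around one character
    pairs = re.findall(r"\(u?'?(.)'?,\s*(\d+)\)", output)
    return {letter: int(count) for letter, count in pairs}


if __name__ == "__main__":
    scala_out = ("(e,6), (p,2), (a,4), (t,2), (i,1), (b,1), (u,1), (h,1), "
                 "(o,2), (n,4), (f,1), (v,1), (r,2), (l,1), (c,1)")
    python_out = ("(u'a', 4), (u'c', 1), (u'e', 6), (u'i', 1), (u'o', 2), "
                  "(u'u', 1), (u'b', 1), (u'f', 1), (u'h', 1), (u'l', 1), "
                  "(u'n', 4), (u'p', 2), (u'r', 2), (u't', 2), (u'v', 1)")
    # prints True: both runs produced the same letter counts
    print(parse_counts(scala_out) == parse_counts(python_out))
```

Comparing dictionaries ignores ordering. Sorting the collected list before printing would be another way to make the output stable between runs.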