Using Spark 2 from Scala

This topic describes how to set up a Scala project for Cloudera Distribution of Apache Spark 2 along with a few associated tasks. Cloudera Data Science Workbench provides an interface to the Spark 2 shell (v 2.0+) that works with Scala 2.11.

Setting Up a Scala Project

  1. Open Cloudera Data Science Workbench.
  2. Select New Project.
  3. Enter a Project Name.
  4. Choose whether the Project Visibility is Private or Public.
  5. Under Initial Setup, choose the Template tab.
  6. Select Scala.
  7. Click Create Project. Your new project displays sample files.
  8. Click Open Workbench.
  9. Select Scala engine.
  10. Click Launch Session.

The Scala engine typically takes 30 seconds to become ready. This increased startup time is due to the Scala engine automatically creating a Spark context in Apache YARN.

The examples that follow are included under the Scala templates.

Getting Started and Accessing Spark 2

Unlike PySpark or Sparklyr, you can access a SparkContext assigned to the spark (SparkSession) and sc (SparkContext) objects on console startup, just as when using the Spark shell. By default, the application name will be set to CDSW_sessionID, where sessionId is the id of the session running your Spark code. To customize this, set the spark.app.name property to the desired application name in a spark-defaults.conf file.

Pi.scala is a classic starting point for calculating Pi using Montecarlo Estimation.

This is the full, annotated code sample.

// Calculate pi with Monte Carlo estimation
import scala.math.random

// Make a very large unique set of 1 -> n 
val partitions = 2 
val n = math.min(100000L * partitions, Int.MaxValue).toInt 
val xs = 1 until n 

// Split n into the number of partitions we can use 
val rdd = sc.parallelize(xs, partitions).setName("'N values rdd'")

// Generate a random set of points within a 2x2 square
val sample = rdd.map { i =>
  val x = random * 2 - 1
  val y = random * 2 - 1
  (x, y)
}.setName("'Random points rdd'")

// points w/in the square also w/in the center circle of r=1
val inside = sample
    .filter { case (x, y) => (x * x + y * y < 1) }
    .setName("'Random points inside circle'")

val count = inside.count()
 
// Area(circle)/Area(square) = inside/n => pi=4*inside/n                        
println("Pi is roughly " + 4.0 * count / n)

Key points to note:

  • import scala.math.random

    Importing included packages works just as in the shell, and need only be done once.

  • Spark context (sc).
    You can access a SparkContext assigned to the variable sc on console startup.
    val rdd = sc.parallelize(xs, partitions).setName("'N values rdd'")

Example: Reading Data from HDFS (Wordcount)

Since HDFS is the configured filesystem, Spark jobs on Cloudera Data Science Workbench read from HDFS by default.

The file you use must already exist in HDFS. For example, you might load the jedi_wisdom.txt file using the terminal. Click the Terminal link above your Cloudera Data Science Workbench console and enter the following command:

hdfs dfs -put data/jedi_wisdom.txt /tmp

This example reads data from HDFS.

wordcount.scala:
//count lower bound 
val threshold = 2

// this file must already exist in hdfs, add a 
// local version by dropping into the terminal.
val tokenized = sc.textFile("/tmp/data/jedi_wisdom.txt").flatMap(_.split(" "))

// count the occurrence of each word
val wordCounts = tokenized.map((_ , 1)).reduceByKey(_ + _)

// filter out words with fewer than threshold occurrences
val filtered = wordCounts.filter(_._2 >= threshold)

System.out.println(filtered.collect().mkString(","))
Click Run.

Key points to note:

  • You add files to HDFS by opening the terminal and running the following command:
    hdfs dfs -put data/jedi_wisdom.txt /tmp
  • You access the file from Wordcount scala using the following command:
    sc.textFile("tmp/jedi_wisdom.txt")

Example: Read Files from the Cluster Local Filesystem

Use the following command in the terminal to read text from the local filesystem. The file must exist on all nodes, and the same path for the driver and executors. In this example you are reading the file ebay-xbox.csv.

sc.textFile(“file:///tmp/ebay-xbox.csv”) 

Example: Using External Packages by Adding Jars or Dependencies

External libraries are handled through line magics. Line magics in the Toree kernel are prefixed with %.

Adding Remote Packages

You can use Apache Toree's AddDeps magic to add dependencies from Maven central. You must specify the company name, artifact ID, and version. To resolve any transitive dependencies, you must explicitly specify the --transitive flag.

%AddDeps org.scalaj scalaj-http_2.11 2.3.0
import scalaj.http._ 
val response: HttpResponse[String] = Http("http://www.omdbapi.com/").param("t","crimson tide").asString
response.body
response.code
response.headers
response.cookies

Adding Remote or Local JARs

You can use the AddJars magic to distribute local or remote JARs to the kernel and the cluster. Using the -f option ignores cached JARs and reloads.

%AddJar http://example.com/some_lib.jar -f 
%AddJar file:/path/to/some/lib.jar