Using Spark 2 from Scala

This topic describes how to set up a Scala project for CDS 2.x Powered by Apache Spark along with a few associated tasks. Cloudera Data Science Workbench provides an interface to the Spark 2 shell (v 2.0+) that works with Scala 2.11.

Accessing Spark 2 from the Scala Engine

Unlike PySpark or Sparklyr, you can access a SparkContext assigned to the spark (SparkSession) and sc (SparkContext) objects on console startup, just as when using the Spark shell. By default, the application name will be set to CDSW_sessionID, where sessionId is the id of the session running your Spark code. To customize this, set the spark.app.name property to the desired application name in a spark-defaults.conf file.

Pi.scala is a classic starting point for calculating Pi using Montecarlo Estimation.

This is the full, annotated code sample.

//Calculate pi with Monte Carlo estimation
import scala.math.random

//make a very large unique set of 1 -> n 
val partitions = 2 
val n = math.min(100000L * partitions, Int.MaxValue).toInt 
val xs = 1 until n 

//split up n into the number of partitions we can use 
val rdd = sc.parallelize(xs, partitions).setName("'N values rdd'")

//generate a random set of points within a 2x2 square
val sample = rdd.map { i =>
  val x = random * 2 - 1
  val y = random * 2 - 1
  (x, y)
}.setName("'Random points rdd'")

//points w/in the square also w/in the center circle of r=1
val inside = sample.filter { case (x, y) => (x * x + y * y < 1) }.setName("'Random points inside circle'")
val count = inside.count()
 
//Area(circle)/Area(square) = inside/n => pi=4*inside/n                        
println("Pi is roughly " + 4.0 * count / n)

Key points to note:

  • import scala.math.random

    Importing included packages works just as in the shell, and need only be done once.

  • Spark context (sc).
    You can access a SparkContext assigned to the variable sc on console startup.
    val rdd = sc.parallelize(xs, partitions).setName("'N values rdd'")

Example: Read Files from the Cluster Local Filesystem

Use the following command in the terminal to read text from the local filesystem. The file must exist on all nodes, and the same path for the driver and executors. In this example you are reading the file ebay-xbox.csv.

sc.textFile(“file:///tmp/ebay-xbox.csv”) 

Example: Using External Packages by Adding Jars or Dependencies

External libraries are handled through line magics. Line magics in the Toree kernel are prefixed with %.

Adding Remote Packages

You can use Apache Toree's AddDeps magic to add dependencies from Maven central. You must specify the company name, artifact ID, and version. To resolve any transitive dependencies, you must explicitly specify the --transitive flag.

%AddDeps org.scalaj scalaj-http_2.11 2.3.0
import scalaj.http._ 
val response: HttpResponse[String] = Http("http://www.omdbapi.com/").param("t","crimson tide").asString
response.body
response.code
response.headers
response.cookies

Adding Remote or Local JARs

You can use the AddJars magic to distribute local or remote JARs to the kernel and the cluster. Using the -f option ignores cached JARs and reloads.

%AddJar http://example.com/some_lib.jar -f 
%AddJar file:/path/to/some/lib.jar