Using Spark 2 from Python

Setting Up Your PySpark Environment

  1. Open Cloudera Data Science Workbench.
  2. Click New Project.
  3. Enter a Project Name.
  4. Choose whether the Project Visibility is Private or Public.
  5. Under Initial Setup, choose the Template tab.
  6. From the pop-up menu, select PySpark.
  7. Click Create Project. Your new project displays sample files.
  8. Click Open Workbench.
  9. Select the Python 2 engine.
  10. Click Launch Session.

Example: Monte Carlo Estimation

Within the template PySpark project, pi.py is a classic example that estimates pi using the Monte Carlo method.

What follows is the full, annotated code sample that can be saved to the pi.py file.

# # Estimating $\pi$
#
# This PySpark example shows you how to estimate $\pi$ in parallel
# using Monte Carlo integration.

from __future__ import print_function
import sys
from random import random
from operator import add
# Connect to Spark by creating a Spark session
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("PythonPi")\
    .getOrCreate()

partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
n = 100000 * partitions

def f(_):
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 < 1 else 0

# To access the associated SparkContext
count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))

spark.stop()
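
To see what the map and reduce steps compute without starting a Spark session, the same estimate can be sketched in plain Python. This is a minimal local sketch, not part of the template project: the names inside_unit_circle and estimate_pi and the fixed seed are assumptions introduced for illustration. The fraction of points that land inside the unit circle approximates pi/4, because the circle has area pi and the enclosing square has area 4.

```python
import random


def inside_unit_circle(rng):
    """Draw one point uniformly from the square [-1, 1) x [-1, 1).

    Returns 1 if the point lies inside the unit circle, otherwise 0.
    """
    x = rng.random() * 2 - 1
    y = rng.random() * 2 - 1
    return 1 if x ** 2 + y ** 2 < 1 else 0


def estimate_pi(n, seed=0):
    """Estimate pi from n random points (hypothetical helper, runs locally)."""
    rng = random.Random(seed)  # fixed seed so repeated runs give the same estimate
    count = sum(inside_unit_circle(rng) for _ in range(n))
    return 4.0 * count / n


print("Pi is roughly %f" % estimate_pi(200000))
```

In pi.py, Spark performs the same per-point test in map(f) across partitions and adds up the results with reduce(add).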

Example: Reading Data from HDFS (Wordcount)

In this PySpark example, Cloudera Data Science Workbench reads in data from the default configured filesystem, HDFS.

wordcount.py
# Word count
# 
# This example shows how to count the occurrences of each word in a text file.
  
from __future__ import print_function
import sys, re
from operator import add
from pyspark.sql import SparkSession
  
spark = SparkSession\
  .builder\
  .appName("PythonWordCount")\
  .getOrCreate()
  
# The file you use as input must already exist in HDFS.
# Add the data file to HDFS.
!hdfs dfs -put resources/cgroup-v2.txt /tmp

# Access the file from wordcount.py  
lines = spark.read.text("/tmp/cgroup-v2.txt").rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')) \
  .map(lambda x: (x, 1)) \
  .reduceByKey(add) \
  .sortBy(lambda x: x[1], False)
output = counts.collect()
for (word, count) in output:
  print("%s: %i" % (word, count))

spark.stop()
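
The flatMap, map, reduceByKey, and sortBy chain above can be hard to picture on first read. The following is a minimal local sketch of the same computation in plain Python, assuming a small in-memory list of lines in place of the HDFS file; the function name count_words and the sample lines are hypothetical. Because the text is split on a single space, as in the Spark example, consecutive spaces produce empty strings, and those are counted as words too.

```python
from collections import Counter


def count_words(lines):
    """Return (word, count) pairs sorted by descending count.

    Mirrors flatMap(split) -> map((word, 1)) -> reduceByKey(add)
    -> sortBy(count, descending) from the Spark example.
    """
    counts = Counter()
    for line in lines:
        for word in line.split(' '):  # same delimiter as the Spark example
            counts[word] += 1
    return sorted(counts.items(), key=lambda pair: pair[1], reverse=True)


# Hypothetical input; the last line contains two consecutive spaces.
sample_lines = ["the quick brown fox", "the lazy dog", "the  end"]
for word, count in count_words(sample_lines):
    print("%s: %i" % (word, count))
```

If empty tokens are unwanted, calling split() with no argument splits on any run of whitespace and drops them.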
 

Example: Locating and Adding JARs to Spark 2 Configuration

This example shows how to discover the location of JAR files installed with Spark 2, and add them to the Spark 2 configuration.

# # Using Avro data
# 
# This example shows how to use a JAR file from the local filesystem with
# Spark on YARN.

from __future__ import print_function
import os,sys
import os.path
from functools import reduce
from pyspark.sql import SparkSession
from pyspark.files import SparkFiles

# Add the data file to HDFS for consumption by the Spark executors.
!hdfs dfs -put resources/users.avro /tmp

# Find the example JARs provided by the Spark parcel. This parcel
# is available on both the driver, which runs in Cloudera Data Science Workbench,
# and the executors, which run on YARN.
exampleDir = os.path.join(os.environ["SPARK_HOME"], "examples/jars")
exampleJars = [os.path.join(exampleDir, x) for x in os.listdir(exampleDir)]

# Add the Spark JARs to the Spark configuration to make them available for use.
spark = SparkSession\
    .builder\
    .config("spark.jars", ",".join(exampleJars))\
    .appName("AvroKeyInputFormat")\
    .getOrCreate()
sc = spark.sparkContext

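# Read the Avro schema and pass it to the input format through the Hadoop configuration.
with open("resources/user.avsc") as schema_file:
    schema = schema_file.read()
conf = {"avro.schema.input.key": schema}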

avro_rdd = sc.newAPIHadoopFile(
    "/tmp/users.avro", # This is an HDFS path!
    "org.apache.avro.mapreduce.AvroKeyInputFormat",
    "org.apache.avro.mapred.AvroKey",
    "org.apache.hadoop.io.NullWritable",
    keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
    conf=conf)
output = avro_rdd.map(lambda x: x[0]).collect()
for k in output:
    print(k)
spark.stop()
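
Note that os.listdir returns every entry in the directory, not only JAR files. If the examples directory on your cluster contains other files, a filter along the lines of the following sketch may be useful. The helper name jars_in and the temporary directory used in the demonstration are assumptions for illustration; the returned string is in the comma-separated form that the spark.jars property takes.

```python
import os
import tempfile


def jars_in(directory):
    """Return a comma-separated, sorted list of the .jar files in directory."""
    paths = [
        os.path.join(directory, name)
        for name in sorted(os.listdir(directory))
        if name.endswith(".jar")
    ]
    return ",".join(paths)


# Demonstration against a throwaway directory (hypothetical file names).
demo_dir = tempfile.mkdtemp()
for name in ("b-examples.jar", "a-examples.jar", "README.txt"):
    open(os.path.join(demo_dir, name), "w").close()

print(jars_in(demo_dir))
```

In the example above, jars_in(exampleDir) could be passed to .config("spark.jars", ...) in place of ",".join(exampleJars).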

Example: Distributing Dependencies on a PySpark Cluster

Although Python is a popular choice for data scientists, it is not straightforward to make a Python library available on a distributed PySpark cluster. To determine which dependencies are required on the cluster, keep in mind that Spark application code runs in Spark executor processes distributed throughout the cluster. If your Python code uses any third-party libraries, the executors need access to those libraries on the remote hosts where they run.
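
Before packaging anything, it can help to confirm which modules a given Python interpreter can actually import. The sketch below is one way to check; the function name missing_modules and the made-up module name in the demonstration are assumptions for illustration. Run as shown, it reports on the interpreter that executes it, which in a workbench session is the driver.

```python
import importlib


def missing_modules(names):
    """Return the subset of names that this interpreter cannot import."""
    missing = []
    for name in names:
        try:
            importlib.import_module(name)
        except ImportError:
            missing.append(name)
    return missing


# 'no_such_module_for_demo' is a made-up name used to show a failing import.
print(missing_modules(["os", "json", "no_such_module_for_demo"]))
```

To run the same check on the executors instead, the function could be called inside a task, for example spark.sparkContext.parallelize(range(2), 2).map(lambda _: missing_modules(["nltk"])).collect(), assuming an active SparkSession named spark.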

This example demonstrates one way to run the following Python script (nltk_sample.py), which uses a pure Python library (nltk), on a distributed PySpark cluster.

nltk_sample.py

# This code uses NLTK, a Python natural language processing library.
# NLTK is not installed with conda by default. 
# You can use 'import' within your Python UDFs, to use Python libraries.
# The goal here is to distribute NLTK with the conda environment.

import os
import sys
from pyspark.sql import SparkSession
 
spark = SparkSession.builder \
          .appName("spark-nltk") \
          .getOrCreate()
 
data = spark.sparkContext.textFile('1970-Nixon.txt')
 
def word_tokenize(x):
    import nltk
    return nltk.word_tokenize(x)
 
def pos_tag(x):
    import nltk
    return nltk.pos_tag([x])
 
words = data.flatMap(word_tokenize)
words.saveAsTextFile('nixon_tokens')
 
pos_word = words.map(pos_tag)
pos_word.saveAsTextFile('nixon_token_pos')

  1. Package the Python environment with conda.
    conda create -n nltk_env --copy -y -q python=2.7.11 nltk numpy
    The --copy option installs packages as full copies instead of links, so the conda environment is self-contained. To add pip packages that are not installed through conda, run pip install and then copy the installed packages into the environment manually. In Cloudera Data Science Workbench, pip installs packages into ~/.local.
    pip install some-awesome-package
    cp -r ~/.local/lib ~/.conda/envs/nltk_env/
    Zip the conda environment so that it can be shipped to the PySpark cluster.
    cd ~/.conda/envs
    zip -r ../../nltk_env.zip nltk_env
  2. (Specific to NLTK) For this example, you can use NLTK data as input.
    cd ~/
    source activate nltk_env
    
    # download nltk data
    (nltk_env)$ python -m nltk.downloader -d nltk_data all
    (nltk_env)$ hdfs dfs -put nltk_data/corpora/state_union/1970-Nixon.txt ./
      
    # archive nltk data for distribution
    cd ~/nltk_data/tokenizers/
    zip -r ../../tokenizers.zip *
    cd ~/nltk_data/taggers/
    zip -r ../../taggers.zip *
  3. Set spark-submit options in spark-defaults.conf.
    spark.yarn.appMasterEnv.PYSPARK_PYTHON=./NLTK/nltk_env/bin/python
    spark.yarn.appMasterEnv.NLTK_DATA=./
    spark.executorEnv.NLTK_DATA=./
    spark.yarn.dist.archives=nltk_env.zip#NLTK,tokenizers.zip#tokenizers,taggers.zip#taggers

    With these settings, nltk_env.zip is unpacked into the NLTK directory on the cluster. NLTK_DATA is the environment variable that tells NLTK where to find its data.

  4. Set the PYSPARK_PYTHON environment variable in Cloudera Data Science Workbench. To set this, go to the project page and click Settings > Engine > Environment Variables. Set PYSPARK_PYTHON to ./NLTK/nltk_env/bin/python and click Save Environment.
  5. Restart your project session and run the nltk_sample.py script in the workbench. You can test whether the script ran successfully using the following commands:
    !hdfs dfs -cat ./nixon_tokens/* | head -n 20
    Annual
    Message
    to
    the
    Congress
    on
    the
    State
    of
    the
    Union
    .
    January
    22
    ,
    1970
    Mr.
    Speaker
    ,
    Mr.
      
    !hdfs dfs -cat ./nixon_token_pos/* | head -n 20
    [(u'Annual', 'JJ')]
    [(u'Message', 'NN')]
    [(u'to', 'TO')]
    [(u'the', 'DT')]
    [(u'Congress', 'NNP')]
    [(u'on', 'IN')]
    [(u'the', 'DT')]
    [(u'State', 'NNP')]
    [(u'of', 'IN')]
    [(u'the', 'DT')]
    [(u'Union', 'NN')]
    [(u'.', '.')]
    [(u'January', 'NNP')]
    [(u'22', 'CD')]
    [(u',', ',')]
    [(u'1970', 'CD')]
    [(u'Mr.', 'NNP')]
    [(u'Speaker', 'NN')]
    [(u',', ',')]
    [(u'Mr.', 'NNP')]
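
The relationship between the spark.yarn.dist.archives value in step 3 and the PYSPARK_PYTHON path in steps 3 and 4 is easy to get wrong: each entry has the form archive#alias, and the archive is unpacked into a directory named after the alias. The following sketch only parses the setting to make that mapping explicit; it does not talk to Spark or YARN. The function names are hypothetical, the fallback for an entry without an alias is an assumption, and the interpreter path assumes the zip file has a single top-level directory named after the archive, which matches how nltk_env.zip was built in step 1.

```python
import posixpath


def parse_dist_archives(value):
    """Split 'a.zip#A,b.zip#B' into a list of (archive, alias) pairs.

    Assumption: an entry without '#' uses the archive file name as its alias.
    """
    pairs = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        archive, _, alias = entry.partition("#")
        pairs.append((archive, alias or posixpath.basename(archive)))
    return pairs


def python_path_for(archive, alias):
    """Relative interpreter path inside the unpacked archive.

    Assumes the zip's top-level directory is the archive name without its extension.
    """
    env_name = posixpath.splitext(posixpath.basename(archive))[0]
    return "./" + posixpath.join(alias, env_name, "bin", "python")


setting = "nltk_env.zip#NLTK,tokenizers.zip#tokenizers,taggers.zip#taggers"
pairs = parse_dist_archives(setting)
print(pairs)
print(python_path_for(*pairs[0]))
```

For the first entry, the derived path is the same ./NLTK/nltk_env/bin/python value that steps 3 and 4 assign to PYSPARK_PYTHON.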