Running Spark Python Applications

Accessing Spark with Java and Scala offers many advantages: platform independence by running inside the JVM, self-contained packaging of code and its dependencies into JAR files, and higher performance because Spark itself runs in the JVM. You lose these advantages when using the Spark Python API.

Managing dependencies and making them available for Python jobs on a cluster can be difficult. To determine which dependencies are required on the cluster, you must understand that Spark code applications run in Spark executor processes distributed throughout the cluster. If the Python transformations you define use any third-party libraries, such as NumPy or nltk, Spark executors require access to those libraries when they run on remote executors.

Self-Contained Dependencies

In a common situation, a custom Python package contains functionality you want to apply to each element of an RDD. The following shows a simple example:

def import_my_special_package(x):
  import my.special.package
  return x

int_rdd = sc.parallelize([1, 2, 3, 4])
int_rdd.map(lambda x: import_my_special_package(x))
int_rdd.collect()

You create a simple RDD of four elements and call it int_rdd. Then you apply the function import_my_special_package to every element of the int_rdd. This function imports my.special.package and then returns the original argument passed to it. This has the same effect as using classes or functions defined in my.special.package because Spark requires that each Spark executor import my.special.package when needed.

If you only need a single file inside my.special.package, you can direct Spark to make this available to all executors by using the --py-files option in your spark-submit command and specifying the local path to the file. You can also specify this programmatically by using the sc.addPyFiles() function. If you use functionality from a package that spans multiple files, make an egg for the package, because the --py-files flag also accepts a path to an egg file.

If you have a self-contained dependency, you can make the required Python dependency available to your executors in two ways:
  • If you depend on only a single file, you can use the --py-files command-line option, or programmatically add the file to the SparkContext with sc.addPyFiles(path) and specify the local path to that Python file.
  • If you have a dependency on a self-contained module (a module with no other dependencies), you can create an egg or zip file of that module and use either the --py-files command-line option or programmatically add the module to theSparkContext with sc.addPyFiles(path) and specify the local path to that egg or zip file.

Complex Dependencies

Some operations rely on complex packages that also have many dependencies. For example, the following code snippet imports the Python pandas data analysis library:

def import_pandas(x):
 import pandas
 return x

int_rdd = sc.parallelize([1, 2, 3, 4])
int_rdd.map(lambda x: import_pandas(x))
int_rdd.collect()

pandas depends on NumPy, SciPy, and many other packages. Although pandas is too complex to distribute as a *.py file, you can create an egg for it and its dependencies and send that to executors.

Limitations of Distributing Egg Files

In both self-contained and complex dependency scenarios, sending egg files is problematic because packages that contain native code must be compiled for the specific host on which it will run. When doing distributed computing with industry-standard hardware, you must assume is that the hardware is heterogeneous. However, because of the required C compilation, a Python egg built on a client host is specific to the client CPU architecture. Therefore, distributing an egg for complex, compiled packages like NumPy, SciPy, and pandas often fails. Instead of distributing egg files you should install the required Python packages on each host of the cluster and specify the path to the Python binaries for the worker hosts to use.

Installing and Maintaining Python Environments

Installing and maintaining Python environments is complex but allows you to use the full Python package ecosystem. Ideally, a sysadmin sets up a virtual environment or installs the Anaconda distribution on every host of your cluster with your required dependencies.

To quickly set up Python environment yourself, set up a virtual environment on your cluster by running commands on each host using Cluster SSH, Parallel SSH, or Fabric. Assuming each host has Python and pip installed, use the following commands to set up the standard data stack (NumPy, SciPy, scikit-learn, and pandas) in a virtual environment on a RHEL 6-compatible system:

# Install python-devel:
yum install python-devel

# Install non-Python dependencies required by SciPy that are not installed by default:
yum install atlas atlas-devel lapack-devel blas-devel

# install virtualenv:
pip install virtualenv

# create a new virtualenv:
virtualenv mynewenv

# activate the virtualenv:
source mynewenv/bin/activate

# install packages in mynewenv:
pip install numpy
pip install scipy
pip install scikit-learn
pip install pandas

Setting the Python Path

After the Python packages you want to use are in a consistent location on your cluster, set the appropriate environment variables to the path to your Python executables as follows:

  • Client mode: Set the executor path with PYSPARK_PYTHON and the driver path with PYSPARK_DRIVER_PYTHON
  • Cluster mode: Set the executor path with spark.yarn.appMasterEnv.PYSPARK_PYTHON and the driver path with spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON

To consistently use these variables, add export variable /path/to/mynewenv/bin/python to spark-env.sh, checking that other users have not set the variables already with the conditional tests such as:

if [ -z "${PYSPARK_PYTHON}" ]; then
export PYSPARK_PYTHON=
fi
In Cloudera Manager, set environment variables as follows:
  1. Go to the Spark service.
  2. Click the Configuration tab.
  3. Search for Spark Service Advanced Configuration Snippet (Safety Valve) for spark-conf/spark-env.sh.
  4. Add the variables to the property.
  5. Click Save Changes to commit the changes.
  6. Restart the service.
  7. Redeploy client configurations.

On the command-line, set environment variables in /etc/spark/conf/spark-env.sh.