Importing Data into Cloudera Data Science Workbench

Cloudera Data Science Workbench allows you to run analytics workloads on data imported from local files, Apache HBase, Apache Kudu, Apache Impala, Apache Hive, or other external data stores such as Amazon S3.

Accessing Local Data from Your Computer

If you want to perform analytics operations on existing data files (.csv, .txt, etc.) from your computer, you can upload these files directly to your Cloudera Data Science Workbench project. Go to the project's Overview page. Under the Files section, click Upload and select the relevant data files to be uploaded.

The following sections use the tips.csv dataset to demonstrate how to work with local data stored within your project. Upload this dataset to the data folder in your project before you run these examples.
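
Before running the examples, it can help to confirm that the upload landed where the code expects it. The following is a minimal sketch that uses only the Python standard library. The helper name preview_csv is hypothetical, and the path in the usage comment assumes you uploaded the file to the data folder as described above.

```python
import csv
from pathlib import Path


def preview_csv(path, n=3):
    """Return (header, first n data rows) of a CSV file.

    Hypothetical helper for illustration. Raises FileNotFoundError
    if the file is not at the expected location.
    """
    path = Path(path)
    if not path.is_file():
        raise FileNotFoundError("Upload the dataset first: %s" % path)
    with path.open(newline="") as f:
        reader = csv.reader(f)
        header = next(reader, [])
        # zip stops after n rows without reading the rest of the file
        rows = [row for _, row in zip(range(n), reader)]
    return header, rows


# Usage (assumes tips.csv is in the project's data folder):
# header, rows = preview_csv("data/tips.csv")
```

If the call raises FileNotFoundError, upload the file again and check the folder name.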

Pandas (Python)

import pandas as pd

tips = pd.read_csv('data/tips.csv')
  
tips \
  .query('sex == "Female"') \
  .groupby('day') \
  .agg({'tip' : 'mean'}) \
  .rename(columns={'tip': 'avg_tip_dinner'}) \
  .sort_values('avg_tip_dinner', ascending=False)

dplyr (R)

library(readr)
library(dplyr)

# load data from .csv file in project
tips <- read_csv("data/tips.csv")

# query using dplyr
tips %>%
  filter(sex == "Female") %>%
  group_by(day) %>%
  summarise(
    avg_tip = mean(tip, na.rm = TRUE)
  ) %>%
  arrange(desc(avg_tip))

Accessing Data from HDFS

There are many ways to access HDFS data from R, Python, and Scala libraries. The following code samples demonstrate how to count the number of occurrences of each word in a simple text file in HDFS.

Navigate to your project and click Open Workbench. Create a file called sample_text_file.txt and save it to your project in the data folder. Now write this file to HDFS. You can do this in one of the following ways:

  • Click Terminal above the Cloudera Data Science Workbench console and enter the following command to write the file to HDFS:
    hdfs dfs -put data/sample_text_file.txt /tmp
    OR
  • Use the workbench command prompt:

    Python Session
    !hdfs dfs -put data/sample_text_file.txt /tmp

    R Session

    system("hdfs dfs -put data/sample_text_file.txt /tmp")
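
The upload can also be scripted from Python without the ! shell escape. The following is a minimal sketch, assuming the hdfs client is on the PATH as it is for the commands above. The function names build_put_command and put_to_hdfs are hypothetical.

```python
import subprocess


def build_put_command(local_path, hdfs_dir):
    """Build the argument list for `hdfs dfs -put <local> <dest>`."""
    return ["hdfs", "dfs", "-put", local_path, hdfs_dir]


def put_to_hdfs(local_path, hdfs_dir):
    """Run the upload. Raises CalledProcessError if the command fails.

    Assumes the `hdfs` client is available on the PATH.
    """
    subprocess.run(build_put_command(local_path, hdfs_dir), check=True)


# Usage (requires access to the cluster, so it is not executed here):
# put_to_hdfs("data/sample_text_file.txt", "/tmp")
```

Passing the command as a list avoids shell quoting problems when a path contains spaces.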

The following examples use Python and Scala to read sample_text_file.txt from HDFS (written above) and perform the count operation on it.

Python

from __future__ import print_function
import sys, re
from operator import add
from pyspark.sql import SparkSession
  
spark = SparkSession\
  .builder\
  .appName("PythonWordCount")\
  .getOrCreate()

# Access the file  
lines = spark.read.text("/tmp/sample_text_file.txt").rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')) \
  .map(lambda x: (x, 1)) \
  .reduceByKey(add) \
  .sortBy(lambda x: x[1], False)
output = counts.collect()
for (word, count) in output:
  print("%s: %i" % (word, count))

spark.stop()
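
To check the Spark output on a small file, the same counting logic can be reproduced locally. The following is a sketch that uses only the standard library. The function name word_count is hypothetical, and the sample lines in the usage comment are made up for illustration.

```python
from collections import Counter


def word_count(lines):
    """Count words split on single spaces, sorted by descending count.

    Mirrors the flatMap / map / reduceByKey / sortBy steps above,
    including the split(' ') behavior that yields empty strings
    for consecutive spaces.
    """
    counts = Counter()
    for line in lines:
        counts.update(line.split(' '))
    # most_common() returns (word, count) pairs, highest count first
    return counts.most_common()


# Usage with made-up text:
# for word, count in word_count(["a b a", "b a"]):
#     print("%s: %i" % (word, count))
```

The order of words that share the same count can differ between this sketch and Spark.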

Scala

//count lower bound 
val threshold = 2

// read the file added to hdfs
val tokenized = sc.textFile("/tmp/sample_text_file.txt").flatMap(_.split(" "))

// count the occurrence of each word
val wordCounts = tokenized.map((_ , 1)).reduceByKey(_ + _)

// filter out words with fewer than threshold occurrences
val filtered = wordCounts.filter(_._2 >= threshold)

System.out.println(filtered.collect().mkString(","))

Accessing Data from Apache Impala

In this section, we take some sample data in the form of a CSV file, save the contents of this file to a table in Impala, and then use some common Python and R libraries to run simple queries on this data.

  1. Loading CSV Data into an Impala Table
  2. Running Queries on Impala Tables

Loading CSV Data into an Impala Table

For this demonstration, we will be using the tips.csv dataset. Use the following steps to save this file to a project in Cloudera Data Science Workbench, and then load it into a table in Apache Impala.
  1. Create a new Cloudera Data Science Workbench project.
  2. Create a folder called data and upload tips.csv to this folder. For detailed instructions, see Managing Project Files.
  3. The next steps require access to services on the CDH cluster. If Kerberos has been enabled on the cluster, enter your credentials (username, password/keytab) in Cloudera Data Science Workbench to enable access. For instructions, see Hadoop Authentication with Kerberos for Cloudera Data Science Workbench.
  4. Navigate back to the project Overview page and click Open Workbench.
  5. Launch a new session (Python or R).
  6. Open the Terminal.
    1. Run the following command to create an empty table in Impala called tips. Replace <impala_daemon_hostname> with the hostname for your Impala daemon.
      impala-shell -i <impala_daemon_hostname>:21000 -q '
        CREATE TABLE default.tips (
          `total_bill` FLOAT,
          `tip` FLOAT,
          `sex` STRING,
          `smoker` STRING,
          `day` STRING,
          `time` STRING,
          `size` TINYINT)
        ROW FORMAT DELIMITED FIELDS TERMINATED BY ","
        LOCATION "hdfs:///user/hive/warehouse/tips/";'
    2. Run the following command to copy the data/tips.csv file from your project into the table's HDFS location, which makes the data available to the Impala table.
      hdfs dfs -put data/tips.csv /user/hive/warehouse/tips/
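
Before copying the file to HDFS, you may want to check that each row fits the column types declared in the CREATE TABLE statement above. The following is a minimal sketch in standard-library Python. The names SCHEMA and invalid_rows are hypothetical. Note that a header line, if your copy of the file has one, is reported because it does not parse as FLOAT.

```python
import csv

# Column parsers mirroring the CREATE TABLE statement above.
SCHEMA = [
    ("total_bill", float),
    ("tip", float),
    ("sex", str),
    ("smoker", str),
    ("day", str),
    ("time", str),
    ("size", int),
]


def invalid_rows(lines):
    """Return the 1-based line numbers of rows that do not fit SCHEMA."""
    bad = []
    for number, row in enumerate(csv.reader(lines), start=1):
        try:
            if len(row) != len(SCHEMA):
                raise ValueError("wrong number of fields")
            values = [parse(field) for (_, parse), field in zip(SCHEMA, row)]
            # TINYINT holds values from -128 to 127.
            if not -128 <= values[-1] <= 127:
                raise ValueError("size out of TINYINT range")
        except ValueError:
            bad.append(number)
    return bad


# Usage (assumes the file is in the project's data folder):
# with open("data/tips.csv", newline="") as f:
#     print(invalid_rows(f))
```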

Running Queries on Impala Tables

This section demonstrates how to run queries on the tips table created in the previous section using some common Python and R libraries: PySpark, Impyla, Ibis, and Sparklyr. All the examples in this section run the same query, but use different libraries to do so.

PySpark (Python)

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('yarn').getOrCreate()

# load data from .csv file in HDFS
# tips = spark.read.csv("/user/hive/warehouse/tips/", header=True, inferSchema=True)

# OR load data from table in Hive metastore
tips = spark.table('tips')

from pyspark.sql.functions import col, lit, mean

# query using DataFrame API  
tips \
  .filter(col('sex').like("%Female%")) \
  .groupBy('day') \
  .agg(mean('tip').alias('avg_tip')) \
  .orderBy('avg_tip',ascending=False) \
  .show()

# query using SQL
spark.sql('''
  SELECT day,AVG(tip) AS avg_tip \
  FROM tips \
  WHERE sex LIKE "%Female%" \
  GROUP BY day \
  ORDER BY avg_tip DESC''').show()
  
spark.stop()

Impyla (Python)

# (Required) Install the impyla package
# !pip3 install impyla
import os
import pandas
from impala.dbapi import connect
from impala.util import as_pandas

# Connect to Impala using Impyla
# Secure clusters will require additional parameters to connect to Impala.
# Recommended: Specify IMPALA_HOST as an environment variable in your project settings

IMPALA_HOST = os.getenv('IMPALA_HOST', '<impala_daemon_hostname>')
conn = connect(host=IMPALA_HOST, port=21050)

# Execute using SQL
cursor = conn.cursor()

cursor.execute('SELECT day,AVG(tip) AS avg_tip \
                FROM tips \
                WHERE sex ILIKE "%Female%" \
                GROUP BY day \
                ORDER BY avg_tip DESC')


# Pretty output using Pandas
tables = as_pandas(cursor)
tables
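
Impyla follows the Python DB-API, so the connect, cursor, execute, and fetch pattern above can be tried without a cluster. The following sketch uses the standard library sqlite3 module as a local stand-in for Impala. The table contents are made-up rows for illustration and are not taken from tips.csv.

```python
import sqlite3

# In-memory SQLite database as a local stand-in for Impala.
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE tips (tip REAL, sex TEXT, day TEXT)")

# Made-up rows for illustration only.
cursor.executemany(
    "INSERT INTO tips VALUES (?, ?, ?)",
    [(2.0, "Female", "Sun"), (4.0, "Female", "Sun"),
     (1.0, "Female", "Sat"), (5.0, "Male", "Sat")],
)

# Adjacent string literals are joined, so no backslashes are needed.
cursor.execute(
    "SELECT day, AVG(tip) AS avg_tip "
    "FROM tips "
    "WHERE sex LIKE '%Female%' "
    "GROUP BY day "
    "ORDER BY avg_tip DESC"
)
rows = cursor.fetchall()
conn.close()

for day, avg_tip in rows:
    print(day, avg_tip)
```

SQLite and Impala differ in SQL dialect and data types, so treat this only as a way to rehearse the cursor workflow.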

Ibis (Python)

# (Required) Install the ibis-framework[impala] package
# !pip3 install ibis-framework[impala]

import ibis
import os
ibis.options.interactive = True
ibis.options.verbose = True

# Connection to Impala
# Secure clusters will require additional parameters to connect to Impala.
# Recommended: Specify IMPALA_HOST as an environment variable in your project settings

IMPALA_HOST = os.getenv('IMPALA_HOST', '<impala_daemon_hostname>')
con = ibis.impala.connect(host=IMPALA_HOST, port=21050, database='default')
con.list_tables()

tips = con.table('tips')

tips \
  .filter(tips.sex.like(['%Female%'])) \
  .group_by('day') \
  .aggregate( \
     avg_tip=tips.tip.mean() \
  ) \
  .sort_by(ibis.desc('avg_tip')) \
  .execute()

Sparklyr (R)

# (Required) Install the sparklyr package
# install.packages("sparklyr")

library(stringr)
library(sparklyr)
library(dplyr)

spark <- spark_connect(master = "yarn")

# load data from file in HDFS
tips <- spark_read_csv(
  sc = spark,
  name = "tips",
  path = "/user/hive/warehouse/tips/"
)

# OR load data from table 
tips <- tbl(spark, "tips")

# query using dplyr
tips %>%
  filter(sex %like% "%Female%") %>%
  group_by(day) %>%
  summarise(
    avg_tip = mean(tip, na.rm = TRUE)
  ) %>%
  arrange(desc(avg_tip))


# query using SQL
tbl(spark, sql("
  SELECT day,AVG(tip) AS avg_tip \
  FROM tips \
  WHERE sex LIKE '%Female%' \
  GROUP BY day \
  ORDER BY avg_tip DESC"))

spark_disconnect(spark)

Accessing Data from Apache Hive

The following code sample demonstrates how to establish a connection with Hive and access data from tables in Hive. The connection uses port 10000, the default HiveServer2 port.

Python

import os
import pandas
from impala.dbapi import connect
from impala.util import as_pandas

# Specify HIVE_HMS_HOST as an environment variable in your project settings
HIVE_HMS_HOST = os.getenv('HIVE_HMS_HOST', '<hive_metastore_hostname>')

conn = connect(host=HIVE_HMS_HOST,
               port=10000,
               auth_mechanism='GSSAPI',
               kerberos_service_name='hive')
cursor = conn.cursor()
cursor.execute('SHOW TABLES')
tables = as_pandas(cursor)
tables
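
If several scripts connect to Hive, the connection settings can be collected in one place. The following is a minimal sketch. The function name hive_connect_kwargs and the HIVE_PORT variable are hypothetical additions for illustration. HIVE_HMS_HOST and the other values are taken from the sample above.

```python
import os


def hive_connect_kwargs(env=os.environ):
    """Collect keyword arguments for impala.dbapi.connect.

    HIVE_HMS_HOST is the variable used in the sample above.
    HIVE_PORT is a hypothetical variable that defaults to 10000.
    """
    return {
        "host": env.get("HIVE_HMS_HOST", "<hive_metastore_hostname>"),
        "port": int(env.get("HIVE_PORT", "10000")),
        "auth_mechanism": "GSSAPI",
        "kerberos_service_name": "hive",
    }


# Usage (requires impyla and a reachable Hive service):
# from impala.dbapi import connect
# conn = connect(**hive_connect_kwargs())
```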

Accessing Data in Amazon S3 Buckets

Every language in Cloudera Data Science Workbench has libraries available for uploading to and downloading from Amazon S3.

To work with S3:

  1. Add your Amazon Web Services access keys to your project's environment variables as AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
  2. Pick your favorite language from the code samples below. Each one downloads the R 'Old Faithful' dataset from S3.
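
A missing access key usually surfaces later as an authentication error, so it can be worth checking the environment first. The following is a minimal sketch of such a check. The function name missing_aws_variables is hypothetical.

```python
import os

# Variable names from step 1 above.
REQUIRED = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_aws_variables(env=os.environ):
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED if not env.get(name)]


# Usage:
# missing = missing_aws_variables()
# if missing:
#     raise RuntimeError("Set these project variables: " + ", ".join(missing))
```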

R

library("devtools") 
install_github("armstrtw/AWS.tools") 

Sys.setenv("AWSACCESSKEY"=Sys.getenv("AWS_ACCESS_KEY_ID")) 
Sys.setenv("AWSSECRETKEY"=Sys.getenv("AWS_SECRET_ACCESS_KEY")) 

library("AWS.tools") 

s3.get("s3://sense-files/faithful.csv")

Python

# Install Boto to the project
!pip install boto

# Create the Boto S3 connection object.
from boto.s3.connection import S3Connection
aws_connection = S3Connection()
        
# Download the dataset to file 'faithful.csv'.
bucket = aws_connection.get_bucket('sense-files')
key = bucket.get_key('faithful.csv')
key.get_contents_to_filename('/home/cdsw/faithful.csv')

Accessing External SQL Databases

Every language in Cloudera Data Science Workbench has multiple client libraries available for SQL databases.

If your database is behind a firewall or on a secure server, you can connect to it by creating an SSH tunnel to the server, then connecting to the database on localhost.
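
As an illustration of the tunnel, the following sketch builds an ssh command that forwards a local port to the database host. It assumes an OpenSSH client is available. The function name and the host names in the usage comment are hypothetical placeholders.

```python
def build_tunnel_command(ssh_target, db_host, db_port, local_port):
    """Build an `ssh -N -L` command for local port forwarding.

    -L forwards local_port on this machine to db_host:db_port as seen
    from the SSH server. -N opens the tunnel without running a remote
    command.
    """
    forward = "%d:%s:%d" % (local_port, db_host, db_port)
    return ["ssh", "-N", "-L", forward, ssh_target]


# Usage with placeholder hosts (not executed here):
# import subprocess
# tunnel = subprocess.Popen(
#     build_tunnel_command("user@gateway.example.com", "db.internal", 5432, 5432))
# ... connect to the database on localhost:5432, then:
# tunnel.terminate()
```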

If the database is password-protected, consider storing the password in an environment variable to avoid displaying it in your code or in consoles. The examples below show how to retrieve the password from an environment variable and use it to connect.

R

# dplyr lets you program the same way with local data frames and remote SQL databases. 
install.packages("dplyr") 
library("dplyr") 
db <- src_postgres(dbname="test_db", host="localhost", port=5432, user="cdswuser", password=Sys.getenv("POSTGRESQL_PASSWORD")) 
flights_table <- tbl(db, "flights") 
select(flights_table, year:day, dep_delay, arr_delay) 

Python

You can access data using pyodbc or SQLAlchemy.

# pyodbc lets you make direct SQL queries.
!pip install pyodbc
import os
import pyodbc

# See http://www.connectionstrings.com/ for information on how to construct ODBC connection strings.
db = pyodbc.connect("DRIVER={PostgreSQL Unicode};SERVER=localhost;PORT=5432;DATABASE=test_db;USER=cdswuser;OPTION=3;PASSWORD=%s" % os.environ["POSTGRESQL_PASSWORD"])
cursor = db.cursor()
cursor.execute("select user_id, user_name from users")

# sqlalchemy is an object relational database client that lets you make database queries in a more Pythonic way.
!pip install sqlalchemy
import os
 
import sqlalchemy
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
db = create_engine("postgresql://cdswuser:%s@localhost:5432/test_db" % os.environ["POSTGRESQL_PASSWORD"])
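# sessionmaker returns a session factory; call it to create a session.
Session = sessionmaker(bind=db)
session = Session()
# User must be a mapped ORM class that you have defined for your users table.
user = session.query(User).filter_by(name='ed').first()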
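
A password that contains characters such as @ or / can break a connection URL built with plain string formatting. The following is a minimal sketch that percent-encodes the password before building the URL. The function name postgres_url is hypothetical, and the connection details in the usage comment are the ones used in the examples above.

```python
import os
from urllib.parse import quote_plus


def postgres_url(user, password, host, port, dbname):
    """Build a postgresql:// URL with the password percent-encoded."""
    return "postgresql://%s:%s@%s:%d/%s" % (
        user, quote_plus(password), host, port, dbname)


# Usage (assumes POSTGRESQL_PASSWORD is set in the project environment):
# url = postgres_url("cdswuser", os.environ["POSTGRESQL_PASSWORD"],
#                    "localhost", 5432, "test_db")
# db = create_engine(url)
```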