Using Spark SQL

SQLContext and HiveContext

The entry point to all Spark SQL functionality is the SQLContext class or one of its descendants. You create a SQLContext from a SparkContext. With a SQLContext, you can create a DataFrame from an RDD, a Hive table, or a data source.
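As a minimal sketch of the RDD case, the function below builds a DataFrame from an in-memory RDD. It assumes that an existing SparkContext and SQLContext are passed in (for example, the sc and sqlContext variables of the pyspark shell). The function name and the sample rows are illustrative only.

```python
def rdd_to_dataframe(sc, sqlContext):
    """Build a small DataFrame from an in-memory RDD.

    sc and sqlContext are assumed to be an existing SparkContext and
    SQLContext (or HiveContext); nothing is created here.
    """
    # Illustrative rows only: (code, description, salary)
    rows = [
        ("11-1011", "Chief executives", 151370),
        ("29-1023", "Orthodontists", 185340),
    ]
    rdd = sc.parallelize(rows)
    # Column names are passed as a list; column types are inferred from the data
    return sqlContext.createDataFrame(rdd, ["code", "description", "salary"])
```

In the pyspark shell, calling rdd_to_dataframe(sc, sqlContext).show() would display the two sample rows.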

To work with data stored in Hive or Impala tables from Spark applications, construct a HiveContext, which inherits from SQLContext. With a HiveContext, you can access Hive or Impala tables represented in the metastore database.

If you use spark-shell, a HiveContext is already created for you and is available as the sqlContext variable.

If you use spark-submit, use code like the following at the start of the program:

Python:

from pyspark import SparkContext, HiveContext
sc = SparkContext(appName = "test")
sqlContext = HiveContext(sc)

The host from which the Spark application is submitted or on which spark-shell or pyspark runs must have a Hive gateway role defined in Cloudera Manager and client configurations deployed.

Querying Files Into a DataFrame

If you have data files that are outside of a Hive or Impala table, you can use SQL to directly read JSON or Parquet files into a DataFrame:

  • JSON:
    df = sqlContext.sql("SELECT * FROM json.`input dir`")
  • Parquet:
    df = sqlContext.sql("SELECT * FROM parquet.`input dir`")

See Running SQL on Files.
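The two statements above differ only in the format prefix, so they can be generated by one helper. This is a minimal sketch: the function names are hypothetical, and sqlContext is assumed to be an existing SQLContext or HiveContext.

```python
def file_query(fmt, input_dir):
    """Return a Spark SQL statement that reads files of the given format."""
    if fmt not in ("json", "parquet"):
        raise ValueError("this sketch only covers json and parquet")
    # The directory is wrapped in backticks, as in the statements above
    return "SELECT * FROM {0}.`{1}`".format(fmt, input_dir)


def files_to_dataframe(sqlContext, fmt, input_dir):
    """Run the generated statement on an existing SQLContext."""
    return sqlContext.sql(file_query(fmt, input_dir))
```

For example, files_to_dataframe(sqlContext, "parquet", "/user/hdfs/events") would read a hypothetical /user/hdfs/events directory into a DataFrame.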

Spark SQL Example

This example demonstrates how to use sqlContext.sql to create and load two tables and select rows from the tables into two DataFrames. The next steps use the DataFrame API to filter one of the tables for rows with salaries greater than 150,000 and show the resulting DataFrame. Then the two DataFrames are joined to create a third DataFrame. Finally, the new DataFrame is saved to a Hive table.

  1. At the command line, copy the Hue sample_07 and sample_08 CSV files to HDFS:
    $ hdfs dfs -put HUE_HOME/apps/beeswax/data/sample_07.csv /user/hdfs
    $ hdfs dfs -put HUE_HOME/apps/beeswax/data/sample_08.csv /user/hdfs
    where HUE_HOME defaults to /opt/cloudera/parcels/CDH/lib/hue (parcel installation) or /usr/lib/hue (package installation).
  2. Start spark-shell:
    $ spark-shell
  3. Create Hive tables sample_07 and sample_08 as tab-delimited text tables:
    scala> sqlContext.sql("CREATE TABLE sample_07 (code string,description string,total_emp int,salary int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile")
    scala> sqlContext.sql("CREATE TABLE sample_08 (code string,description string,total_emp int,salary int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile")
  4. In Beeline, show the Hive tables:
    [0: jdbc:hive2://hostname.com:> show tables;
    +------------+--+
    |  tab_name  |
    +------------+--+
    | sample_07  |
    | sample_08  |
    +------------+--+
  5. Load the data in the CSV files into the tables:
    scala> sqlContext.sql("LOAD DATA INPATH '/user/hdfs/sample_07.csv' OVERWRITE INTO TABLE sample_07")
    scala> sqlContext.sql("LOAD DATA INPATH '/user/hdfs/sample_08.csv' OVERWRITE INTO TABLE sample_08")
  6. Create DataFrames containing the contents of the sample_07 and sample_08 tables:
    scala> val df_07 = sqlContext.sql("SELECT * from sample_07")
    scala> val df_08 = sqlContext.sql("SELECT * from sample_08")
  7. Show all rows in df_07 with salary greater than 150,000:
    scala> df_07.filter(df_07("salary") > 150000).show()
    The output should be:
    +-------+--------------------+---------+------+
    |   code|         description|total_emp|salary|
    +-------+--------------------+---------+------+
    |11-1011|    Chief executives|   299160|151370|
    |29-1022|Oral and maxillof...|     5040|178440|
    |29-1023|       Orthodontists|     5350|185340|
    |29-1024|     Prosthodontists|      380|169360|
    |29-1061|   Anesthesiologists|    31030|192780|
    |29-1062|Family and genera...|   113250|153640|
    |29-1063| Internists, general|    46260|167270|
    |29-1064|Obstetricians and...|    21340|183600|
    |29-1067|            Surgeons|    50260|191410|
    |29-1069|Physicians and su...|   237400|155150|
    +-------+--------------------+---------+------+
  8. Create the DataFrame df_09 by joining df_07 and df_08, retaining only the code and description columns:
    scala> val df_09 = df_07.join(df_08, df_07("code") === df_08("code")).select(df_07.col("code"),df_07.col("description"))
    scala> df_09.show()
    The new DataFrame looks like:
    +-------+--------------------+
    |   code|         description|
    +-------+--------------------+
    |00-0000|     All Occupations|
    |11-0000|Management occupa...|
    |11-1011|    Chief executives|
    |11-1021|General and opera...|
    |11-1031|         Legislators|
    |11-2011|Advertising and p...|
    |11-2021|  Marketing managers|
    |11-2022|      Sales managers|
    |11-2031|Public relations ...|
    |11-3011|Administrative se...|
    |11-3021|Computer and info...|
    |11-3031|  Financial managers|
    |11-3041|Compensation and ...|
    |11-3042|Training and deve...|
    |11-3049|Human resources m...|
    |11-3051|Industrial produc...|
    |11-3061| Purchasing managers|
    |11-3071|Transportation, s...|
    |11-9011|Farm, ranch, and ...|
    |11-9012|Farmers and ranchers|
    +-------+--------------------+
  9. Save DataFrame df_09 as the Hive table sample_09:
    scala> df_09.write.saveAsTable("sample_09")
  10. In Beeline, show the Hive tables:
    [0: jdbc:hive2://hostname.com:> show tables;
    +------------+--+
    |  tab_name  |
    +------------+--+
    | sample_07  |
    | sample_08  |
    | sample_09  |
    +------------+--+

The equivalent program in Python, which you could submit using spark-submit, would be:

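from pyspark import SparkContext, SparkConf, HiveContext

if __name__ == "__main__":
  # Create the Spark context from a Spark configuration
  conf = SparkConf().setAppName("Data Frame Join")
  sc = SparkContext(conf=conf)
  # HiveContext gives access to the tables in the Hive metastore
  sqlContext = HiveContext(sc)

  # Load both tables and show the rows of sample_07 with salary > 150,000
  df_07 = sqlContext.sql("SELECT * from sample_07")
  df_07.filter(df_07.salary > 150000).show()
  df_08 = sqlContext.sql("SELECT * from sample_08")

  # List the tables before the join result is saved
  tbls = sqlContext.sql("show tables")
  tbls.show()

  # Join on code, keeping only the code and description columns
  df_09 = df_07.join(df_08, df_07.code == df_08.code).select(df_07.code, df_07.description)
  df_09.show()

  # Save the result as a Hive table, then list the tables again
  df_09.write.saveAsTable("sample_09")
  tbls = sqlContext.sql("show tables")
  tbls.show()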

Instead of displaying the tables using Beeline, the show tables query is run using the Spark SQL API.
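The join can also be written as a single SQL statement instead of using the DataFrame API. The following is a sketch of that alternative, not part of the original example. It assumes the sample_07 and sample_08 tables already exist; the s07 and s08 aliases and the function name are illustrative.

```python
# Same join as df_09, written in SQL: keep code and description from sample_07
JOIN_QUERY = (
    "SELECT s07.code, s07.description "
    "FROM sample_07 s07 JOIN sample_08 s08 ON s07.code = s08.code"
)


def join_with_sql(sqlContext):
    """Run the join on an existing HiveContext and return the DataFrame."""
    return sqlContext.sql(JOIN_QUERY)
```

The returned DataFrame can be shown or saved in the same way as df_09.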

Ensuring HiveContext Enforces Secure Access

To ensure that HiveContext enforces ACLs, enable the HDFS-Sentry plug-in as described in Synchronizing HDFS ACLs and Sentry Permissions. Column-level access control for access from Spark SQL is not supported by the HDFS-Sentry plug-in.

Interaction with Hive Views

When a Spark job accesses a Hive view, Spark must have privileges to read the data files in the underlying Hive tables. Currently, Spark cannot use fine-grained privileges based on the columns or the WHERE clause in the view definition. If Spark does not have the required privileges on the underlying data files, a SparkSQL query against the view returns an empty result set, rather than an error.

Performance and Storage Considerations for Spark SQL DROP TABLE PURGE

The PURGE clause in the Hive DROP TABLE statement causes the underlying data files to be removed immediately, without being transferred into a temporary holding area (the HDFS trashcan).

Although the PURGE clause is recognized by the Spark SQL DROP TABLE statement, this clause is currently not passed along to the Hive statement that performs the "drop table" operation behind the scenes. Therefore, if you know the PURGE behavior is important in your application for performance, storage, or security reasons, do the DROP TABLE directly in Hive, for example through the Beeline shell, rather than through Spark SQL.

The immediate deletion aspect of the PURGE clause could be significant in cases such as:

  • If the cluster is running low on storage space and it is important to free space immediately, rather than waiting for the HDFS trashcan to be periodically emptied.

  • If the underlying data files reside on the Amazon S3 filesystem. Moving files to the HDFS trashcan from S3 involves physically copying the files, meaning that the default DROP TABLE behavior on S3 involves significant performance overhead.

  • If the underlying data files contain sensitive information and it is important to remove them entirely, rather than leaving them to be cleaned up by the periodic emptying of the trashcan.

  • If restrictions on HDFS encryption zones prevent files from being moved to the HDFS trashcan. This restriction primarily applies to CDH 5.7 and lower. With CDH 5.8 and higher, each HDFS encryption zone has its own HDFS trashcan, so the normal DROP TABLE behavior works correctly without the PURGE clause.
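When one of these cases applies, an application can hand the drop to Hive through Beeline instead of issuing it through Spark SQL. The sketch below only builds the command line. The function name is hypothetical, the JDBC URL is supplied by the caller, and a secured cluster would likely need additional authentication options.

```python
def beeline_drop_purge_command(jdbc_url, table):
    """Return the argument list for a Beeline call that drops a table with PURGE."""
    statement = "DROP TABLE IF EXISTS {0} PURGE".format(table)
    # -u gives the JDBC connection URL, -e the statement to execute
    return ["beeline", "-u", jdbc_url, "-e", statement]
```

The resulting list can be passed to subprocess.check_call from the Python standard library, for example with a URL such as jdbc:hive2://hostname.com:10000/default (an assumed host, port, and database).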

TIMESTAMP Compatibility for Parquet Files

Impala stores and retrieves the TIMESTAMP values verbatim, with no adjustment for the time zone. When writing Parquet files, Hive and Spark SQL both normalize all TIMESTAMP values to the UTC time zone. During a query, Spark SQL assumes that all TIMESTAMP values have been normalized this way and reflect dates and times in the UTC time zone. Therefore, Spark SQL adjusts the retrieved date/time values to reflect the local time zone of the server. SPARK-12297 introduces a configuration setting, spark.sql.parquet.int96TimestampConversion=true, that you can set to change the interpretation of TIMESTAMP values read from Parquet files that were written by Impala, to match the Impala behavior.

The following sequence of examples shows how, by default, TIMESTAMP values written to a Parquet table by an Apache Impala SQL statement are interpreted differently when queried by Spark SQL, and vice versa.

The initial Parquet table is created by Impala, and some TIMESTAMP values are written to it by Impala, representing midnight of one day, noon of another day, and an early afternoon time in the Pacific Daylight Time zone. (The second and third tables are created with the same structure and file format, for use in subsequent examples.)

[localhost:21000] > create table parquet_table(t timestamp) stored as parquet;
[localhost:21000] > create table parquet_table2 like parquet_table stored as parquet;
[localhost:21000] > create table parquet_table3 like parquet_table stored as parquet;
[localhost:21000] > select now();
+-------------------------------+
| now()                         |
+-------------------------------+
| 2018-03-23 14:07:01.057912000 |
+-------------------------------+
[localhost:21000] > insert into parquet_table
                  > values ('2018-03-23'), (now()), ('2000-01-01 12:00:00');
[localhost:21000] > select t from parquet_table order by t;
+-------------------------------+
| t                             |
+-------------------------------+
| 2000-01-01 12:00:00           |
| 2018-03-23 00:00:00           |
| 2018-03-23 14:08:54.617197000 |
+-------------------------------+

By default, when this table is queried through Spark SQL using spark-shell, the values are interpreted and displayed differently. The time values differ from the Impala result set by either 7 or 8 hours, depending on whether the dates fall within the Daylight Saving Time period or not.

scala> sqlContext.sql("select t from jdr.parquet_table order by t").show(truncate=false);
+--------------------------+
|t                         |
+--------------------------+
|2000-01-01 04:00:00.0     |
|2018-03-22 17:00:00.0     |
|2018-03-23 07:08:54.617197|
+--------------------------+
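The shift in this result set can be reproduced with plain date arithmetic: the wall-clock value stored by Impala is treated as UTC and then converted to the server's local time. The sketch below is an illustration only. It assumes the Spark server is in the Pacific time zone and uses fixed offsets (chosen by hand for each date) rather than a time zone database.

```python
from datetime import datetime, timedelta, timezone

# Fixed offsets for the Pacific time zone (assumption: the Spark server runs there)
PST = timezone(timedelta(hours=-8))  # standard time, applies to 2000-01-01
PDT = timezone(timedelta(hours=-7))  # daylight saving time, applies to 2018-03-23


def spark_default_reading(impala_value, local_zone):
    """Treat the stored wall-clock value as UTC, then convert to local time."""
    as_utc = impala_value.replace(tzinfo=timezone.utc)
    return as_utc.astimezone(local_zone).replace(tzinfo=None)


print(spark_default_reading(datetime(2000, 1, 1, 12, 0, 0), PST))
print(spark_default_reading(datetime(2018, 3, 23, 0, 0, 0), PDT))
print(spark_default_reading(datetime(2018, 3, 23, 14, 8, 54, 617197), PDT))
```

With these inputs the printed values are 2000-01-01 04:00:00, 2018-03-22 17:00:00, and 2018-03-23 07:08:54.617197, which match the Spark SQL result set above apart from the trailing .0 formatting.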

Running the same Spark SQL query with the configuration setting spark.sql.parquet.int96TimestampConversion=true applied makes the results the same as from Impala:

$ spark-shell --conf spark.sql.parquet.int96TimestampConversion=true
...
scala> sqlContext.sql("select t from jdr.parquet_table order by t").show(truncate=false);
+--------------------------+
|t                         |
+--------------------------+
|2000-01-01 12:00:00.0     |
|2018-03-23 00:00:00.0     |
|2018-03-23 14:08:54.617197|
+--------------------------+
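An application launched with spark-submit can receive the same setting through the --conf option. The following sketch only assembles the command line; the function name and the application file name used in the note below are hypothetical.

```python
# The setting shown above, in key=value form as expected by --conf
CONVERSION_SETTING = "spark.sql.parquet.int96TimestampConversion=true"


def spark_submit_command(app_path):
    """Return the spark-submit argument list with the conversion setting applied."""
    return ["spark-submit", "--conf", CONVERSION_SETTING, app_path]
```

For example, spark_submit_command("timestamp_job.py") yields a list that can be passed to subprocess.check_call.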

The compatibility considerations also apply in the reverse direction. The following examples show the same Parquet values as before, this time being written to tables through Spark SQL.

$ spark-shell
scala> sqlContext.sql("insert into jdr.parquet_table2 select t from jdr.parquet_table");
scala> sqlContext.sql("select t from jdr.parquet_table2 order by t").show(truncate=false);
+--------------------------+
|t                         |
+--------------------------+
|2000-01-01 04:00:00.0     |
|2018-03-22 17:00:00.0     |
|2018-03-23 07:08:54.617197|
+--------------------------+

Again, the configuration setting spark.sql.parquet.int96TimestampConversion=true means that the values are both read and written in a way that is interoperable with Impala:

$ spark-shell --conf spark.sql.parquet.int96TimestampConversion=true
...
scala> sqlContext.sql("insert into jdr.parquet_table3 select t from jdr.parquet_table");
scala> sqlContext.sql("select t from jdr.parquet_table3 order by t").show(truncate=false);
+--------------------------+
|t                         |
+--------------------------+
|2000-01-01 12:00:00.0     |
|2018-03-23 00:00:00.0     |
|2018-03-23 14:08:54.617197|
+--------------------------+