Running Apache Hive on Spark in CDH

This section explains how to run Hive using the Spark execution engine. It assumes that the cluster is managed by Cloudera Manager.

Configuring Hive on Spark

Minimum Required Role: Configurator (also provided by Cluster Administrator, Full Administrator)

To configure Hive to run on Spark, complete both of the tasks described in the following two sections:

Configuring the Hive Dependency on a Spark Service

By default, if a Spark service is available, the Hive dependency on the Spark service is configured. To change this configuration, do the following:

  1. In the Cloudera Manager Admin Console, go to the Hive service.
  2. Click the Configuration tab.
  3. Search for the Spark On YARN Service. To configure the Spark service, select the Spark service name. To remove the dependency, select none.
  4. Click Save Changes.
  5. Go to the Spark service.
  6. Add a Spark gateway role to the host running HiveServer2.
  7. Return to the Home page by clicking the Cloudera Manager logo.
  8. Click the icon that is next to any stale services to invoke the cluster restart wizard.
  9. Click Restart Stale Services.
  10. Click Restart Now.
  11. Click Finish.
  12. In the Hive client, configure the Spark execution engine.
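
The last step above can be sketched as follows. This is a minimal example that assumes you are connected to HiveServer2 through Beeline or a similar Hive client; the setting applies only to the current session.

```sql
-- Switch the execution engine for the current session to Spark.
SET hive.execution.engine=spark;

-- Print the current value to confirm that the setting took effect.
SET hive.execution.engine;
```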

Configuring Hive on Spark for Performance

For the configuration automatically applied by Cloudera Manager when the Hive on Spark service is added to a cluster, see Hive on Spark Autoconfiguration.

For information on configuring Hive on Spark for performance, see Tuning Apache Hive on Spark in CDH.

Dynamic Partition Pruning for Hive Map Joins

Starting in CDH 5.13, you can enable dynamic partition pruning for map joins when you are running Hive on Spark (HoS). Dynamic partition pruning (DPP) is a database optimization that can significantly decrease the amount of data that a query scans, so your workloads run faster. DPP achieves this by determining at runtime which partitions of a partitioned table a query does not need to read, and skipping them.

Map joins also optimize how Hive executes queries. They cause a small table to be scanned and loaded in memory as a hash table so that a fast join can be performed entirely within a mapper without having to use another reduce step. If you have queries that join small tables, map joins can make them execute much faster. Map joins are enabled by default in CDH with the Enable MapJoin Optimization setting for HiveServer2 in Cloudera Manager. Hive automatically uses map joins for join queries that involve a set of tables where:

  • There is one large table. There is no limit on the size of that large table.
  • All other tables involved in the join have an aggregate size under the value set for Hive Auto Convert Join Noconditional Size for HiveServer2, which is set to 20MB by default in Cloudera Manager.

For more information about map joins, see the Apache wiki.

To enable or disable map joins on a per-query basis, use the Hive SET command:

SET hive.auto.convert.join=true;
SET hive.auto.convert.join.noconditionaltask.size=<size_in_bytes>;
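
The commands above show only the enabling case. As a small sketch of the other direction, the SET command can also print the current values and turn automatic map join conversion off for the session:

```sql
-- Show the current values of the two map join settings.
SET hive.auto.convert.join;
SET hive.auto.convert.join.noconditionaltask.size;

-- Turn automatic map join conversion off for the queries that follow.
SET hive.auto.convert.join=false;
```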
      

In CDH 5.13, when you are using HoS and the tables involved in a join query trigger a map join, two Spark jobs are launched and perform the following actions:

  • the first job scans the smaller table, creates a hash table, and writes it to HDFS,
  • the second job runs the join and the rest of the query, scanning the larger table.

If DPP is enabled and is also triggered, the two Spark jobs perform the following actions:

  • the first Spark job creates the hash table from the small table and identifies the partitions that should be scanned from the large table,
  • the second Spark job then scans the relevant partitions from the large table that are to be used in the join.

After these actions are performed, the query proceeds normally with the map join.

Enabling Dynamic Partition Pruning for Map Joins in Hive on Spark

Dynamic partition pruning (DPP) is disabled by default in CDH 5.13. Use Cloudera Manager to set the following properties.

Property Name: hive.spark.dynamic.partition.pruning.map.join.only
Description: Enables dynamic partition pruning for queries where the join on the partitioned column is a map join. This property only applies to the Spark execution engine. Set this property to true to use dynamic partition pruning for queries where the join on the partitioned column is a map join.
Default Setting: false (turned off)

Property Name: hive.spark.dynamic.partition.pruning
Description: Enables dynamic partition pruning for all joins, including shuffle joins and map joins.
Default Setting: false (turned off)

Enabling DPP on a Per-Query Basis with the Hive SET Command

To enable DPP at the session level, use the Hive SET command:

SET hive.spark.dynamic.partition.pruning.map.join.only=true;
          

Enabling DPP as a Service-Wide Default with Cloudera Manager

Use Cloudera Manager to enable DPP as a service-wide default:

  1. In the Cloudera Manager Admin Console, go to the Hive service.
  2. In the Hive service page, click the Configuration tab.
  3. On the Configuration page, click the HiveServer2 scope and click the Performance category.
  4. Search for Hive on Spark Dynamic Partition Pruning for MapJoins, and select the check box.
  5. Click Save Changes.

Verifying Your Query Uses Dynamic Partition Pruning in Hive on Spark

Use EXPLAIN to generate a query plan, which you can use to verify that DPP is being triggered for your query.

Example of Verifying that Dynamic Partition Pruning Is Triggered For Your Query

This example uses TPC-DS benchmark data with the query generated from query3.tpl in the TPC-DS downloadable package. It demonstrates how you can use the EXPLAIN command to verify that DPP is being triggered. For more information about the TPC-DS benchmark data and queries, see www.tpc.org/tpcds/.

First, set the following properties, which instruct Hive to use Spark as its execution engine and turn on DPP for map joins:

SET hive.execution.engine=spark;
SET hive.spark.dynamic.partition.pruning.map.join.only=true;
        

Then run the following commands, which tell Hive to use the testing_example_db database and to show (EXPLAIN) the query plan for the query that follows:

USE testing_example_db;

EXPLAIN
SELECT dt.d_year
       ,item.i_brand_id brand_id
       ,item.i_brand brand
       ,sum(ss_ext_sales_price) sum_agg
FROM date_dim dt
       ,store_sales
       ,item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
  AND store_sales.ss_item_sk = item.i_item_sk
  AND item.i_manufact_id = 436
  AND dt.d_moy=12
GROUP BY dt.d_year
      ,item.i_brand
      ,item.i_brand_id
ORDER BY dt.d_year
         ,sum_agg desc
         ,brand_id
LIMIT 100;
        

The EXPLAIN command returns the query plan for the TPC-DS query. An excerpt from that query plan is included below. Look for the Spark HashTable Sink Operator and the Spark Partition Pruning Sink Operator in the following output. The presence of these sink operators in the query plan indicates that DPP is being triggered for the query.

+----------------------------------------------------+--+
|                      Explain                       |
+----------------------------------------------------+--+
| STAGE DEPENDENCIES:                                |
|   Stage-2 is a root stage                          |
|   Stage-1 depends on stages: Stage-2               |
|   Stage-0 depends on stages: Stage-1               |
|                                                    |
| STAGE PLANS:                                       |
|   Stage: Stage-2                                   |
|     Spark                                          |
|       DagName: hive_20170908151313_f478b7d3-89b8-4c6d-b98c-4ef3b8e25bf7:964 |
|       Vertices:                                    |
|         Map 1                                      |
|             Map Operator Tree:                     |
|                 TableScan                          |
|                   alias: dt                        |
|                   filterExpr: (d_date_sk is not null and (d_moy = 12)) (type: boolean) |
|                   Statistics: Num rows: 73049 Data size: 2045372 Basic stats: COMPLETE Column stats: NONE |
|                   Filter Operator                  |
|                     predicate: (d_date_sk is not null and (d_moy = 12)) (type: boolean) |
|                     Statistics: Num rows: 18262 Data size: 511336 Basic stats: COMPLETE Column stats: NONE |
|                     Spark HashTable Sink Operator  |
|                       keys:                        |
|                         0 d_date_sk (type: bigint) |
|                         1 ss_sold_date_sk (type: bigint) |
|                     Select Operator                |
|                       expressions: d_date_sk (type: bigint) |
|                       outputColumnNames: _col0     |
|                       Statistics: Num rows: 18262 Data size: 511336 Basic stats: COMPLETE Column stats: NONE |
|                       Group By Operator            |
|                         keys: _col0 (type: bigint) |
|                         mode: hash                 |
|                         outputColumnNames: _col0   |
|                         Statistics: Num rows: 18262 Data size: 511336 Basic stats: COMPLETE Column stats: NONE |
|                         Spark Partition Pruning Sink Operator |
|                           partition key expr: ss_sold_date_sk |
|                           tmp Path: hdfs://<server_name>.<domain>.com:8020/tmp/hive/hive/a8939414-8311-4b06-bbd6-5afc9c3b2d3d/hive_2017-09-08_15-13-54_861_527211251736847122-4/-mr-10003/2/1 |
|                           Statistics: Num rows: 18262 Data size: 511336 Basic stats: COMPLETE Column stats: NONE |
|                           target column name: ss_sold_date_sk |
|                           target work: Map 2       |
|             Local Work:                            |
|               Map Reduce Local Work                |
|         Map 5                                      |
|             Map Operator Tree:                     |
|                 TableScan                          |
|                   alias: item                      |
|                   filterExpr: (i_item_sk is not null and (i_manufact_id = 436)) (type: boolean) |
|                   Statistics: Num rows: 102000 Data size: 2244000 Basic stats: COMPLETE Column stats: NONE |
|                   Filter Operator                  |
|                     predicate: (i_item_sk is not null and (i_manufact_id = 436)) (type: boolean) |
|                     Statistics: Num rows: 25500 Data size: 561000 Basic stats: COMPLETE Column stats: NONE |
|                     Spark HashTable Sink Operator  |
|                       keys:                        |
|                         0 _col32 (type: bigint)    |
|                         1 i_item_sk (type: bigint) |
|             Local Work:                            |
|               Map Reduce Local Work                |
|
...
          

Queries That Trigger and Benefit from Dynamic Partition Pruning in Hive on Spark

When tables are created in Hive, it is common practice to partition them. Partitioning breaks large tables into horizontal slices of data. Each partition typically corresponds to a separate folder on HDFS. Tables can be partitioned when the data has a "natural" partitioning column, such as a date column. Hive queries that read from partitioned tables typically filter on the partition column to avoid reading all partitions from the table. For example, if you have a partitioned table called date_partitioned_table that is partitioned on the datetime column, the following query reads only the partitions whose datetime value is later than January 1, 2017:

SELECT *
FROM date_partitioned_table
WHERE datetime > '2017-01-01';
        

If the date_partitioned_table table has partitions for dates that extend back to 2010, this WHERE clause filter can significantly decrease the amount of data that needs to be read by the query. This query is easy for Hive to optimize. When it is compiled, Hive determines that only partitions where datetime is greater than 2017-01-01 need to be read. This form of partition pruning is known as static partition pruning.
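
To make the static case concrete, here is a minimal sketch of how such a table might be defined and inspected. The non-partition columns (event_id, payload) are hypothetical and are not part of the original example; the partition column is quoted in the DDL only as a precaution in case the name is treated as a keyword.

```sql
-- Hypothetical table definition; event_id and payload are illustrative only.
CREATE TABLE date_partitioned_table (
  event_id BIGINT,
  payload  STRING
)
PARTITIONED BY (`datetime` STRING);

-- List the partitions that currently exist for the table.
SHOW PARTITIONS date_partitioned_table;

-- Generate the plan for the filtered query to inspect how it is compiled.
EXPLAIN
SELECT *
FROM date_partitioned_table
WHERE datetime > '2017-01-01';
```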

However, when queries become more complex, the filter on the partitioned column cannot be evaluated at compile time. For example, consider this query:

SELECT *
FROM date_partitioned_table
WHERE datetime IN (SELECT * FROM non_partitioned_table);
        

With this type of query, the Hive compiler cannot decide which partitions to read because the rows that are returned by the subquery SELECT * FROM non_partitioned_table are unknown at compile time. In this situation, dynamic partition pruning (DPP) optimizes the query. Hive can dynamically prune partitions from the scan of date_partitioned_table by eliminating partitions while the query is running. Queries that use this pattern can see performance improvements when DPP is enabled. Note that this query contains an IN clause, which triggers a join between date_partitioned_table and non_partitioned_table. DPP is only triggered when there is a join on a partitioned column.
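
The join that the IN clause implies can also be written out explicitly, which may make it easier to see why DPP applies. The following is only a sketch: it assumes that non_partitioned_table has a single column, here given the hypothetical name dt_value, and that a semi join is an acceptable equivalent of the IN subquery for your data.

```sql
-- Assumption: dt_value is a hypothetical column name in non_partitioned_table.
-- EXPLAIN prints the plan so you can look for the
-- Spark Partition Pruning Sink Operator.
EXPLAIN
SELECT p.*
FROM date_partitioned_table p
LEFT SEMI JOIN non_partitioned_table n
  ON p.datetime = n.dt_value;
```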

DPP might provide performance benefits for Hive data warehouses that use the star or snowflake schema. If DPP is enabled, performance improvements are possible for Hive queries that join a partitioned fact table to a dimension table on the partition column of the fact table. The TPC-DS benchmark is a good example because many of its queries benefit from DPP. The TPC-DS query that is used with EXPLAIN in the previous section triggers DPP:

SELECT dt.d_year
       ,item.i_brand_id brand_id
       ,item.i_brand brand
       ,sum(ss_ext_sales_price) sum_agg
FROM date_dim dt
       ,store_sales
       ,item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
  AND store_sales.ss_item_sk = item.i_item_sk
  AND item.i_manufact_id = 436
  AND dt.d_moy=12
GROUP BY dt.d_year
      ,item.i_brand
      ,item.i_brand_id
ORDER BY dt.d_year
         ,sum_agg desc
         ,brand_id
LIMIT 100;
        

This query performs a join between the partitioned store_sales table and the non-partitioned date_dim table. The join is performed against the partition column for store_sales, which is what triggers DPP. The join must be against a partitioned column for DPP to be triggered.

In CDH 5.13, DPP is only supported for map joins. It is not supported for common joins, which are joins that require a shuffle phase. A single query may have multiple joins, some of which are map joins and some of which are common joins. For DPP to be triggered, only the join on the partitioned column must be a map join.

Debugging Dynamic Partition Pruning in Hive on Spark

Debug DPP for Hive on Spark by viewing the query plan produced with the EXPLAIN command or by viewing two types of log files. Both options are discussed in the following sections.

Debugging with Query Plans Produced with EXPLAIN

A simple way to check whether DPP is triggered for a query is to use the EXPLAIN command as shown above in Verifying Your Query Uses Dynamic Partition Pruning in Hive on Spark. If the query plan contains a Spark Partition Pruning Sink Operator, DPP is triggered for the query. If it does not contain this operator, DPP is not triggered for the query.

Debugging with Logs

Use the HiveServer2 logs to debug the compile time phase of DPP and use the Hive on Spark Remote Driver logs to debug the runtime phase of DPP:

  • HiveServer2 Logs

    The HiveServer2 logs print debugging information from the Java class DynamicPartitionPruningOptimization. This class looks at the query and checks if it can benefit from DPP. If the query can benefit from DPP, the class modifies the query plan to include DPP-specific operators, such as the Spark Partition Pruning Sink Operator. When the class runs, it prints out information related to whether or not it is enabling DPP for a particular clause in the query.

    For example, if the following message appears in the HiveServer2 log, it means that DPP will be triggered and that partitions will be dynamically pruned from the partitioned_table table:

    INFO  org.apache.hadoop.hive.ql.optimizer.DynamicPartitionPruningOptimization:
          [HiveServer2-Handler-Pool: Thread-xx]: Dynamic partitioning: default@partitioned_table.partition_column
                  

    To access these log files in Cloudera Manager, select Hive > HiveServer2 > Log Files > Role Log File.

  • Hive on Spark Remote Driver Logs

    The Hive on Spark (HoS) Remote Driver logs print debugging information from the Java class SparkDynamicPartitionPruner. This class does the actual pruning of the partitioned table. Because pruning happens at runtime, the logs for this class are located in the HoS Remote Driver logs instead of the HiveServer2 logs. These logs print which partitions are pruned from the partitioned table, which can be very useful for troubleshooting.

    For example, if the following message appears in the HoS Remote Driver log, it means that the partition partition_column=1 is being pruned from the table partitioned_table:

    INFO spark.SparkDynamicPartitionPruner:Pruning path:
    hdfs://<namenode_uri>/user/hive/warehouse/partitioned_table/partition_column=1
                  

    To access these log files in Cloudera Manager, select SPARK_ON_YARN > History Server Web UI > <select_an_application> > Executors > executor id = driver > stderr.

Troubleshooting Hive on Spark

Delayed result from the first query after starting a new Hive on Spark session

Symptom

The first query after starting a new Hive on Spark session might be delayed due to the start-up time for the Spark on YARN cluster.

Cause

The query waits for YARN containers to initialize.

Solution

No action required. Subsequent queries will be faster.

Exception in HiveServer2 log and HiveServer2 is down

Symptom

In the HiveServer2 log you see the following exception: Error: org.apache.thrift.transport.TTransportException (state=08S01,code=0)

Cause

The HiveServer2 Java heap size is set too low. For more information, see stdout for HiveServer2.

Solution

  1. Go to the Hive service.
  2. Click the Configuration tab.
  3. Search for Java Heap Size of HiveServer2 in Bytes, and increase the value. Cloudera recommends a minimum value of 2 GB.
  4. Click Save Changes to commit the changes.
  5. Restart HiveServer2.

Out-of-memory error

Symptom

In the log you see an out-of-memory error similar to the following:
15/03/19 03:43:17 WARN channel.DefaultChannelPipeline:
An exception was thrown by a user handler while handling an exception event ([id: 0x9e79a9b1, /10.20.118.103:45603 => /10.20.120.116:39110]
      EXCEPTION: java.lang.OutOfMemoryError: Java heap space)
      java.lang.OutOfMemoryError: Java heap space

Cause

The Spark driver does not have enough off-heap memory.

Solution

Increase the driver memory (spark.driver.memory) and ensure that spark.yarn.driver.memoryOverhead is at least 20% of the driver memory.
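
As a rough illustration of the 20% guideline above, the following sketch sets both properties from a Hive session. The values are hypothetical, and the sketch assumes that your deployment lets you pass spark.* properties through the Hive SET command and that the overhead value is interpreted in megabytes. If either assumption does not hold, set the equivalent properties in Cloudera Manager instead.

```sql
-- Illustrative value only: 4 GB (4096 MB) of driver memory.
SET spark.driver.memory=4g;

-- 20% of 4096 MB is about 819 MB, so round up to 820 MB of overhead.
SET spark.yarn.driver.memoryOverhead=820;
```

New values of this kind are expected to apply only to a Spark application that starts after they are set, so you might need to start a new Hive on Spark session before they take effect.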

Spark applications stay alive forever

Symptom

Cluster resources are consumed by Spark applications.

Cause

This can occur if you run multiple Hive on Spark sessions concurrently.

Solution

Manually terminate the Hive on Spark applications:
  1. Go to the YARN service.
  2. Click the Applications tab.
  3. In the row containing the Hive on Spark application, select > Kill.