Running Apache Hive on Spark in CDH
This section explains how to run Hive using the Spark execution engine. It assumes that the cluster is managed by Cloudera Manager.
Configuring Hive on Spark
Minimum Required Role: Configurator (also provided by Cluster Administrator, Full Administrator)
To configure Hive to run on Spark, complete both of the following steps:
- Configure the Hive client to use the Spark execution engine as described in Hive Execution Engines.
- Identify the Spark service that Hive uses. Cloudera Manager automatically sets this to the configured MapReduce or YARN service and the configured Spark service. See Configuring the Hive Dependency on a Spark Service.
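As a minimal sketch of the first step, the execution engine can be switched for a single session from the Hive client. It assumes a Beeline or Hive CLI session on a cluster where the Spark dependency described above is already configured.

```sql
-- Use Spark as the execution engine for the current session only.
SET hive.execution.engine=spark;

-- Print the effective value to confirm the setting was applied.
SET hive.execution.engine;
```

A session-level SET does not change the service-wide default, so other sessions keep whichever engine is configured for them.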
Hive on Spark Memory and Hardware Requirements
Individual executor heaps should be no larger than 16 GB so machines with more RAM can use multiple executors.
For more information on how to reserve YARN cores and memory that will be used by Spark executors, refer to Tuning Apache Hive on Spark in CDH.
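One way to apply the 16 GB guideline to a single session is to set the standard Spark properties spark.executor.memory and spark.executor.cores from the Hive client. This is only a sketch: the core count is an illustrative assumption rather than a value recommended by this section, and cluster-wide values are normally managed through Cloudera Manager.

```sql
-- Cap each executor heap at the 16 GB ceiling mentioned above.
SET spark.executor.memory=16g;

-- Illustrative core count per executor (assumption; tune for your hosts).
SET spark.executor.cores=4;
```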
Configuring the Hive Dependency on a Spark Service
By default, if a Spark service is available, the Hive dependency on the Spark service is configured. To change this configuration, do the following:
- In the Cloudera Manager Admin Console, go to the Hive service.
- Click the Configuration tab.
- Search for the Spark On YARN Service. To configure the Spark service, select the Spark service name. To remove the dependency, select none.
- Click Save Changes.
- Go to the Spark service.
- Add a Spark gateway role to the host running HiveServer2.
- Return to the Home page by clicking the Cloudera Manager logo.
- Click the icon that is next to any stale services to invoke the cluster restart wizard.
- Click Restart Stale Services.
- Click Restart Now.
- Click Finish.
- In the Hive client, configure the Spark execution engine.
Dynamic Partition Pruning for Hive Map Joins
Starting in CDH 5.13, you can enable dynamic partition pruning for map joins when you are running Hive on Spark (HoS). Dynamic partition pruning (DPP) is a database optimization that can significantly decrease the amount of data that a query scans, so that your workloads run faster. DPP achieves this by determining at runtime which partitions of a partitioned table a query must read and skipping the rest.
Map joins also optimize how Hive executes queries. They cause a small table to be scanned and loaded in memory as a hash table so that a fast join can be performed entirely within a mapper without having to use another reduce step. If you have queries that join small tables, map joins can make them execute much faster. Map joins are enabled by default in CDH with the Enable MapJoin Optimization setting for HiveServer2 in Cloudera Manager. Hive automatically uses map joins for join queries that involve a set of tables where:
- There is one large table and there is no limit on the size of that large table.
- The aggregate size of all other tables involved in the join is under the value set for Hive Auto Convert Join Noconditional Size for HiveServer2, which is 20 MB by default in Cloudera Manager.
For more information about map joins, see the Apache wiki.
To enable or disable map joins on a per-query basis, use the Hive SET command:
SET hive.auto.convert.join=true;
SET hive.auto.convert.join.noconditionaltask.size=<size_in_bytes>;
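The same property can also turn map joins off for a particular query. The following sketch disables automatic conversion, leaves a placeholder where the query would run, and then restores the setting for the rest of the session.

```sql
-- Turn off automatic map join conversion for the statements that follow.
SET hive.auto.convert.join=false;

-- ... run the query that should use a common (shuffle) join here ...

-- Restore automatic conversion for the rest of the session.
SET hive.auto.convert.join=true;

-- Print the current value to confirm it.
SET hive.auto.convert.join;
```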
In CDH 5.13, when you are using HoS and the tables involved in a join query trigger a map join, two Spark jobs are launched and perform the following actions:
- the first job scans the smaller table, creates a hash table, and writes it to HDFS,
- the second job runs the join and the rest of the query, scanning the larger table.
If DPP is enabled and is also triggered, the two Spark jobs perform the following actions:
- the first Spark job creates the hash table from the small table and identifies the partitions that should be scanned from the large table,
- the second Spark job then scans the relevant partitions from the large table that are to be used in the join.
After these actions are performed, the query proceeds normally with the map join.
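To see the difference between the two flows described above, one option is to generate the plan for the same join with DPP off and then on. This sketch reuses the store_sales and date_dim tables and the property names that appear elsewhere in this section; it assumes store_sales is partitioned on ss_sold_date_sk and that date_dim is small enough to qualify for a map join.

```sql
SET hive.execution.engine=spark;
SET hive.auto.convert.join=true;

-- Plan without DPP: the large table is scanned in full by the second job.
SET hive.spark.dynamic.partition.pruning.map.join.only=false;
EXPLAIN
SELECT dt.d_year, SUM(ss.ss_ext_sales_price) AS total_sales
FROM store_sales ss
JOIN date_dim dt ON ss.ss_sold_date_sk = dt.d_date_sk
WHERE dt.d_moy = 12
GROUP BY dt.d_year;

-- Plan with DPP: look for a Spark Partition Pruning Sink Operator.
SET hive.spark.dynamic.partition.pruning.map.join.only=true;
EXPLAIN
SELECT dt.d_year, SUM(ss.ss_ext_sales_price) AS total_sales
FROM store_sales ss
JOIN date_dim dt ON ss.ss_sold_date_sk = dt.d_date_sk
WHERE dt.d_moy = 12
GROUP BY dt.d_year;
```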
Enabling Dynamic Partition Pruning for Map Joins in Hive on Spark
Dynamic partition pruning (DPP) is disabled by default in CDH 5.13. Use Cloudera Manager to set the following properties.
|Property Name|Description|Default Setting|
|---|---|---|
|hive.spark.dynamic.partition.pruning.map.join.only|Enables dynamic partition pruning for queries where the join on the partitioned column is a map join. This property only applies to the Spark execution engine. Set this property to true to turn the behavior on.|false (turned off)|
|hive.spark.dynamic.partition.pruning|Enables dynamic partition pruning for all joins, including shuffle joins and map joins.|false (turned off)|
Enabling DPP on a Per-Query Basis with the Hive SET Command
To enable DPP at the session level, use the Hive SET command:
SET hive.spark.dynamic.partition.pruning.map.join.only=true;
Enabling DPP as a Service-Wide Default with Cloudera Manager
Use Cloudera Manager to enable DPP as a service-wide default:
- In the Cloudera Manager Admin Console, go to the Hive service.
- In the Hive service page, click the Configuration tab.
- On the Configuration page, click the HiveServer2 scope and click the Performance category.
- Search for Hive on Spark Dynamic Partition Pruning for MapJoins, and select the check box.
- Click Save Changes.
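A quick way to check the service-wide default is to print the property from a new client session. This sketch assumes the session connects to a HiveServer2 instance that has already picked up the saved configuration.

```sql
-- Prints the effective value; expect true once the check box is selected.
SET hive.spark.dynamic.partition.pruning.map.join.only;
```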
Verifying Your Query Uses Dynamic Partition Pruning in Hive on Spark
Use EXPLAIN to generate a query plan, which you can use to verify that DPP is being triggered for your query.
Example of Verifying that Dynamic Partition Pruning Is Triggered For Your Query
In this example, TPC-DS benchmark data is used with the query generated from query3.tpl in the TPC-DS downloadable package. It demonstrates how you can use the EXPLAIN command to verify that DPP is being triggered. For more information about the TPC-DS benchmark data and queries, see www.tpc.org/tpcds/.
First, set the following properties, which instruct Hive to use Spark as its execution engine and turn on DPP for map joins:
SET hive.execution.engine=spark;
SET hive.spark.dynamic.partition.pruning.map.join.only=true;
Then run the following commands, which tell Hive to use the testing_example_db database and to show (EXPLAIN) the query plan for the query that follows:
USE testing_example_db;
EXPLAIN
SELECT dt.d_year
      ,item.i_brand_id brand_id
      ,item.i_brand brand
      ,sum(ss_ext_sales_price) sum_agg
FROM date_dim dt
    ,store_sales
    ,item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
  AND store_sales.ss_item_sk = item.i_item_sk
  AND item.i_manufact_id = 436
  AND dt.d_moy=12
GROUP BY dt.d_year
        ,item.i_brand
        ,item.i_brand_id
ORDER BY dt.d_year
        ,sum_agg desc
        ,brand_id
LIMIT 100;
The EXPLAIN command returns the query plan for the TPC-DS query. An excerpt from that query plan is included below. Look for the Spark HashTable Sink Operator and the Spark Partition Pruning Sink Operator in the following output. The presence of these sink operators in the query plan indicates that DPP is being triggered for the query.
+----------------------------------------------------+--+
| Explain |
+----------------------------------------------------+--+
| STAGE DEPENDENCIES: |
|   Stage-2 is a root stage |
|   Stage-1 depends on stages: Stage-2 |
|   Stage-0 depends on stages: Stage-1 |
| |
| STAGE PLANS: |
|   Stage: Stage-2 |
|     Spark |
|       DagName: hive_20170908151313_f478b7d3-89b8-4c6d-b98c-4ef3b8e25bf7:964 |
|       Vertices: |
|         Map 1 |
|           Map Operator Tree: |
|             TableScan |
|               alias: dt |
|               filterExpr: (d_date_sk is not null and (d_moy = 12)) (type: boolean) |
|               Statistics: Num rows: 73049 Data size: 2045372 Basic stats: COMPLETE Column stats: NONE |
|               Filter Operator |
|                 predicate: (d_date_sk is not null and (d_moy = 12)) (type: boolean) |
|                 Statistics: Num rows: 18262 Data size: 511336 Basic stats: COMPLETE Column stats: NONE |
|                 Spark HashTable Sink Operator |
|                   keys: |
|                     0 d_date_sk (type: bigint) |
|                     1 ss_sold_date_sk (type: bigint) |
|                 Select Operator |
|                   expressions: d_date_sk (type: bigint) |
|                   outputColumnNames: _col0 |
|                   Statistics: Num rows: 18262 Data size: 511336 Basic stats: COMPLETE Column stats: NONE |
|                   Group By Operator |
|                     keys: _col0 (type: bigint) |
|                     mode: hash |
|                     outputColumnNames: _col0 |
|                     Statistics: Num rows: 18262 Data size: 511336 Basic stats: COMPLETE Column stats: NONE |
|                     Spark Partition Pruning Sink Operator |
|                       partition key expr: ss_sold_date_sk |
|                       tmp Path: hdfs://<server_name>.<domain>.com:8020/tmp/hive/hive/a8939414-8311-4b06-bbd6-5afc9c3b2d3d/hive_2017-09-08_15-13-54_861_527211251736847122-4/-mr-10003/2/1 |
|                       Statistics: Num rows: 18262 Data size: 511336 Basic stats: COMPLETE Column stats: NONE |
|                       target column name: ss_sold_date_sk |
|                       target work: Map 2 |
|           Local Work: |
|             Map Reduce Local Work |
|         Map 5 |
|           Map Operator Tree: |
|             TableScan |
|               alias: item |
|               filterExpr: (i_item_sk is not null and (i_manufact_id = 436)) (type: boolean) |
|               Statistics: Num rows: 102000 Data size: 2244000 Basic stats: COMPLETE Column stats: NONE |
|               Filter Operator |
|                 predicate: (i_item_sk is not null and (i_manufact_id = 436)) (type: boolean) |
|                 Statistics: Num rows: 25500 Data size: 561000 Basic stats: COMPLETE Column stats: NONE |
|                 Spark HashTable Sink Operator |
|                   keys: |
|                     0 _col32 (type: bigint) |
|                     1 i_item_sk (type: bigint) |
|           Local Work: |
|             Map Reduce Local Work |
| ...
Queries That Trigger and Benefit from Dynamic Partition Pruning in Hive on Spark
When tables are created in Hive, it is common practice to partition them. Partitioning breaks large tables into horizontal slices of data. Each partition typically corresponds to a separate folder on HDFS. Tables can be partitioned when the data has a "natural" partitioning column, such as a date column. Hive queries that read from partitioned tables typically filter on the partition column in order to avoid reading all partitions from the table. For example, if you have a partitioned table called date_partitioned_table that is partitioned on the datetime column, the following query reads only the partitions whose datetime value is later than January 1, 2017:
SELECT * FROM date_partitioned_table WHERE datetime > '2017-01-01';
If the date_partitioned_table table has partitions for dates that extend back to 2010, this WHERE clause filter can significantly decrease the amount of data that needs to be read by the query. This query is easy for Hive to optimize. When it is compiled, Hive can determine that only partitions where datetime is greater than 2017-01-01 need to be read. This form of partition pruning is known as static partition pruning.
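One way to observe static partition pruning is to ask Hive which inputs a query will read. The sketch below uses EXPLAIN DEPENDENCY, which reports the input tables and partitions of a query as JSON; it assumes the date_partitioned_table described above exists and that your Hive version supports this EXPLAIN variant.

```sql
-- The input_partitions list in the output should contain only
-- partitions with datetime values later than 2017-01-01.
EXPLAIN DEPENDENCY
SELECT * FROM date_partitioned_table WHERE datetime > '2017-01-01';
```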
However, when queries become more complex, the filter on the partitioned column cannot be evaluated at compile time. For example, consider this query:
SELECT * FROM date_partitioned_table WHERE datetime IN (SELECT * FROM non_partitioned_table);
With this type of query, it is difficult for the Hive compiler to optimize its execution because the rows that are returned by the subquery SELECT * FROM non_partitioned_table are unknown at compile time. In this situation, dynamic partition pruning (DPP) optimizes the query. Hive can dynamically prune partitions from the scan of date_partitioned_table by eliminating partitions while the query is running. Queries that use this pattern can see performance improvements when DPP is enabled. Note that this query contains an IN clause, which triggers a join between date_partitioned_table and non_partitioned_table. DPP is only triggered when there is a join on a partitioned column.
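To make the join behind the IN clause easier to see, the query can be pictured as a semi join on the partition column. This is an illustrative rewrite, not the plan Hive necessarily produces, and it assumes that non_partitioned_table has a single column, here hypothetically also named datetime.

```sql
-- Keep only rows of the partitioned table whose partition value
-- appears in the non-partitioned table.
SELECT p.*
FROM date_partitioned_table p
LEFT SEMI JOIN non_partitioned_table n
  ON p.datetime = n.datetime;
```

Because the join key is the partition column of date_partitioned_table, the values found in non_partitioned_table at runtime decide which partitions need to be scanned.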
DPP might provide performance benefits for Hive data warehouses that use the star or snowflake schema. Performance improvements are possible for Hive queries that join a partitioned fact table to a dimension table on the partition column of the fact table if DPP is enabled. The TPC-DS benchmark is a good example because many of its queries benefit from DPP. The TPC-DS query shown with EXPLAIN in the section above triggers DPP:
SELECT dt.d_year
      ,item.i_brand_id brand_id
      ,item.i_brand brand
      ,sum(ss_ext_sales_price) sum_agg
FROM date_dim dt
    ,store_sales
    ,item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
  AND store_sales.ss_item_sk = item.i_item_sk
  AND item.i_manufact_id = 436
  AND dt.d_moy=12
GROUP BY dt.d_year
        ,item.i_brand
        ,item.i_brand_id
ORDER BY dt.d_year
        ,sum_agg desc
        ,brand_id
LIMIT 100;
This query performs a join between the partitioned store_sales table and the non-partitioned date_dim table. The join is performed against the partition column for store_sales, which is what triggers DPP. The join must be against a partitioned column for DPP to be triggered.
In CDH 5.13, DPP is only supported for map joins. It is not supported for common joins (those that require a shuffle phase). A single query may have multiple joins, some of which are map joins and some of which are common joins. For DPP to be triggered, only the join on the partitioned column must be a map join.
Debugging Dynamic Partition Pruning in Hive on Spark
Debug DPP for Hive on Spark by viewing the query plan produced with the EXPLAIN command or by viewing two types of log files. Both options are discussed in the following sections.
Debugging with Query Plans Produced with EXPLAIN
A simple way to check whether DPP is triggered for a query is to use the EXPLAIN command as shown above in Verifying Your Query Uses Dynamic Partition Pruning in Hive on Spark. If the query plan contains a Spark Partition Pruning Sink Operator, DPP will be triggered for the query. If it does not contain this operator, DPP will not be triggered for the query.
Debugging with Logs
Use the HiveServer2 logs to debug the compile time phase of DPP and use the Hive on Spark Remote Driver logs to debug the runtime phase of DPP.
HiveServer2 Logs
The HiveServer2 logs print debugging information from the Java class DynamicPartitionPruningOptimization. This class looks at the query and checks if it can benefit from DPP. If the query can benefit from DPP, the class modifies the query plan to include DPP-specific operators, such as the Spark Partition Pruning Sink Operator. When the class runs, it prints out information related to whether or not it is enabling DPP for a particular clause in the query.
For example, if the following message appears in the HiveServer2 log, it means that DPP will be triggered and that partitions will be dynamically pruned from the partitioned_table table:
INFO org.apache.hadoop.hive.ql.optimizer.DynamicPartitionPruningOptimization: [HiveServer2-Handler-Pool: Thread-xx]: Dynamic partitioning: default@partitioned_table.partition_column
To access these log files in Cloudera Manager, select.
Hive on Spark Remote Driver Logs
The Hive on Spark (HoS) Remote Driver logs print debugging information from the Java class SparkDynamicPartitionPruner. This class does the actual pruning of the partitioned table. Because pruning happens at runtime, the logs for this class are located in the HoS Remote Driver logs instead of the HiveServer2 logs. These logs print which partitions are pruned from the partitioned table, which can be very useful for troubleshooting.
For example, if the following message appears in the HoS Remote Driver log, it means that the partition partition_column=1 is being pruned from the table partitioned_table:
INFO spark.SparkDynamicPartitionPruner:Pruning path: hdfs://<namenode_uri>/user/hive/warehouse/partitioned_table/partition_column=1
To access these log files in Cloudera Manager, select.
Using Hive UDFs with Hive on Spark
When the execution engine is set to Spark, use Hive UDFs the same way that you use them when the execution engine is set to MapReduce. To apply a custom UDF on the column of a Hive table, use the following syntax:
SELECT <custom_UDF_name>(<column_name>) FROM <table_name>;
For example, to apply the custom UDF addfunc10 to the salary column of the sample_07 table in the default database that ships with CDH, use the following syntax:
SELECT addfunc10(salary) FROM sample_07 LIMIT 10;
The above HiveQL statement returns only 10 rows from the sample_07 table.
To use Hive built-in UDFs, see the LanguageManual UDF on the Apache wiki. To create custom UDFs in Hive, see Managing Apache Hive User-Defined Functions (UDFs) in CDH.
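For context, a custom UDF such as addfunc10 has to be registered before it can be called. The following sketch shows one way to do that with CREATE FUNCTION; the class name com.example.hive.AddFunc10 and the JAR location are hypothetical placeholders, not values from this document.

```sql
-- Register the UDF from a JAR stored on HDFS (class and path are hypothetical).
CREATE FUNCTION addfunc10 AS 'com.example.hive.AddFunc10'
  USING JAR 'hdfs:///user/hive/udfs/addfunc10.jar';

-- The function is called the same way under Spark as under MapReduce.
SET hive.execution.engine=spark;
SELECT addfunc10(salary) FROM sample_07 LIMIT 10;
```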
Troubleshooting Hive on Spark
Delayed result from the first query after starting a new Hive on Spark session
Exception in HiveServer2 log and HiveServer2 is down
Symptom: In the HiveServer2 log you see the following exception:
Error: org.apache.thrift.transport.TTransportException (state=08S01,code=0)
Symptom: In the log you see an out-of-memory error similar to the following:
15/03/19 03:43:17 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x9e79a9b1, /10.20.118.103:45603 => /10.20.120.116:39110] EXCEPTION: java.lang.OutOfMemoryError: Java heap space)
java.lang.OutOfMemoryError: Java heap space